diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c b/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c index da5354f3ec..2239855464 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c @@ -696,11 +696,19 @@ static switch_status_t kz_att_xfer_hanguphook(switch_core_session_t *session) switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_state_t state = switch_channel_get_state(channel); const char *id = NULL; + const char *peer_uuid = NULL; if (state == CS_HANGUP || state == CS_ROUTING) { if ((id = switch_channel_get_variable(channel, "xfer_uuids"))) { switch_stream_handle_t stream = { 0 }; SWITCH_STANDARD_STREAM(stream); + if ((peer_uuid = switch_channel_get_variable(channel, "xfer_peer_uuid"))) { + switch_core_session_t *peer_session = NULL; + if ((peer_session = switch_core_session_locate(peer_uuid)) != NULL ) { + switch_ivr_transfer_recordings(session, peer_session); + switch_core_session_rwunlock(peer_session); + } + } switch_api_execute("uuid_bridge", id, NULL, &stream); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "\nHangup Command uuid_bridge(%s):\n%s\n", id, switch_str_nil((char *) stream.data)); @@ -734,9 +742,12 @@ void *SWITCH_THREAD_FUNC kz_att_thread_run(switch_thread_t *thread, void *obj) switch_call_cause_t cause = SWITCH_CAUSE_NORMAL_CLEARING; switch_channel_t *channel = switch_core_session_get_channel(session), *peer_channel = NULL; const char *bond = NULL; - switch_core_session_t *b_session = NULL; switch_bool_t follow_recording = switch_true(switch_channel_get_variable(channel, "recording_follow_attxfer")); const char *attxfer_cancel_key = NULL, *attxfer_hangup_key = NULL, *attxfer_conf_key = NULL; + int br = 0; + switch_event_t *event = NULL; + switch_core_session_t *b_session = NULL; + switch_channel_t *b_channel = NULL; att->running = 1; @@ -746,18 +757,12 @@ void *SWITCH_THREAD_FUNC kz_att_thread_run(switch_thread_t *thread, void *obj) bond = switch_channel_get_partner_uuid(channel); if ((b_session = switch_core_session_locate(bond)) == NULL) { - switch_core_session_rwunlock(peer_session); + switch_core_session_rwunlock(session); return NULL; } - switch_channel_set_variable(channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, bond); switch_core_event_hook_add_state_change(session, kz_att_xfer_tmp_hanguphook); - if (follow_recording && (b_session = switch_core_session_locate(bond))) { - switch_ivr_transfer_recordings(b_session, session); - switch_core_session_rwunlock(b_session); - } - if (switch_ivr_originate(session, &peer_session, &cause, data, 0, NULL, NULL, NULL, NULL, NULL, SOF_NONE, NULL, NULL) != SWITCH_STATUS_SUCCESS || !peer_session) { switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, bond); @@ -799,45 +804,39 @@ void *SWITCH_THREAD_FUNC kz_att_thread_run(switch_thread_t *thread, void *obj) switch_channel_clear_flag(peer_channel, CF_INNER_BRIDGE); switch_channel_clear_flag(channel, CF_INNER_BRIDGE); - if (zstr(bond) && switch_channel_down(peer_channel)) { + if (switch_channel_down(peer_channel)) { switch_core_session_rwunlock(peer_session); switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, bond); goto end; } - if (bond) { - int br = 0; + /* + * we're emiting the transferee event so that callctl can update + */ + b_channel = switch_core_session_get_channel(b_session); + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, "sofia::transferee") == SWITCH_STATUS_SUCCESS) { + switch_channel_event_set_data(b_channel, event); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "att_xfer_replaced_call_id", switch_core_session_get_uuid(peer_session)); + switch_event_fire(&event); + } - switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, bond); + if (!switch_channel_ready(channel)) { + switch_status_t status; - if (!switch_channel_down(peer_channel)) { - /* - * we're emiting the transferee event so that callctl can update - */ - switch_event_t *event = NULL; - switch_channel_t *b_channel = switch_core_session_get_channel(b_session); - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, "sofia::transferee") == SWITCH_STATUS_SUCCESS) { - switch_channel_event_set_data(b_channel, event); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "att_xfer_replaced_call_id", switch_core_session_get_uuid(peer_session)); - switch_event_fire(&event); - } - if (!switch_channel_ready(channel)) { - switch_status_t status; - - if (follow_recording) { - switch_ivr_transfer_recordings(session, peer_session); - } - status = switch_ivr_uuid_bridge(switch_core_session_get_uuid(peer_session), bond); - kz_att_xfer_set_result(peer_channel, status); - br++; - } else { - switch_channel_set_variable_printf(b_channel, "xfer_uuids", "%s %s", switch_core_session_get_uuid(peer_session), switch_core_session_get_uuid(session)); - switch_channel_set_variable_printf(channel, "xfer_uuids", "%s %s", switch_core_session_get_uuid(peer_session), bond); - - switch_core_event_hook_add_state_change(session, kz_att_xfer_hanguphook); - switch_core_event_hook_add_state_change(b_session, kz_att_xfer_hanguphook); - } + if (follow_recording) { + switch_ivr_transfer_recordings(session, peer_session); } + status = switch_ivr_uuid_bridge(switch_core_session_get_uuid(peer_session), bond); + kz_att_xfer_set_result(peer_channel, status); + br++; + } else { + // switch_channel_set_variable_printf(b_channel, "xfer_uuids", "%s %s", switch_core_session_get_uuid(peer_session), switch_core_session_get_uuid(session)); + switch_channel_set_variable_printf(channel, "xfer_uuids", "%s %s", switch_core_session_get_uuid(peer_session), bond); + switch_channel_set_variable(channel, "xfer_peer_uuid", switch_core_session_get_uuid(peer_session)); + + switch_core_event_hook_add_state_change(session, kz_att_xfer_hanguphook); + // switch_core_event_hook_add_state_change(b_session, kz_att_xfer_hanguphook); + } /* * this was commented so that the existing bridge @@ -849,8 +848,6 @@ void *SWITCH_THREAD_FUNC kz_att_thread_run(switch_thread_t *thread, void *obj) } */ - } - switch_core_session_rwunlock(peer_session); end: diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h index 6add449b2b..a621115b06 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h @@ -81,6 +81,7 @@ struct ei_event_stream_s { uint32_t flags; ei_node_t *node; short event_stream_framing; + switch_interval_time_t queue_timeout; struct ei_event_stream_s *next; }; @@ -103,6 +104,9 @@ struct ei_node_s { uint32_t flags; int legacy; short event_stream_framing; + switch_interval_time_t event_stream_queue_timeout; + switch_interval_time_t receiver_queue_timeout; + switch_interval_time_t sender_queue_timeout; struct ei_node_s *next; }; @@ -171,11 +175,14 @@ struct kz_globals_s { int send_all_headers; int send_all_private_headers; int connection_timeout; - int receive_timeout; + int ei_receive_timeout; + switch_interval_time_t node_sender_queue_timeout; + switch_interval_time_t node_receiver_queue_timeout; int receive_msg_preallocate; int event_stream_preallocate; int send_msg_batch; short event_stream_framing; + switch_interval_time_t event_stream_queue_timeout; switch_port_t port; int config_fetched; int io_fault_tolerance; @@ -235,6 +242,7 @@ switch_status_t create_acceptor(); switch_hash_t *create_default_filter(); void kz_erl_init(); void kz_erl_shutdown(); +SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout); void fetch_config(); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c index e6bb4fa5ef..53ed984154 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c @@ -118,11 +118,14 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { kazoo_globals.send_all_headers = 0; kazoo_globals.send_all_private_headers = 1; kazoo_globals.connection_timeout = 500; - kazoo_globals.receive_timeout = 200; + kazoo_globals.ei_receive_timeout = 200; kazoo_globals.receive_msg_preallocate = 2000; kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE; kazoo_globals.send_msg_batch = 10; kazoo_globals.event_stream_framing = 2; + kazoo_globals.event_stream_queue_timeout = 200000; + kazoo_globals.node_receiver_queue_timeout = 100000; + kazoo_globals.node_sender_queue_timeout = 0; kazoo_globals.port = 0; kazoo_globals.io_fault_tolerance = 10; kazoo_globals.json_encoding = ERLANG_TUPLE; @@ -189,7 +192,7 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { kazoo_globals.connection_timeout = atoi(val); } else if (!strcmp(var, "receive-timeout")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-timeout: %s\n", val); - kazoo_globals.receive_timeout = atoi(val); + kazoo_globals.ei_receive_timeout = atoi(val); } else if (!strcmp(var, "receive-msg-preallocate")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-msg-preallocate: %s\n", val); kazoo_globals.receive_msg_preallocate = atoi(val); @@ -219,6 +222,15 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { } else if (!strcmp(var, "expand-headers-on-fetch")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val); kazoo_globals.expand_headers_on_fetch = switch_true(val); + } else if (!strcmp(var, "node-receiver-queue-timeout")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); + kazoo_globals.node_receiver_queue_timeout = atoi(val); + } else if (!strcmp(var, "node-sender-queue-timeout")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); + kazoo_globals.node_sender_queue_timeout = atoi(val); + } else if (!strcmp(var, "event-stream-queue-timeout")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); + kazoo_globals.event_stream_queue_timeout = atoi(val); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val); } diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c index 96e2c7de66..fb655e8860 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c @@ -1038,6 +1038,14 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_kazoo_runtime) { return SWITCH_STATUS_TERM; } +SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout) +{ + if (timeout == 0) { + return switch_queue_trypop(queue, data); + } else { + return switch_queue_pop_timeout(queue, data, timeout); + } +} /* For Emacs: * Local Variables: * mode:c diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c b/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c index 5e5fb8dff4..c2a53933ea 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c @@ -302,7 +302,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void } /* if there was an event waiting in our queue send it to the client */ - if (switch_queue_pop_timeout(event_stream->queue, &pop, 200000) == SWITCH_STATUS_SUCCESS) { + if (ei_queue_pop(event_stream->queue, &pop, event_stream->queue_timeout) == SWITCH_STATUS_SUCCESS) { ei_x_buff *ebuf = (ei_x_buff *) pop; if (event_stream->socket) { @@ -401,6 +401,7 @@ ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from) event_stream->connected = SWITCH_FALSE; event_stream->node = ei_node; event_stream->event_stream_framing = ei_node->event_stream_framing; + event_stream->queue_timeout = ei_node->event_stream_queue_timeout; memcpy(&event_stream->pid, from, sizeof(erlang_pid)); switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_node.c b/src/mod/event_handlers/mod_kazoo/kazoo_node.c index 273fcbaa7f..e11a48d6ac 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_node.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_node.c @@ -1452,7 +1452,7 @@ static void *SWITCH_THREAD_FUNC receive_handler(switch_thread_t *thread, void *o while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { void *pop = NULL; - if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) { + if (ei_queue_pop(ei_node->received_msgs, &pop, ei_node->receiver_queue_timeout) == SWITCH_STATUS_SUCCESS) { ei_received_msg_t *received_msg = (ei_received_msg_t *) pop; handle_erl_msg(ei_node, &received_msg->msg, &received_msg->buf); ei_x_free(&received_msg->buf); @@ -1505,7 +1505,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) } while (++send_msg_count <= kazoo_globals.send_msg_batch - && switch_queue_pop_timeout(ei_node->send_msgs, &pop, 20000) == SWITCH_STATUS_SUCCESS) { + && ei_queue_pop(ei_node->send_msgs, &pop, ei_node->sender_queue_timeout) == SWITCH_STATUS_SUCCESS) { ei_send_msg_t *send_msg = (ei_send_msg_t *) pop; ei_helper_send(ei_node, &send_msg->pid, &send_msg->buf); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sent erlang message to %s <%d.%d.%d>\n" @@ -1518,7 +1518,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) } /* wait for a erlang message, or timeout to check if the module is still running */ - status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.receive_timeout); + status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.ei_receive_timeout); switch (status) { case ERL_TICK: @@ -1629,6 +1629,9 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) { ei_node->created_time = switch_micro_time_now(); ei_node->legacy = kazoo_globals.legacy_events; ei_node->event_stream_framing = kazoo_globals.event_stream_framing; + ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout; + ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout; + ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout; /* store the IP and node name we are talking with */ switch_os_sock_put(&ei_node->socket, (switch_os_socket_t *)&nodefd, pool);