From be9888d74cc5f75ea8edddd0fe4735ee851d7ad3 Mon Sep 17 00:00:00 2001 From: lazedo Date: Fri, 24 Jan 2020 16:39:51 +0000 Subject: [PATCH] [mod_kazoo] add configuration for queue timeouts * implements ei_queue_pop that calls switch_queue_trypop or switch_queue_pop_timeout depending on timeout value * default for sender_timeout is set to 0 to bring back previous behaviour of using switch_queue_trypop when fetching messages to be sent to erlang node --- src/mod/event_handlers/mod_kazoo/kazoo_ei.h | 10 +++++++++- .../event_handlers/mod_kazoo/kazoo_ei_config.c | 16 ++++++++++++++-- .../event_handlers/mod_kazoo/kazoo_ei_utils.c | 8 ++++++++ .../mod_kazoo/kazoo_event_stream.c | 3 ++- src/mod/event_handlers/mod_kazoo/kazoo_node.c | 9 ++++++--- 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h index 6add449b2b..a621115b06 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h @@ -81,6 +81,7 @@ struct ei_event_stream_s { uint32_t flags; ei_node_t *node; short event_stream_framing; + switch_interval_time_t queue_timeout; struct ei_event_stream_s *next; }; @@ -103,6 +104,9 @@ struct ei_node_s { uint32_t flags; int legacy; short event_stream_framing; + switch_interval_time_t event_stream_queue_timeout; + switch_interval_time_t receiver_queue_timeout; + switch_interval_time_t sender_queue_timeout; struct ei_node_s *next; }; @@ -171,11 +175,14 @@ struct kz_globals_s { int send_all_headers; int send_all_private_headers; int connection_timeout; - int receive_timeout; + int ei_receive_timeout; + switch_interval_time_t node_sender_queue_timeout; + switch_interval_time_t node_receiver_queue_timeout; int receive_msg_preallocate; int event_stream_preallocate; int send_msg_batch; short event_stream_framing; + switch_interval_time_t event_stream_queue_timeout; switch_port_t port; int config_fetched; int io_fault_tolerance; @@ -235,6 +242,7 @@ switch_status_t create_acceptor(); switch_hash_t *create_default_filter(); void kz_erl_init(); void kz_erl_shutdown(); +SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout); void fetch_config(); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c index e6bb4fa5ef..53ed984154 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c @@ -118,11 +118,14 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { kazoo_globals.send_all_headers = 0; kazoo_globals.send_all_private_headers = 1; kazoo_globals.connection_timeout = 500; - kazoo_globals.receive_timeout = 200; + kazoo_globals.ei_receive_timeout = 200; kazoo_globals.receive_msg_preallocate = 2000; kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE; kazoo_globals.send_msg_batch = 10; kazoo_globals.event_stream_framing = 2; + kazoo_globals.event_stream_queue_timeout = 200000; + kazoo_globals.node_receiver_queue_timeout = 100000; + kazoo_globals.node_sender_queue_timeout = 0; kazoo_globals.port = 0; kazoo_globals.io_fault_tolerance = 10; kazoo_globals.json_encoding = ERLANG_TUPLE; @@ -189,7 +192,7 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { kazoo_globals.connection_timeout = atoi(val); } else if (!strcmp(var, "receive-timeout")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-timeout: %s\n", val); - kazoo_globals.receive_timeout = atoi(val); + kazoo_globals.ei_receive_timeout = atoi(val); } else if (!strcmp(var, "receive-msg-preallocate")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-msg-preallocate: %s\n", val); kazoo_globals.receive_msg_preallocate = atoi(val); @@ -219,6 +222,15 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { } else if (!strcmp(var, "expand-headers-on-fetch")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val); kazoo_globals.expand_headers_on_fetch = switch_true(val); + } else if (!strcmp(var, "node-receiver-queue-timeout")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); + kazoo_globals.node_receiver_queue_timeout = atoi(val); + } else if (!strcmp(var, "node-sender-queue-timeout")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); + kazoo_globals.node_sender_queue_timeout = atoi(val); + } else if (!strcmp(var, "event-stream-queue-timeout")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); + kazoo_globals.event_stream_queue_timeout = atoi(val); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val); } diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c index 96e2c7de66..fb655e8860 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c @@ -1038,6 +1038,14 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_kazoo_runtime) { return SWITCH_STATUS_TERM; } +SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout) +{ + if (timeout == 0) { + return switch_queue_trypop(queue, data); + } else { + return switch_queue_pop_timeout(queue, data, timeout); + } +} /* For Emacs: * Local Variables: * mode:c diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c b/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c index 5e5fb8dff4..c2a53933ea 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c @@ -302,7 +302,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void } /* if there was an event waiting in our queue send it to the client */ - if (switch_queue_pop_timeout(event_stream->queue, &pop, 200000) == SWITCH_STATUS_SUCCESS) { + if (ei_queue_pop(event_stream->queue, &pop, event_stream->queue_timeout) == SWITCH_STATUS_SUCCESS) { ei_x_buff *ebuf = (ei_x_buff *) pop; if (event_stream->socket) { @@ -401,6 +401,7 @@ ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from) event_stream->connected = SWITCH_FALSE; event_stream->node = ei_node; event_stream->event_stream_framing = ei_node->event_stream_framing; + event_stream->queue_timeout = ei_node->event_stream_queue_timeout; memcpy(&event_stream->pid, from, sizeof(erlang_pid)); switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_node.c b/src/mod/event_handlers/mod_kazoo/kazoo_node.c index 273fcbaa7f..e11a48d6ac 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_node.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_node.c @@ -1452,7 +1452,7 @@ static void *SWITCH_THREAD_FUNC receive_handler(switch_thread_t *thread, void *o while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { void *pop = NULL; - if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) { + if (ei_queue_pop(ei_node->received_msgs, &pop, ei_node->receiver_queue_timeout) == SWITCH_STATUS_SUCCESS) { ei_received_msg_t *received_msg = (ei_received_msg_t *) pop; handle_erl_msg(ei_node, &received_msg->msg, &received_msg->buf); ei_x_free(&received_msg->buf); @@ -1505,7 +1505,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) } while (++send_msg_count <= kazoo_globals.send_msg_batch - && switch_queue_pop_timeout(ei_node->send_msgs, &pop, 20000) == SWITCH_STATUS_SUCCESS) { + && ei_queue_pop(ei_node->send_msgs, &pop, ei_node->sender_queue_timeout) == SWITCH_STATUS_SUCCESS) { ei_send_msg_t *send_msg = (ei_send_msg_t *) pop; ei_helper_send(ei_node, &send_msg->pid, &send_msg->buf); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sent erlang message to %s <%d.%d.%d>\n" @@ -1518,7 +1518,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) } /* wait for a erlang message, or timeout to check if the module is still running */ - status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.receive_timeout); + status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.ei_receive_timeout); switch (status) { case ERL_TICK: @@ -1629,6 +1629,9 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) { ei_node->created_time = switch_micro_time_now(); ei_node->legacy = kazoo_globals.legacy_events; ei_node->event_stream_framing = kazoo_globals.event_stream_framing; + ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout; + ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout; + ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout; /* store the IP and node name we are talking with */ switch_os_sock_put(&ei_node->socket, (switch_os_socket_t *)&nodefd, pool);