From b303e722eb99906c799e885871969e67798e6e88 Mon Sep 17 00:00:00 2001 From: Tamas Cseke Date: Fri, 28 Feb 2014 11:32:08 +0100 Subject: [PATCH] improve messaging performance of mod_erlang_event FS-3347 --resolve --- .../mod_erlang_event/mod_erlang_event.c | 59 +++++++++++++++---- .../mod_erlang_event/mod_erlang_event.h | 4 ++ 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 5d85254735..0113a0e593 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -27,6 +27,7 @@ * Andrew Thompson * Rob Charlton * Tamas Cseke + * Seven Du * * * mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket @@ -621,7 +622,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t * return SWITCH_STATUS_SUCCESS; } -static switch_status_t check_attached_sessions(listener_t *listener) +static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_sent) { session_elem_t *sp; switch_status_t status = SWITCH_STATUS_SUCCESS; @@ -682,6 +683,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) switch_mutex_lock(listener->sock_mutex); ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); switch_mutex_unlock(listener->sock_mutex); + (*msgs_sent)++; ei_x_free(&ebuf); switch_event_destroy(&pevent); } @@ -693,6 +695,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) ei_x_encode_atom(&ebuf, "call_hangup"); switch_mutex_lock(listener->sock_mutex); ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); + (*msgs_sent)++; switch_mutex_unlock(listener->sock_mutex); ei_x_free(&ebuf); @@ -718,6 +721,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) switch_mutex_lock(listener->sock_mutex); ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); switch_mutex_unlock(listener->sock_mutex); + (*msgs_sent)++; ei_x_free(&ebuf); switch_event_destroy(&pevent); @@ -744,13 +748,14 @@ static switch_status_t check_attached_sessions(listener_t *listener) } } -static void check_log_queue(listener_t *listener) +static int check_log_queue(listener_t *listener) { void *pop; + int msgs_sent = 0; /* send out any pending crap in the log queue */ if (switch_test_flag(listener, LFLAG_LOG)) { - if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { + while (msgs_sent < 100 && switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { switch_log_node_t *dnode = (switch_log_node_t *) pop; if (dnode->data) { @@ -793,21 +798,26 @@ static void check_log_queue(listener_t *listener) switch_mutex_lock(listener->sock_mutex); ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &lbuf); switch_mutex_unlock(listener->sock_mutex); + msgs_sent ++; ei_x_free(&lbuf); switch_log_node_free(&dnode); } } } + + listener->total_logs += msgs_sent; + return msgs_sent; } -static void check_event_queue(listener_t *listener) +static int check_event_queue(listener_t *listener) { void *pop; + int msgs_sent = 0; /* send out any pending crap in the event queue */ if (switch_test_flag(listener, LFLAG_EVENTS)) { - if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + while (msgs_sent < 100 && switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { switch_event_t *pevent = (switch_event_t *) pop; @@ -820,10 +830,22 @@ static void check_event_queue(listener_t *listener) ei_sendto(listener->ec, listener->sockfd, &listener->event_process, &ebuf); switch_mutex_unlock(listener->sock_mutex); + msgs_sent++; + ei_x_free(&ebuf); + + if (pevent->event_id == SWITCH_EVENT_CHANNEL_CREATE) { + listener->create++; + } else if (pevent->event_id == SWITCH_EVENT_CHANNEL_HANGUP_COMPLETE) { + listener->hangup++; + } + switch_event_destroy(&pevent); } } + + listener->total_events += msgs_sent; + return msgs_sent; } static void handle_exit(listener_t *listener, erlang_pid * pid) @@ -881,6 +903,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) static void listener_main_loop(listener_t *listener) { int status = 1; + int msgs_sent = 0; /* how many messages we sent in a loop */ while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) { erlang_msg msg; @@ -890,9 +913,11 @@ static void listener_main_loop(listener_t *listener) ei_x_new(&buf); ei_x_new_with_version(&rbuf); + msgs_sent = 0; + /* do we need the mutex when reading? */ /*switch_mutex_lock(listener->sock_mutex); */ - status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 10); + status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 1); /*switch_mutex_unlock(listener->sock_mutex); */ switch (status) { @@ -959,12 +984,22 @@ static void listener_main_loop(listener_t *listener) ei_x_free(&buf); ei_x_free(&rbuf); - check_log_queue(listener); - check_event_queue(listener); - if (check_attached_sessions(listener) != SWITCH_STATUS_SUCCESS) { + msgs_sent += check_log_queue(listener); + msgs_sent += check_event_queue(listener); + if (check_attached_sessions(listener, &msgs_sent) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "check_attached_sessions requested exit\n"); return; } + + if (msgs_sent > SWITCH_CORE_QUEUE_LEN / 2) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%d messages sent in a loop\n", msgs_sent); + } else if (msgs_sent > 0) { +#ifdef EI_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%d messages sent in a loop\n", msgs_sent); +#endif + } else { /* no more messages right now, relax */ + switch_yield(100000); + } } if (prefs.done) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "shutting down listener\n"); @@ -1682,7 +1717,11 @@ SWITCH_STANDARD_API(erlang_cmd) if (listen_list.listeners) { for (l = listen_list.listeners; l; l = l->next) { - stream->write_function(stream, "Listener to %s with %d outbound sessions\n", l->peer_nodename, count_listener_sessions(l)); + stream->write_function(stream, "Listener to %s with outbound sessions: %d events: %" SWITCH_UINT64_T_FMT + " (lost:%d) logs: %" SWITCH_UINT64_T_FMT " (lost:%d) %d/%d\n", + l->peer_nodename, count_listener_sessions(l), + l->total_events, l->lost_events, + l->total_logs, l->lost_logs, l->create, l->hangup); } } else { stream->write_function(stream, "No active listeners\n"); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 70b85e716b..d84fd0f505 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -136,8 +136,12 @@ struct listener { switch_thread_rwlock_t *session_rwlock; //session_elem_t *session_list; switch_hash_t *sessions; + uint64_t total_events; + uint64_t total_logs; int lost_events; int lost_logs; + int create; + int hangup; uint32_t id; char remote_ip[50]; /*switch_port_t remote_port; */