improve messaging performance of mod_erlang_event FS-3347 --resolve

This commit is contained in:
Tamas Cseke 2014-02-28 11:32:08 +01:00
parent 4b6c08b34e
commit b303e722eb
2 changed files with 53 additions and 10 deletions

View File

@ -27,6 +27,7 @@
* Andrew Thompson <andrew@hijacked.us>
* Rob Charlton <rob.charlton@savageminds.com>
* Tamas Cseke <tamas.cseke@virtual-call-center.eu>
* Seven Du <dujinfang@gmail.com>
*
*
* mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket
@ -621,7 +622,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t *
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t check_attached_sessions(listener_t *listener)
static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_sent)
{
session_elem_t *sp;
switch_status_t status = SWITCH_STATUS_SUCCESS;
@ -682,6 +683,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf);
switch_mutex_unlock(listener->sock_mutex);
(*msgs_sent)++;
ei_x_free(&ebuf);
switch_event_destroy(&pevent);
}
@ -693,6 +695,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
ei_x_encode_atom(&ebuf, "call_hangup");
switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf);
(*msgs_sent)++;
switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&ebuf);
@ -718,6 +721,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf);
switch_mutex_unlock(listener->sock_mutex);
(*msgs_sent)++;
ei_x_free(&ebuf);
switch_event_destroy(&pevent);
@ -744,13 +748,14 @@ static switch_status_t check_attached_sessions(listener_t *listener)
}
}
static void check_log_queue(listener_t *listener)
static int check_log_queue(listener_t *listener)
{
void *pop;
int msgs_sent = 0;
/* send out any pending crap in the log queue */
if (switch_test_flag(listener, LFLAG_LOG)) {
if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
while (msgs_sent < 100 && switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_log_node_t *dnode = (switch_log_node_t *) pop;
if (dnode->data) {
@ -793,21 +798,26 @@ static void check_log_queue(listener_t *listener)
switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &lbuf);
switch_mutex_unlock(listener->sock_mutex);
msgs_sent ++;
ei_x_free(&lbuf);
switch_log_node_free(&dnode);
}
}
}
listener->total_logs += msgs_sent;
return msgs_sent;
}
static void check_event_queue(listener_t *listener)
static int check_event_queue(listener_t *listener)
{
void *pop;
int msgs_sent = 0;
/* send out any pending crap in the event queue */
if (switch_test_flag(listener, LFLAG_EVENTS)) {
if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
while (msgs_sent < 100 && switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_event_t *pevent = (switch_event_t *) pop;
@ -820,10 +830,22 @@ static void check_event_queue(listener_t *listener)
ei_sendto(listener->ec, listener->sockfd, &listener->event_process, &ebuf);
switch_mutex_unlock(listener->sock_mutex);
msgs_sent++;
ei_x_free(&ebuf);
if (pevent->event_id == SWITCH_EVENT_CHANNEL_CREATE) {
listener->create++;
} else if (pevent->event_id == SWITCH_EVENT_CHANNEL_HANGUP_COMPLETE) {
listener->hangup++;
}
switch_event_destroy(&pevent);
}
}
listener->total_events += msgs_sent;
return msgs_sent;
}
static void handle_exit(listener_t *listener, erlang_pid * pid)
@ -881,6 +903,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
static void listener_main_loop(listener_t *listener)
{
int status = 1;
int msgs_sent = 0; /* how many messages we sent in a loop */
while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) {
erlang_msg msg;
@ -890,9 +913,11 @@ static void listener_main_loop(listener_t *listener)
ei_x_new(&buf);
ei_x_new_with_version(&rbuf);
msgs_sent = 0;
/* do we need the mutex when reading? */
/*switch_mutex_lock(listener->sock_mutex); */
status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 10);
status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 1);
/*switch_mutex_unlock(listener->sock_mutex); */
switch (status) {
@ -959,12 +984,22 @@ static void listener_main_loop(listener_t *listener)
ei_x_free(&buf);
ei_x_free(&rbuf);
check_log_queue(listener);
check_event_queue(listener);
if (check_attached_sessions(listener) != SWITCH_STATUS_SUCCESS) {
msgs_sent += check_log_queue(listener);
msgs_sent += check_event_queue(listener);
if (check_attached_sessions(listener, &msgs_sent) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "check_attached_sessions requested exit\n");
return;
}
if (msgs_sent > SWITCH_CORE_QUEUE_LEN / 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%d messages sent in a loop\n", msgs_sent);
} else if (msgs_sent > 0) {
#ifdef EI_DEBUG
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%d messages sent in a loop\n", msgs_sent);
#endif
} else { /* no more messages right now, relax */
switch_yield(100000);
}
}
if (prefs.done) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "shutting down listener\n");
@ -1682,7 +1717,11 @@ SWITCH_STANDARD_API(erlang_cmd)
if (listen_list.listeners) {
for (l = listen_list.listeners; l; l = l->next) {
stream->write_function(stream, "Listener to %s with %d outbound sessions\n", l->peer_nodename, count_listener_sessions(l));
stream->write_function(stream, "Listener to %s with outbound sessions: %d events: %" SWITCH_UINT64_T_FMT
" (lost:%d) logs: %" SWITCH_UINT64_T_FMT " (lost:%d) %d/%d\n",
l->peer_nodename, count_listener_sessions(l),
l->total_events, l->lost_events,
l->total_logs, l->lost_logs, l->create, l->hangup);
}
} else {
stream->write_function(stream, "No active listeners\n");

View File

@ -136,8 +136,12 @@ struct listener {
switch_thread_rwlock_t *session_rwlock;
//session_elem_t *session_list;
switch_hash_t *sessions;
uint64_t total_events;
uint64_t total_logs;
int lost_events;
int lost_logs;
int create;
int hangup;
uint32_t id;
char remote_ip[50];
/*switch_port_t remote_port; */