fix some contention issues under really high load...That doesn't mean you need to push it this hard and bug me about it =p

This commit is contained in:
Anthony Minessale 2012-11-07 12:10:50 -06:00
parent feb38cece2
commit f60962ae87
7 changed files with 89 additions and 78 deletions

View File

@ -389,6 +389,7 @@ struct mod_sofia_globals {
int tracelevel; int tracelevel;
char *capture_server; char *capture_server;
int rewrite_multicasted_fs_path; int rewrite_multicasted_fs_path;
int presence_flush;
}; };
extern struct mod_sofia_globals mod_sofia_globals; extern struct mod_sofia_globals mod_sofia_globals;
@ -694,6 +695,7 @@ struct sofia_profile {
int ireg_seconds; int ireg_seconds;
sofia_paid_type_t paid_type; sofia_paid_type_t paid_type;
uint32_t rtp_digit_delay; uint32_t rtp_digit_delay;
switch_queue_t *event_queue;
}; };
struct private_object { struct private_object {
@ -1206,6 +1208,8 @@ int sofia_recover_callback(switch_core_session_t *session);
void sofia_glue_set_name(private_object_t *tech_pvt, const char *channame); void sofia_glue_set_name(private_object_t *tech_pvt, const char *channame);
private_object_t *sofia_glue_new_pvt(switch_core_session_t *session); private_object_t *sofia_glue_new_pvt(switch_core_session_t *session);
switch_status_t sofia_init(void); switch_status_t sofia_init(void);
void sofia_glue_fire_events(sofia_profile_t *profile);
void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event);
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:

View File

@ -1559,7 +1559,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
{ {
sofia_dispatch_event_t *de = *dep; sofia_dispatch_event_t *de = *dep;
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
sofia_profile_t *profile = (*dep)->profile; //sofia_profile_t *profile = (*dep)->profile;
switch_thread_data_t *td; switch_thread_data_t *td;
switch_core_new_memory_pool(&pool); switch_core_new_memory_pool(&pool);
@ -1571,45 +1571,10 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
td->func = sofia_msg_thread_run_once; td->func = sofia_msg_thread_run_once;
td->obj = de; td->obj = de;
switch_mutex_lock(profile->ireg_mutex);
switch_thread_pool_launch_thread(&td); switch_thread_pool_launch_thread(&td);
switch_mutex_unlock(profile->ireg_mutex);
} }
#if 0
void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
{
sofia_dispatch_event_t *de = *dep;
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool;
switch_thread_t *thread;
sofia_profile_t *profile = (*dep)->profile;
switch_status_t status;
switch_core_new_memory_pool(&pool);
*dep = NULL;
de->pool = pool;
switch_mutex_lock(profile->ireg_mutex);
switch_threadattr_create(&thd_attr, de->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
status = switch_thread_create(&thread,
thd_attr,
sofia_msg_thread_run_once,
de,
de->pool);
switch_mutex_unlock(profile->ireg_mutex);
if (status != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot create threads!\n");
sofia_process_dispatch_event(&de);
}
}
#endif
void sofia_process_dispatch_event(sofia_dispatch_event_t **dep) void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
{ {
sofia_dispatch_event_t *de = *dep; sofia_dispatch_event_t *de = *dep;
@ -1992,6 +1957,7 @@ void sofia_event_callback(nua_event_t event,
sofia_queue_message(de); sofia_queue_message(de);
end: end:
//switch_cond_next();
return; return;
} }
@ -2133,7 +2099,7 @@ void event_handler(switch_event_t *event)
contact_str = fixed_contact_str; contact_str = fixed_contact_str;
} }
switch_mutex_lock(profile->ireg_mutex);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
switch_find_local_ip(guess_ip4, sizeof(guess_ip4), NULL, AF_INET); switch_find_local_ip(guess_ip4, sizeof(guess_ip4), NULL, AF_INET);
@ -2150,7 +2116,7 @@ void event_handler(switch_event_t *event)
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Propagating registration for %s@%s->%s\n", from_user, from_host, contact_str); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Propagating registration for %s@%s->%s\n", from_user, from_host, contact_str);
} }
switch_mutex_unlock(profile->ireg_mutex);
if (profile) { if (profile) {
sofia_glue_release_profile(profile); sofia_glue_release_profile(profile);
@ -2557,6 +2523,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool); switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool); switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_queue_create(&profile->event_queue, SOFIA_QUEUE_SIZE, profile->pool);
switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name); switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
switch_sql_queue_manager_init_name(qname, switch_sql_queue_manager_init_name(qname,

View File

@ -6498,6 +6498,9 @@ switch_bool_t sofia_glue_execute_sql_callback(sofia_profile_t *profile,
switch_cache_db_release_db_handle(&dbh); switch_cache_db_release_db_handle(&dbh);
sofia_glue_fire_events(profile);
return ret; return ret;
} }
@ -7139,6 +7142,23 @@ char *sofia_glue_get_host(const char *str, switch_memory_pool_t *pool)
return s; return s;
} }
void sofia_glue_fire_events(sofia_profile_t *profile)
{
void *pop = NULL;
while (profile->event_queue && switch_queue_trypop(profile->event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
switch_event_t *event = (switch_event_t *) pop;
switch_event_fire(&event);
}
}
void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event)
{
switch_queue_push(profile->event_queue, *event);
*event = NULL;
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:

View File

@ -1027,7 +1027,7 @@ static void conference_data_event_handler(switch_event_t *event)
switch_safe_free(dup_domain); switch_safe_free(dup_domain);
} }
static void actual_sofia_presence_event_handler(switch_event_t *event) static switch_event_t *actual_sofia_presence_event_handler(switch_event_t *event)
{ {
sofia_profile_t *profile = NULL; sofia_profile_t *profile = NULL;
char *from = switch_event_get_header(event, "from"); char *from = switch_event_get_header(event, "from");
@ -1047,10 +1047,10 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
switch_console_callback_match_t *matches; switch_console_callback_match_t *matches;
struct presence_helper helper = { 0 }; struct presence_helper helper = { 0 };
int hup = 0; int hup = 0;
switch_event_t *s_event = NULL;
if (!mod_sofia_globals.running) { if (!mod_sofia_globals.running) {
return; goto done;
} }
if (zstr(proto) || !strcasecmp(proto, "any")) { if (zstr(proto) || !strcasecmp(proto, "any")) {
@ -1091,7 +1091,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
if (!mod_sofia_globals.profile_hash) { if (!mod_sofia_globals.profile_hash) {
return; goto done;
} }
if (from) { if (from) {
@ -1171,7 +1171,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
} }
switch_safe_free(sql); switch_safe_free(sql);
return; goto done;
} }
if (zstr(event_type)) { if (zstr(event_type)) {
@ -1195,7 +1195,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
} }
} else { } else {
switch_safe_free(user); switch_safe_free(user);
return; goto done;
} }
if ((euser = strchr(user, '+'))) { if ((euser = strchr(user, '+'))) {
euser++; euser++;
@ -1203,7 +1203,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
euser = user; euser = user;
} }
} else { } else {
return; goto done;
} }
switch (event->event_id) { switch (event->event_id) {
@ -1462,8 +1462,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
if (hup && dh.hits < 1) { if (hup && dh.hits < 1) {
/* so many phones get confused when whe hangup we have to reprobe to get them all to reset to absolute states so the lights stay correct */ /* so many phones get confused when whe hangup we have to reprobe to get them all to reset to absolute states so the lights stay correct */
switch_event_t *s_event;
if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_PROBE) == SWITCH_STATUS_SUCCESS) { if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_PROBE) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "proto", SOFIA_CHAT_PROTO); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "proto", SOFIA_CHAT_PROTO);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "login", profile->name); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "login", profile->name);
@ -1471,10 +1470,9 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "to", "%s@%s", euser, host); switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "to", "%s@%s", euser, host);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence"); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog"); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog");
switch_event_fire(&s_event);
} }
} }
if (!zstr((char *) helper.stream.data)) { if (!zstr((char *) helper.stream.data)) {
char *this_sql = (char *) helper.stream.data; char *this_sql = (char *) helper.stream.data;
@ -1509,11 +1507,24 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
switch_safe_free(sql); switch_safe_free(sql);
switch_safe_free(user); switch_safe_free(user);
return s_event;
} }
static int EVENT_THREAD_RUNNING = 0; static int EVENT_THREAD_RUNNING = 0;
static int EVENT_THREAD_STARTED = 0; static int EVENT_THREAD_STARTED = 0;
static void do_flush(void)
{
void *pop = NULL;
while (mod_sofia_globals.presence_queue && switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
switch_event_t *event = (switch_event_t *) pop;
switch_event_destroy(&event);
}
}
void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread, void *obj) void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread, void *obj)
{ {
void *pop; void *pop;
@ -1544,6 +1555,15 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread
break; break;
} }
if (mod_sofia_globals.presence_flush) {
switch_mutex_lock(mod_sofia_globals.mutex);
if (mod_sofia_globals.presence_flush) {
do_flush();
mod_sofia_globals.presence_flush = 0;
}
switch_mutex_unlock(mod_sofia_globals.mutex);
}
switch(event->event_id) { switch(event->event_id) {
case SWITCH_EVENT_MESSAGE_WAITING: case SWITCH_EVENT_MESSAGE_WAITING:
actual_sofia_presence_mwi_event_handler(event); actual_sofia_presence_mwi_event_handler(event);
@ -1552,7 +1572,11 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread
conference_data_event_handler(event); conference_data_event_handler(event);
break; break;
default: default:
actual_sofia_presence_event_handler(event); do {
switch_event_t *ievent = event;
event = actual_sofia_presence_event_handler(ievent);
switch_event_destroy(&ievent);
} while (event);
break; break;
} }
@ -1561,10 +1585,7 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread
} }
} }
while (switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { do_flush();
switch_event_t *event = (switch_event_t *) pop;
switch_event_destroy(&event);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n");
@ -1606,13 +1627,23 @@ void sofia_presence_event_handler(switch_event_t *event)
{ {
switch_event_t *cloned_event; switch_event_t *cloned_event;
switch_event_dup(&cloned_event, event);
switch_assert(cloned_event);
switch_queue_push(mod_sofia_globals.presence_queue, cloned_event);
if (!EVENT_THREAD_STARTED) { if (!EVENT_THREAD_STARTED) {
sofia_presence_event_thread_start(); sofia_presence_event_thread_start();
switch_yield(500000);
} }
switch_event_dup(&cloned_event, event);
switch_assert(cloned_event);
if (switch_queue_trypush(mod_sofia_globals.presence_queue, cloned_event) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Presence queue overloaded.... Flushing queue\n");
switch_mutex_lock(mod_sofia_globals.mutex);
mod_sofia_globals.presence_flush = 1;
switch_mutex_unlock(mod_sofia_globals.mutex);
switch_event_destroy(&cloned_event);
}
} }
@ -1640,7 +1671,7 @@ static int sofia_presence_sub_reg_callback(void *pArg, int argc, char **argv, ch
} }
switch_event_fire(&event); sofia_event_fire(profile, &event);
} }
return 0; return 0;
} }
@ -1653,7 +1684,7 @@ static int sofia_presence_sub_reg_callback(void *pArg, int argc, char **argv, ch
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_subtype", "probe"); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_subtype", "probe");
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto-specific-event-name", event_name); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto-specific-event-name", event_name);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "expires", expires); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "expires", expires);
switch_event_fire(&event); sofia_event_fire(profile, &event);
} }
return 0; return 0;
@ -1777,7 +1808,7 @@ static int sofia_presence_resub_callback(void *pArg, int argc, char **argv, char
} }
} }
switch_event_fire(&event); sofia_event_fire(profile, &event);
} }
switch_safe_free(free_me); switch_safe_free(free_me);

View File

@ -635,7 +635,7 @@ int sofia_reg_del_callback(void *pArg, int argc, char **argv, char **columnNames
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "contact", argv[3]); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "contact", argv[3]);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "expires", argv[6]); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "expires", argv[6]);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "user-agent", argv[7]); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "user-agent", argv[7]);
switch_event_fire(&s_event); sofia_event_fire(profile, &s_event);
} }
if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
@ -653,7 +653,7 @@ int sofia_reg_del_callback(void *pArg, int argc, char **argv, char **columnNames
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "status", "Unregistered"); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "status", "Unregistered");
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence"); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_fire(&s_event); sofia_event_fire(profile, &s_event);
} }
} }
@ -859,7 +859,6 @@ void sofia_reg_check_sync(sofia_profile_t *profile)
sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);

View File

@ -175,13 +175,8 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
if (switch_queue_trypush(l->log_queue, dnode) == SWITCH_STATUS_SUCCESS) { if (switch_queue_trypush(l->log_queue, dnode) == SWITCH_STATUS_SUCCESS) {
if (l->lost_logs) { if (l->lost_logs) {
int ll = l->lost_logs; int ll = l->lost_logs;
switch_event_t *event;
l->lost_logs = 0; l->lost_logs = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Lost %d log lines!\n", ll); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Lost %d log lines!\n", ll);
if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "info", "lost %d log lines", ll);
switch_event_fire(&event);
}
} }
} else { } else {
switch_log_node_free(&dnode); switch_log_node_free(&dnode);
@ -378,11 +373,6 @@ static void event_handler(switch_event_t *event)
int le = l->lost_events; int le = l->lost_events;
l->lost_events = 0; l->lost_events = 0;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost %d events!\n", le); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost %d events!\n", le);
clone = NULL;
if (switch_event_create(&clone, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(clone, SWITCH_STACK_BOTTOM, "info", "lost %d events", le);
switch_event_fire(&clone);
}
} }
} else { } else {
if (++l->lost_events > MAX_MISSED) { if (++l->lost_events > MAX_MISSED) {

View File

@ -472,7 +472,6 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL); switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
} }
@ -487,8 +486,8 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
} }
x = 0; x = 0;
while (x < 10000 && THREAD_COUNT) { while (x < 100 && THREAD_COUNT) {
switch_cond_next(); switch_yield(100000);
if (THREAD_COUNT == last) { if (THREAD_COUNT == last) {
x++; x++;
} }