diff --git a/libs/sofia-sip/.update b/libs/sofia-sip/.update index a02df03209..ade22984f2 100644 --- a/libs/sofia-sip/.update +++ b/libs/sofia-sip/.update @@ -1 +1 @@ -Thu Apr 26 10:23:33 CDT 2012 +Thu May 3 16:30:20 CDT 2012 diff --git a/libs/sofia-sip/libsofia-sip-ua/nta/nta.c b/libs/sofia-sip/libsofia-sip-ua/nta/nta.c index 5dd574b88f..bebe74262f 100644 --- a/libs/sofia-sip/libsofia-sip-ua/nta/nta.c +++ b/libs/sofia-sip/libsofia-sip-ua/nta/nta.c @@ -4165,7 +4165,7 @@ nta_leg_t *nta_leg_tcreate(nta_agent_t *agent, if (i == NONE) /* Magic value, used for compatibility */ no_dialog = 1; - if (!(leg = su_home_clone(agent->sa_home, sizeof(*leg)))) + if (!(leg = su_home_clone(NULL, sizeof(*leg)))) return NULL; home = leg->leg_home; @@ -4394,7 +4394,8 @@ void nta_leg_destroy(nta_leg_t *leg) static void leg_free(nta_agent_t *sa, nta_leg_t *leg) { - su_free(sa->sa_home, leg); + //su_free(sa->sa_home, leg); + su_home_unref((su_home_t *)leg); } /** Return application context for the leg */ @@ -5327,7 +5328,7 @@ nta_incoming_t *incoming_create(nta_agent_t *agent, } irq->irq_branch = sip->sip_via->v_branch; irq->irq_reliable_tp = tport_is_reliable(tport); - irq->irq_extra_100 = 1; /* Sending extra 100 trying true by default */ + irq->irq_extra_100 = 0; /* Sending extra 100 trying false by default */ if (sip->sip_timestamp) irq->irq_timestamp = sip_timestamp_copy(home, sip->sip_timestamp); @@ -6068,7 +6069,7 @@ incoming_recv(nta_incoming_t *irq, msg_t *msg, sip_t *sip, tport_t *tport) if (irq->irq_status >= 100) { SU_DEBUG_5(("nta: re-received %s request, retransmitting %u reply\n", sip->sip_request->rq_method_name, irq->irq_status)); - incoming_retransmit_reply(irq, tport); + incoming_retransmit_reply(irq, tport); } else if (irq->irq_agent->sa_extra_100 && irq->irq_extra_100) { @@ -6910,6 +6911,7 @@ _nta_incoming_timer(nta_agent_t *sa) incoming_reset_timer(irq); if(irq->irq_extra_100) { + printf("COCK FACE\n"); SU_DEBUG_5(("nta: timer N1 fired, sending %u %s\n", SIP_100_TRYING)); nta_incoming_treply(irq, SIP_100_TRYING, TAG_END()); } diff --git a/libs/sofia-sip/libsofia-sip-ua/nua/nua_common.c b/libs/sofia-sip/libsofia-sip-ua/nua/nua_common.c index bc842c242d..661a18b1d2 100644 --- a/libs/sofia-sip/libsofia-sip-ua/nua/nua_common.c +++ b/libs/sofia-sip/libsofia-sip-ua/nua/nua_common.c @@ -109,8 +109,8 @@ nua_handle_t *nh_create_handle(nua_t *nua, assert(nua->nua_home); - if ((nh = su_home_clone(nua->nua_home, sizeof(*nh)))) { - //if ((nh = su_home_new(sizeof(*nh)))) { + //if ((nh = su_home_clone(nua->nua_home, sizeof(*nh)))) { + if ((nh = su_home_new(sizeof(*nh)))) { nh->nh_valid = nua_valid_handle_cookie; nh->nh_nua = nua; nh->nh_magic = hmagic; diff --git a/libs/sofia-sip/libsofia-sip-ua/nua/nua_server.c b/libs/sofia-sip/libsofia-sip-ua/nua/nua_server.c index 50150a3a42..f27c83fb92 100644 --- a/libs/sofia-sip/libsofia-sip-ua/nua/nua_server.c +++ b/libs/sofia-sip/libsofia-sip-ua/nua/nua_server.c @@ -261,11 +261,12 @@ int nua_stack_process_request(nua_handle_t *nh, } if (sr->sr_status <= 100) { - SR_STATUS1(sr, SIP_100_TRYING); + SR_STATUS1(sr, SIP_100_TRYING); if (method == sip_method_invite || sip->sip_timestamp) { - nta_incoming_treply(irq, SIP_100_TRYING, - SIPTAG_USER_AGENT_STR(user_agent), - TAG_END()); + nta_incoming_treply(irq, SIP_100_TRYING, + SIPTAG_USER_AGENT_STR(user_agent), + TAG_END()); + } } else { diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index 3780886287..ea2ba7ce4d 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -91,6 +91,10 @@ static switch_status_t sofia_on_init(switch_core_session_t *session) sofia_set_flag(tech_pvt, TFLAG_RECOVERED); } + if (switch_channel_direction(tech_pvt->channel) == SWITCH_CALL_DIRECTION_INBOUND) { + nua_respond(tech_pvt->nh, 101, "Dialing", TAG_END()); + } + if (sofia_test_flag(tech_pvt, TFLAG_OUTBOUND) || sofia_test_flag(tech_pvt, TFLAG_RECOVERING)) { const char *var; @@ -5343,7 +5347,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load) switch_chat_interface_t *chat_interface; switch_api_interface_t *api_interface; switch_management_interface_t *management_interface; - uint32_t cpus = switch_core_cpu_count(); struct in_addr in; memset(&mod_sofia_globals, 0, sizeof(mod_sofia_globals)); @@ -5381,9 +5384,16 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for profiles to start\n"); switch_yield(1500000); - /* start one message thread per cpu */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Starting %u message threads.\n", cpus); - sofia_msg_thread_start(cpus); + mod_sofia_globals.cpu_count = switch_core_cpu_count(); + mod_sofia_globals.max_msg_queues = mod_sofia_globals.cpu_count + 1; + + if (mod_sofia_globals.max_msg_queues > SOFIA_MAX_MSG_QUEUE) { + mod_sofia_globals.max_msg_queues = SOFIA_MAX_MSG_QUEUE; + } + + /* start one message thread */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Starting initial message thread.\n"); + sofia_msg_thread_start(0); if (switch_event_bind_removable(modname, SWITCH_EVENT_CUSTOM, MULTICAST_EVENT, event_handler, NULL, &mod_sofia_globals.custom_node) != SWITCH_STATUS_SUCCESS) { diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index 8052d94539..64dfc48c8c 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -336,8 +336,8 @@ typedef enum { TFLAG_MAX } TFLAGS; -#define SOFIA_MAX_MSG_QUEUE 101 -#define SOFIA_MSG_QUEUE_SIZE 5000 +#define SOFIA_MAX_MSG_QUEUE 64 +#define SOFIA_MSG_QUEUE_SIZE 100 struct mod_sofia_globals { switch_memory_pool_t *pool; @@ -347,6 +347,8 @@ struct mod_sofia_globals { uint32_t callid; int32_t running; int32_t threads; + int cpu_count; + int max_msg_queues; switch_mutex_t *mutex; char guess_ip[80]; char hostname[512]; diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 1dd6646a32..eb50fda2f2 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1236,8 +1236,25 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj { void *pop; switch_queue_t *q = (switch_queue_t *) obj; + int my_id; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread Started\n"); + for (my_id = 0; my_id < mod_sofia_globals.msg_queue_len; my_id++) { + if (mod_sofia_globals.msg_queue[my_id] == q) { + break; + } + } + + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id); + +#ifdef HAVE_CPU_SET_MACROS + { + cpu_set_t set; + CPU_ZERO(&set); + CPU_SET(my_id, &set); + sched_setaffinity(0, sizeof(set), &set); + } +#endif while(switch_queue_pop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) { @@ -1251,12 +1268,11 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj return NULL; } -static int IDX = 0; - void sofia_msg_thread_start(int idx) { - if (idx >= SOFIA_MAX_MSG_QUEUE || (idx < mod_sofia_globals.msg_queue_len && mod_sofia_globals.msg_queue_thread[idx])) { + if (idx >= mod_sofia_globals.max_msg_queues || + idx >= SOFIA_MAX_MSG_QUEUE || (idx < mod_sofia_globals.msg_queue_len && mod_sofia_globals.msg_queue_thread[idx])) { return; } @@ -1274,7 +1290,7 @@ void sofia_msg_thread_start(int idx) switch_threadattr_create(&thd_attr, mod_sofia_globals.pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - //switch_threadattr_priority_increase(thd_attr); + switch_threadattr_priority_increase(thd_attr); switch_thread_create(&mod_sofia_globals.msg_queue_thread[i], thd_attr, sofia_msg_thread_run, @@ -1290,31 +1306,32 @@ void sofia_msg_thread_start(int idx) static void sofia_queue_message(sofia_dispatch_event_t *de) { - int idx = 0; + int idx = 0, queued = 0; - if (mod_sofia_globals.running == 0) { + if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue[0]) { sofia_process_dispatch_event(&de); return; } again: - switch_mutex_lock(mod_sofia_globals.mutex); - idx = IDX; - IDX++; - if (IDX >= mod_sofia_globals.msg_queue_len) IDX = 0; - switch_mutex_unlock(mod_sofia_globals.mutex); - - sofia_msg_thread_start(idx); - - if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) != SWITCH_STATUS_SUCCESS) { - if (mod_sofia_globals.msg_queue_len < SOFIA_MAX_MSG_QUEUE) { - sofia_msg_thread_start(idx + 1); - goto again; - } else { - switch_queue_push(mod_sofia_globals.msg_queue[idx], de); + for (idx = 0; idx < mod_sofia_globals.msg_queue_len; idx++) { + if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) == SWITCH_STATUS_SUCCESS) { + queued++; + break; } } + + if (!queued) { + + if (mod_sofia_globals.msg_queue_len < mod_sofia_globals.max_msg_queues) { + sofia_msg_thread_start(mod_sofia_globals.msg_queue_len + 1); + goto again; + } + + switch_queue_push(mod_sofia_globals.msg_queue[0], de); + } + } @@ -1959,6 +1976,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void NUTAG_AUTOACK(0), NUTAG_AUTOALERT(0), NUTAG_ENABLEMESSENGER(1), + NTATAG_EXTRA_100(0), TAG_IF((profile->mflags & MFLAG_REGISTER), NUTAG_ALLOW("REGISTER")), TAG_IF((profile->mflags & MFLAG_REFER), NUTAG_ALLOW("REFER")), TAG_IF(!sofia_test_pflag(profile, PFLAG_DISABLE_100REL), NUTAG_ALLOW("PRACK")), @@ -3652,18 +3670,6 @@ switch_status_t config_sofia(int reload, char *profile_name) mod_sofia_globals.debug_sla = atoi(val); } else if (!strcasecmp(var, "auto-restart")) { mod_sofia_globals.auto_restart = switch_true(val); - } else if (!strcasecmp(var, "message-threads")) { - int num = atoi(val); - - if (num < 1 || num > SOFIA_MAX_MSG_QUEUE - 1) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "message-threads must be between 1 and %d", SOFIA_MAX_MSG_QUEUE -1); - } - - if (num < 1) num = 1; - if (num > SOFIA_MAX_MSG_QUEUE - 1) num = SOFIA_MAX_MSG_QUEUE -1; - - sofia_msg_thread_start(num); - } else if (!strcasecmp(var, "reg-deny-binding-fetch-and-no-lookup")) { /* backwards compatibility */ mod_sofia_globals.reg_deny_binding_fetch_and_no_lookup = switch_true(val); /* remove when noone complains about the extra lookup */ if (switch_true(val)) { @@ -7521,7 +7527,7 @@ void sofia_handle_sip_i_invite(nua_t *nua, sofia_profile_t *profile, nua_handle_ nua_respond(nh, 400, "Missing Contact Header", TAG_END()); goto fail; } - + sofia_glue_get_addr(de->data->e_msg, network_ip, sizeof(network_ip), &network_port); if (sofia_test_pflag(profile, PFLAG_AGGRESSIVE_NAT_DETECTION)) { diff --git a/src/switch_core.c b/src/switch_core.c index adb9338f68..8b93c84301 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1684,7 +1684,7 @@ static void switch_load_core_config(const char *file) } else if (end_of(val) == 'm') { tmp *= (1024 * 1024); } - + if (tmp >= 32000 && tmp < 10500000) { runtime.sql_buffer_len = tmp; } else { diff --git a/src/switch_event.c b/src/switch_event.c index 6595f61191..2203313191 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -63,7 +63,7 @@ struct switch_event_subclass { int bind; }; -#define MAX_DISPATCH_VAL 20 +#define MAX_DISPATCH_VAL 64 static unsigned int MAX_DISPATCH = MAX_DISPATCH_VAL; static unsigned int SOFT_MAX_DISPATCH = 0; static char guess_ip_v4[80] = ""; @@ -254,6 +254,16 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1; switch_mutex_unlock(EVENT_QUEUE_MUTEX); +#ifdef HAVE_CPU_SET_MACROS + { + cpu_set_t set; + CPU_ZERO(&set); + CPU_SET(my_id, &set); + sched_setaffinity(0, sizeof(set), &set); + } +#endif + + for (;;) { void *pop = NULL; switch_event_t *event = NULL; @@ -291,7 +301,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi switch_queue_t *queue = (switch_queue_t *) obj; uint32_t index = 0; int my_id = 0; - int auto_pause = 0; switch_mutex_lock(EVENT_QUEUE_MUTEX); THREAD_COUNT++; @@ -306,15 +315,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi for (;;) { void *pop = NULL; switch_event_t *event = NULL; - int loops = 0; - - if (auto_pause) { - if (!--auto_pause) { - switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); - } else { - switch_cond_next(); - } - } if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { break; @@ -332,19 +332,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi while (event) { - - if (++loops > 2) { - if (auto_pause) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event system *still* overloading.\n"); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "Event system overloading. Taking a 10 second break\n"); - auto_pause = 10; - switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); - } - switch_yield(1000000); - } - for (index = 0; index < SOFT_MAX_DISPATCH; index++) { if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) { event = NULL; @@ -358,8 +345,8 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); switch_mutex_unlock(EVENT_QUEUE_MUTEX); } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of event dispatch threads! Slowing things down.\n"); - switch_yield(1000000); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of event dispatch threads! Resorting to a blocking push.... Look for laggy event consumers or event_socket connections!\n"); + switch_queue_push(EVENT_DISPATCH_QUEUE[0], event); } } } @@ -484,6 +471,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_reserve_subclass_detailed(const cha SWITCH_DECLARE(void) switch_core_memory_reclaim_events(void) { + int index; + + for (index = 0; index < SOFT_MAX_DISPATCH; index++) { + if (EVENT_DISPATCH_QUEUE[index]) { + printf("%d size: %u\n", index, switch_queue_size(EVENT_DISPATCH_QUEUE[index])); + } + } + + #ifdef SWITCH_EVENT_RECYCLE void *pop; @@ -520,9 +516,11 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) switch_mutex_unlock(EVENT_QUEUE_MUTEX); for (x = 0; x < 3; x++) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x); - switch_queue_trypush(EVENT_QUEUE[x], NULL); - switch_queue_interrupt_all(EVENT_QUEUE[x]); + if (EVENT_QUEUE[x]) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x); + switch_queue_trypush(EVENT_QUEUE[x], NULL); + switch_queue_interrupt_all(EVENT_QUEUE[x]); + } } for (x = 0; x < SOFT_MAX_DISPATCH; x++) { @@ -558,12 +556,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) switch_event_t *event = NULL; switch_status_t st; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x); - switch_thread_join(&st, EVENT_QUEUE_THREADS[x]); + if (EVENT_QUEUE_THREADS[x]) { - while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { - event = (switch_event_t *) pop; - switch_event_destroy(&event); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x); + switch_thread_join(&st, EVENT_QUEUE_THREADS[x]); + + while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + event = (switch_event_t *) pop; + switch_event_destroy(&event); + } } } @@ -608,7 +609,11 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t switch_threadattr_priority_increase(thd_attr); switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool); while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index); + if (index == 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create additional event dispatch thread %d\n", index); + } launched++; break; } @@ -618,7 +623,7 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) { - switch_threadattr_t *thd_attr;; + //switch_threadattr_t *thd_attr; /* This statement doesn't do anything commenting it out for now. @@ -626,6 +631,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) switch_assert(switch_arraylen(EVENT_NAMES) == SWITCH_EVENT_ALL + 1); */ + /* don't need any more dispatch threads than we have CPU's*/ + MAX_DISPATCH = switch_core_cpu_count(); + switch_assert(pool != NULL); THRUNTIME_POOL = RUNTIME_POOL = pool; @@ -640,26 +648,26 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) SYSTEM_RUNNING = -1; switch_mutex_unlock(EVENT_QUEUE_MUTEX); - switch_threadattr_create(&thd_attr, pool); + //switch_threadattr_create(&thd_attr, pool); switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), NULL, AF_INET); switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), NULL, AF_INET6); - switch_queue_create(&EVENT_QUEUE[0], POOL_COUNT_MAX + 10, THRUNTIME_POOL); - switch_queue_create(&EVENT_QUEUE[1], POOL_COUNT_MAX + 10, THRUNTIME_POOL); - switch_queue_create(&EVENT_QUEUE[2], POOL_COUNT_MAX + 10, THRUNTIME_POOL); + //switch_queue_create(&EVENT_QUEUE[0], POOL_COUNT_MAX + 10, THRUNTIME_POOL); + //switch_queue_create(&EVENT_QUEUE[1], POOL_COUNT_MAX + 10, THRUNTIME_POOL); + //switch_queue_create(&EVENT_QUEUE[2], POOL_COUNT_MAX + 10, THRUNTIME_POOL); #ifdef SWITCH_EVENT_RECYCLE switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL); switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL); #endif - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_threadattr_priority_increase(thd_attr); + //switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + //switch_threadattr_priority_increase(thd_attr); launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); - switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); - switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); - switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); + //switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); + //switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); + //switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); while (!THREAD_COUNT) { switch_cond_next(); @@ -1775,8 +1783,6 @@ SWITCH_DECLARE(void) switch_event_prep_for_delivery_detailed(const char *file, c SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, const char *func, int line, switch_event_t **event, void *user_data) { - int index; - switch_assert(BLOCK != NULL); switch_assert(RUNTIME_POOL != NULL); switch_assert(EVENT_QUEUE_MUTEX != NULL); @@ -1792,13 +1798,22 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con (*event)->event_user_data = user_data; } + if (!EVENT_QUEUE_THREADS[(*event)->priority] && (*event)->priority < 3) { + switch_threadattr_t *thd_attr; + + switch_queue_create(&EVENT_QUEUE[(*event)->priority], POOL_COUNT_MAX + 10, THRUNTIME_POOL); + switch_threadattr_create(&thd_attr, THRUNTIME_POOL); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_threadattr_priority_increase(thd_attr); + switch_thread_create(&EVENT_QUEUE_THREADS[(*event)->priority], thd_attr, switch_event_thread, EVENT_QUEUE[(*event)->priority], RUNTIME_POOL); + } + for (;;) { - for (index = (*event)->priority; index < 3; index++) { - if (switch_queue_trypush(EVENT_QUEUE[index], *event) == SWITCH_STATUS_SUCCESS) { - goto end; - } + if (switch_queue_trypush(EVENT_QUEUE[(*event)->priority], *event) == SWITCH_STATUS_SUCCESS) { + goto end; } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event queue is full!\n"); switch_yield(100000); } diff --git a/src/switch_time.c b/src/switch_time.c index 9c9ce509ec..17a0a50675 100644 --- a/src/switch_time.c +++ b/src/switch_time.c @@ -799,8 +799,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime) if (runtime.timer_affinity > -1) { cpu_set_t set; CPU_ZERO(&set); - CPU_SET(0, &set); - sched_setaffinity(runtime.timer_affinity, sizeof(set), &set); + CPU_SET(runtime.timer_affinity, &set); + sched_setaffinity(0, sizeof(set), &set); } #endif