block any inbound messages when queue is full; add debounce for mwi and pres on register; fix missing detach attr on new mode to process reg in new thread

This commit is contained in:
Anthony Minessale 2012-05-22 10:27:18 -05:00
parent 059ef54feb
commit fb790bc320
4 changed files with 63 additions and 31 deletions

View File

@ -5447,14 +5447,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load)
switch_queue_create(&mod_sofia_globals.presence_queue, SOFIA_QUEUE_SIZE, mod_sofia_globals.pool);
switch_queue_create(&mod_sofia_globals.mwi_queue, SOFIA_QUEUE_SIZE, mod_sofia_globals.pool);
if (config_sofia(0, NULL) != SWITCH_STATUS_SUCCESS) {
mod_sofia_globals.running = 0;
return SWITCH_STATUS_GENERR;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for profiles to start\n");
switch_yield(1500000);
mod_sofia_globals.cpu_count = switch_core_cpu_count();
mod_sofia_globals.max_msg_queues = (mod_sofia_globals.cpu_count / 2) + 1;
if (mod_sofia_globals.max_msg_queues < 2) {
@ -5465,10 +5457,20 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load)
mod_sofia_globals.max_msg_queues = SOFIA_MAX_MSG_QUEUE;
}
switch_queue_create(&mod_sofia_globals.msg_queue, SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues, mod_sofia_globals.pool);
/* start one message thread */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Starting initial message thread.\n");
sofia_msg_thread_start(0);
if (config_sofia(0, NULL) != SWITCH_STATUS_SUCCESS) {
mod_sofia_globals.running = 0;
return SWITCH_STATUS_GENERR;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for profiles to start\n");
switch_yield(1500000);
if (switch_event_bind_removable(modname, SWITCH_EVENT_CUSTOM, MULTICAST_EVENT, event_handler, NULL,
&mod_sofia_globals.custom_node) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");

View File

@ -613,6 +613,7 @@ struct sofia_profile {
sofia_gateway_t *gateways;
//su_home_t *home;
switch_hash_t *chat_hash;
switch_hash_t *mwi_debounce_hash;
//switch_core_db_t *master_db;
switch_thread_rwlock_t *rwlock;
switch_mutex_t *flag_mutex;

View File

@ -1307,6 +1307,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
switch_memory_pool_t *pool;
switch_thread_t *thread;
sofia_profile_t *profile = (*dep)->profile;
switch_status_t status;
switch_core_new_memory_pool(&pool);
@ -1316,14 +1317,19 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
switch_mutex_lock(profile->ireg_mutex);
switch_threadattr_create(&thd_attr, de->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread,
thd_attr,
sofia_msg_thread_run_once,
de,
de->pool);
status = switch_thread_create(&thread,
thd_attr,
sofia_msg_thread_run_once,
de,
de->pool);
switch_mutex_unlock(profile->ireg_mutex);
if (status != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot create threads!\n");
sofia_process_dispatch_event(&de);
}
}
void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
@ -1414,11 +1420,6 @@ void sofia_msg_thread_start(int idx)
int i;
mod_sofia_globals.msg_queue_len = idx + 1;
if (!mod_sofia_globals.msg_queue) {
switch_queue_create(&mod_sofia_globals.msg_queue, SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues, mod_sofia_globals.pool);
}
for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
if (!mod_sofia_globals.msg_queue_thread[i]) {
switch_threadattr_t *thd_attr = NULL;
@ -1477,8 +1478,14 @@ void sofia_event_callback(nua_event_t event,
tagi_t tags[])
{
sofia_dispatch_event_t *de;
int critical = (((SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues) * 900) / 1000);
if (switch_queue_size(mod_sofia_globals.msg_queue) > critical) {
nua_respond(nh, 503, "System Busy", SIPTAG_RETRY_AFTER_STR("300"), TAG_END());
return;
}
if (sofia_test_pflag(profile, PFLAG_STANDBY)) {
if (event < nua_r_set_params || event > nua_r_authenticate) {
nua_respond(nh, 503, "System Paused", TAG_END());
@ -1502,13 +1509,6 @@ void sofia_event_callback(nua_event_t event,
de->nua = nua_stack_ref(nua);
if (event == nua_i_invite && !sofia_private) {
int critical = (((SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues) * 900) / 1000);
if (switch_queue_size(mod_sofia_globals.msg_queue) > critical) {
nua_respond(nh, 503, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END());
return;
}
if (!(sofia_private = su_alloc(nh->nh_home, sizeof(*sofia_private)))) {
abort();
}
@ -2307,6 +2307,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
sofia_glue_del_profile(profile);
switch_core_hash_destroy(&profile->chat_hash);
switch_core_hash_destroy(&profile->mwi_debounce_hash);
switch_thread_rwlock_unlock(profile->rwlock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write unlock %s\n", profile->name);
@ -3910,6 +3911,7 @@ switch_status_t config_sofia(int reload, char *profile_name)
profile->dbname = switch_core_strdup(profile->pool, url);
switch_core_hash_init(&profile->chat_hash, profile->pool);
switch_core_hash_init(&profile->mwi_debounce_hash, profile->pool);
switch_thread_rwlock_create(&profile->rwlock, profile->pool);
switch_mutex_init(&profile->flag_mutex, SWITCH_MUTEX_NESTED, profile->pool);
profile->dtmf_duration = 100;

View File

@ -993,6 +993,30 @@ uint32_t sofia_reg_reg_count(sofia_profile_t *profile, const char *user, const c
return atoi(buf);
}
static int debounce_check(sofia_profile_t *profile, const char *user, const char *host)
{
char key[512] = "";
int r = 0;
time_t *last, now = switch_epoch_time_now(NULL);
snprintf(key, sizeof(key), "%s%s", user, host);
if ((last = switch_core_hash_find(profile->mwi_debounce_hash, key))) {
if (now - *last > 30) {
*last = now;
r = 1;
}
} else {
last = switch_core_alloc(profile->pool, sizeof(*last));
*last = now;
switch_core_hash_insert(profile->mwi_debounce_hash, key, last);
r = 1;
}
return r;
}
uint8_t sofia_reg_handle_register(nua_t *nua, sofia_profile_t *profile, nua_handle_t *nh, sip_t const *sip,
sofia_dispatch_event_t *de, sofia_regtype_t regtype, char *key,
uint32_t keylen, switch_event_t **v_event, const char *is_nat)
@ -1668,11 +1692,14 @@ uint8_t sofia_reg_handle_register(nua_t *nua, sofia_profile_t *profile, nua_hand
if (contact) {
if (exptime) {
int debounce_ok = debounce_check(profile, mwi_user, mwi_host);
switch_snprintf(exp_param, sizeof(exp_param), "expires=%ld", exptime);
sip_contact_add_param(nua_handle_home(nh), sip->sip_contact, exp_param);
if (sofia_test_pflag(profile, PFLAG_MESSAGE_QUERY_ON_REGISTER) ||
(reg_count == 1 && sofia_test_pflag(profile, PFLAG_MESSAGE_QUERY_ON_FIRST_REGISTER))) {
if ((sofia_test_pflag(profile, PFLAG_MESSAGE_QUERY_ON_REGISTER) ||
(reg_count == 1 && sofia_test_pflag(profile, PFLAG_MESSAGE_QUERY_ON_FIRST_REGISTER))) && debounce_ok) {
if (switch_event_create(&s_mwi_event, SWITCH_EVENT_MESSAGE_QUERY) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(s_mwi_event, SWITCH_STACK_BOTTOM, "Message-Account", "sip:%s@%s", mwi_user, mwi_host);
switch_event_add_header_string(s_mwi_event, SWITCH_STACK_BOTTOM, "VM-Sofia-Profile", profile->name);
@ -1680,9 +1707,9 @@ uint8_t sofia_reg_handle_register(nua_t *nua, sofia_profile_t *profile, nua_hand
}
}
if (sofia_test_pflag(profile, PFLAG_PRESENCE_ON_REGISTER) ||
if ((sofia_test_pflag(profile, PFLAG_PRESENCE_ON_REGISTER) ||
(reg_count == 1 && sofia_test_pflag(profile, PFLAG_PRESENCE_ON_FIRST_REGISTER))
|| send_pres == 1 || (reg_count == 1 && send_pres == 2)) {
|| send_pres == 1 || (reg_count == 1 && send_pres == 2)) && debounce_ok) {
if (sofia_test_pflag(profile, PFLAG_PRESENCE_PROBE_ON_REGISTER)) {
if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_PROBE) == SWITCH_STATUS_SUCCESS) {