diff --git a/src/include/switch_core.h b/src/include/switch_core.h index a3898d9d64..f647066c38 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -70,6 +70,12 @@ typedef struct switch_hold_record_s { } switch_hold_record_t; +typedef struct switch_thread_data_s { + switch_thread_start_t func; + void *obj; + int alloc; +} switch_thread_data_t; + #define MESSAGE_STAMP_FFL(_m) _m->_file = __FILE__; _m->_func = __SWITCH_FUNC__; _m->_line = __LINE__ @@ -703,6 +709,7 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_by_name(_In_ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session); +SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp); SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session); /*! @@ -2418,14 +2425,18 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session); SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name); +SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup); SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup); SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp); -SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp, - uint32_t numq, const char *dsn, - const char *pre_trans_execute, - const char *post_trans_execute, - const char *inner_pre_trans_execute, - const char *inner_post_trans_execute); +SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name, + switch_sql_queue_manager_t **qmp, + uint32_t numq, const char *dsn, + const char *pre_trans_execute, + const char *post_trans_execute, + const char *inner_pre_trans_execute, + const char *inner_post_trans_execute); + +#define switch_switch_sql_queue_manager_init(_q, _d, _p1, _p2, _ip1, _ip2) switch_switch_sql_queue_manager_init_name(__FILE__, _q, _d, _p1, _p2, _ip1, _ip2) SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm); diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index 14d501489c..898b5e49ed 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -4900,8 +4900,7 @@ static switch_call_cause_t sofia_outgoing_channel(switch_core_session_t *session sql = switch_mprintf("insert into sip_dialogs (uuid,presence_id,presence_data,profile_name,hostname,rcd,call_info_state) " "values ('%q', '%q', '%q', '%q', '%q', %ld, '')", switch_core_session_get_uuid(nsession), switch_str_nil(presence_id), switch_str_nil(presence_data), profile->name, mod_sofia_globals.hostname, (long) now); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); } #endif diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index ecb2d1c149..f01a121608 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -241,7 +241,7 @@ typedef enum { PFLAG_DISABLE_HOLD, PFLAG_AUTO_NAT, PFLAG_SIPCOMPACT, - PFLAG_SQL_IN_TRANS, + PFLAG_USE_ME, PFLAG_PRESENCE_PRIVACY, PFLAG_PASS_CALLEE_ID, PFLAG_LOG_AUTH_FAIL, @@ -273,6 +273,7 @@ typedef enum { PFLAG_MWI_USE_REG_CALLID, PFLAG_FIRE_MESSAGE_EVENTS, PFLAG_SEND_DISPLAY_UPDATE, + PFLAG_RUNNING_TRANS, /* No new flags below this line */ PFLAG_MAX } PFLAGS; @@ -632,7 +633,7 @@ struct sofia_profile { char *post_trans_execute; char *inner_pre_trans_execute; char *inner_post_trans_execute; - switch_queue_t *sql_queue; + switch_sql_queue_manager_t *qm; char *acl[SOFIA_MAX_ACL]; char *acl_pass_context[SOFIA_MAX_ACL]; char *acl_fail_context[SOFIA_MAX_ACL]; diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 979e185781..d463345f7c 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1520,6 +1520,28 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run_once(switch_thread_t *thread, void return NULL; } +void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep) +{ + sofia_dispatch_event_t *de = *dep; + switch_memory_pool_t *pool; + sofia_profile_t *profile = (*dep)->profile; + switch_thread_data_t *td; + + switch_core_new_memory_pool(&pool); + + *dep = NULL; + de->pool = pool; + + td = switch_core_alloc(pool, sizeof(*td)); + td->func = sofia_msg_thread_run_once; + td->obj = de; + + switch_mutex_lock(profile->ireg_mutex); + switch_thread_pool_launch_thread(&td); + switch_mutex_unlock(profile->ireg_mutex); +} + +#if 0 void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep) { sofia_dispatch_event_t *de = *dep; @@ -1551,6 +1573,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep) sofia_process_dispatch_event(&de); } } +#endif void sofia_process_dispatch_event(sofia_dispatch_event_t **dep) { @@ -2158,153 +2181,63 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread sofia_profile_t *profile = (sofia_profile_t *) obj; uint32_t ireg_loops = profile->ireg_seconds; /* Number of loop iterations done when we haven't checked for registrations */ uint32_t gateway_loops = GATEWAY_SECONDS; /* Number of loop iterations done when we haven't checked for gateways */ - void *pop = NULL; /* queue_pop placeholder */ - switch_size_t sql_len = 1024 * 32; /* length of sqlbuf */ - char *tmp, *sqlbuf = NULL; /* Buffer for SQL statements */ - char *sql = NULL; /* Current SQL statement */ - switch_time_t last_commit; /* Last time we committed stuff to the DB */ - switch_time_t last_check; /* Last time we did the second-resolution loop that checks various stuff */ - switch_size_t len = 0; /* Current length of sqlbuf */ - uint32_t statements = 0; /* Number of statements in the current sql buffer */ - - last_commit = last_check = switch_micro_time_now(); - - if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { - sqlbuf = (char *) malloc(sql_len); - } sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING); - switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool); - - /* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */ - while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) { - - if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { - /* Do we have enough statements or is the timeout expired */ - while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 && - switch_micro_time_now() - last_check < 1000000 && - (statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) { + while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING))) { + + if (profile->watchdog_enabled) { + uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0; + + if (profile->step_timeout) { + step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000); - switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000; - - if (sleepy_time < 1000 || sleepy_time > 1000000) { - sleepy_time = 1000; - } - - if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) { - switch_size_t newlen; - - if (!sql) sql = (char *) pop; - - newlen = strlen(sql) + 2 /* strlen(";\n") */ ; - - if (len + newlen + 10 > sql_len) { - switch_size_t new_mlen = len + newlen + 10 + 10240; - - if (new_mlen < SQLLEN) { - sql_len = new_mlen; - - if (!(tmp = realloc(sqlbuf, sql_len))) { - abort(); - break; - } - sqlbuf = tmp; - } else { - break; - } - } - - sprintf(sqlbuf + len, "%s;\n", sql); - len += newlen; - free(sql); - sql = NULL; - - statements++; + if (step_diff > profile->step_timeout) { + step_fail = 1; } } - /* Execute here */ - last_commit = switch_micro_time_now(); - - if (len) { - //printf("TRANS:\n%s\n", sqlbuf); - switch_mutex_lock(profile->ireg_mutex); - sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL); - //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL); - switch_mutex_unlock(profile->ireg_mutex); - statements = 0; - len = 0; + if (profile->event_timeout) { + event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000); + + if (event_diff > profile->event_timeout) { + event_fail = 1; + } } - - } else { - if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) { - sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex); - free(pop); + + if (step_fail && profile->event_timeout && !event_fail) { + step_fail = 0; + } + + if (event_fail || step_fail) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n" + "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name); + switch_yield(2000000); + watchdog_triggered_abort(); } } - if (switch_micro_time_now() - last_check >= 1000000) { - if (profile->watchdog_enabled) { - uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0; - - if (profile->step_timeout) { - step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000); - if (step_diff > profile->step_timeout) { - step_fail = 1; - } - } - - if (profile->event_timeout) { - event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000); - - if (event_diff > profile->event_timeout) { - event_fail = 1; - } - } - - if (step_fail && profile->event_timeout && !event_fail) { - step_fail = 0; - } - - if (event_fail || step_fail) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n" - "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name); - switch_yield(2000000); - watchdog_triggered_abort(); - } - } - - - if (!sofia_test_pflag(profile, PFLAG_STANDBY)) { - if (++ireg_loops >= IREG_SECONDS) { - time_t now = switch_epoch_time_now(NULL); - sofia_reg_check_expire(profile, now, 0); - ireg_loops = 0; - } - - if (++gateway_loops >= GATEWAY_SECONDS) { - sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL)); - gateway_loops = 0; - } - - sofia_sub_check_gateway(profile, time(NULL)); + if (!sofia_test_pflag(profile, PFLAG_STANDBY)) { + if (++ireg_loops >= IREG_SECONDS) { + time_t now = switch_epoch_time_now(NULL); + sofia_reg_check_expire(profile, now, 0); + ireg_loops = 0; } - last_check = switch_micro_time_now(); + if (++gateway_loops >= GATEWAY_SECONDS) { + sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL)); + gateway_loops = 0; + } + + sofia_sub_check_gateway(profile, time(NULL)); } - } - switch_mutex_lock(profile->ireg_mutex); - while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { - sofia_glue_actually_execute_sql(profile, (char *) pop, NULL); - free(pop); + switch_yield(1000000); + } - switch_mutex_unlock(profile->ireg_mutex); sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING); - switch_safe_free(sqlbuf); return NULL; } @@ -2409,6 +2342,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void int sanity; switch_thread_t *worker_thread; switch_status_t st; + char qname [128] = ""; switch_mutex_lock(mod_sofia_globals.mutex); mod_sofia_globals.threads++; @@ -2596,6 +2530,17 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool); switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool); + switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name); + switch_switch_sql_queue_manager_init_name(qname, + &profile->qm, + 1, + profile->odbc_dsn ? profile->odbc_dsn : profile->dbname, + profile->pre_trans_execute, + profile->post_trans_execute, + profile->inner_pre_trans_execute, + profile->inner_post_trans_execute); + switch_switch_sql_queue_manager_start(profile->qm); + if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) { switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s", (sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : ""); @@ -2682,6 +2627,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void switch_mutex_lock(profile->flag_mutex); switch_mutex_unlock(profile->flag_mutex); + switch_switch_sql_queue_manager_stop(profile->qm); + if (switch_event_create(&s_event, SWITCH_EVENT_UNPUBLISH) == SWITCH_STATUS_SUCCESS) { switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s", (sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : ""); @@ -4405,7 +4352,6 @@ switch_status_t config_sofia(int reload, char *profile_name) sofia_set_pflag(profile, PFLAG_SEND_DISPLAY_UPDATE); sofia_set_pflag(profile, PFLAG_MESSAGE_QUERY_ON_FIRST_REGISTER); //sofia_set_pflag(profile, PFLAG_PRESENCE_ON_FIRST_REGISTER); - sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS); profile->shutdown_type = "false"; profile->local_network = "localnet.auto"; @@ -5107,20 +5053,6 @@ switch_status_t config_sofia(int reload, char *profile_name) } else { sofia_clear_pflag(profile, PFLAG_PASS_CALLEE_ID); } - } else if (!strcasecmp(var, "sql-in-transactions")) { - int tmp = atoi(val); - - if (switch_true(val)) { - tmp = 500; - } - - if (tmp > 0) { - profile->trans_timeout = tmp; - sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS); - } else { - sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS); - } - } else if (!strcasecmp(var, "enable-soa")) { if (switch_true(val)) { sofia_set_flag(profile, TFLAG_ENABLE_SOA); @@ -6102,8 +6034,7 @@ static void sofia_handle_sip_r_invite(switch_core_session_t *session, int status switch_str_nil(presence_id), switch_str_nil(presence_data), switch_str_nil(p), (long) now); switch_assert(sql); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); } } else if (status == 200 && (profile->pres_type)) { @@ -9406,9 +9337,7 @@ void sofia_handle_sip_i_invite(switch_core_session_t *session, nua_t *nua, sofia switch_assert(sql); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); - + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); } if (is_nat) { diff --git a/src/mod/endpoints/mod_sofia/sofia_glue.c b/src/mod/endpoints/mod_sofia/sofia_glue.c index 10ce4deba1..0117612f6a 100644 --- a/src/mod/endpoints/mod_sofia/sofia_glue.c +++ b/src/mod/endpoints/mod_sofia/sofia_glue.c @@ -6269,7 +6269,8 @@ int sofia_glue_init_sql(sofia_profile_t *profile) }; switch_cache_db_handle_t *dbh = sofia_glue_get_db_handle(profile); - + char *test2; + if (!dbh) { return 0; } @@ -6283,20 +6284,22 @@ int sofia_glue_init_sql(sofia_profile_t *profile) switch_cache_db_test_reactive(dbh, test_sql, "drop table sip_registrations", reg_sql); - - - if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { - char *test2 = switch_mprintf("%s;%s", test_sql, test_sql); + + test2 = switch_mprintf("%s;%s", test_sql, test_sql); - if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "GREAT SCOTT!!! Cannot execute batched statements!\n" - "If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n"); - sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS); - - } + if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "GREAT SCOTT!!! Cannot execute batched statements!\n" + "If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n"); + + switch_cache_db_release_db_handle(&dbh); free(test2); + free(test_sql); + return 0; } + free(test2); + + free(test_sql); test_sql = switch_mprintf("delete from sip_subscriptions where hostname='%q' and full_to='XXX'", mod_sofia_globals.hostname); @@ -6346,45 +6349,31 @@ int sofia_glue_init_sql(sofia_profile_t *profile) void sofia_glue_execute_sql(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic) { - switch_status_t status = SWITCH_STATUS_FALSE; - char *d_sql = NULL, *sql; + char *sql; switch_assert(sqlp && *sqlp); - sql = *sqlp; + sql = *sqlp; - if (profile->sql_queue) { - if (sql_already_dynamic) { - d_sql = sql; - } else { - d_sql = strdup(sql); - } - - switch_assert(d_sql); - if ((status = switch_queue_trypush(profile->sql_queue, d_sql)) == SWITCH_STATUS_SUCCESS) { - d_sql = NULL; - } - } else if (sql_already_dynamic) { - d_sql = sql; - } - - if (status != SWITCH_STATUS_SUCCESS) { - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - } - - switch_safe_free(d_sql); + switch_switch_sql_queue_manager_push(profile->qm, sql, 0, !sql_already_dynamic); if (sql_already_dynamic) { *sqlp = NULL; } } + void sofia_glue_execute_sql_now(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic) { - sofia_glue_actually_execute_sql(profile, *sqlp, profile->ireg_mutex); + char *sql; + + switch_assert(sqlp && *sqlp); + sql = *sqlp; + + switch_switch_sql_queue_manager_push_confirm(profile->qm, sql, 0, !sql_already_dynamic); + if (sql_already_dynamic) { - switch_safe_free(*sqlp); + *sqlp = NULL; } - *sqlp = NULL; } diff --git a/src/mod/endpoints/mod_sofia/sofia_presence.c b/src/mod/endpoints/mod_sofia/sofia_presence.c index f03584f1c3..42fefe3af8 100644 --- a/src/mod/endpoints/mod_sofia/sofia_presence.c +++ b/src/mod/endpoints/mod_sofia/sofia_presence.c @@ -3619,9 +3619,7 @@ void sofia_presence_handle_sip_i_subscribe(int status, } switch_assert(sql != NULL); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); - + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); sstr = switch_mprintf("terminated;reason=noresource"); } else { @@ -4522,8 +4520,7 @@ void sofia_presence_check_subscriptions(sofia_profile_t *profile, time_t now) "sub del sql: %s\n", sql); } - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); } } diff --git a/src/mod/endpoints/mod_sofia/sofia_reg.c b/src/mod/endpoints/mod_sofia/sofia_reg.c index ca2ff0f5c5..2d54f31bc4 100644 --- a/src/mod/endpoints/mod_sofia/sofia_reg.c +++ b/src/mod/endpoints/mod_sofia/sofia_reg.c @@ -695,7 +695,7 @@ void sofia_reg_expire_call_id(sofia_profile_t *profile, const char *call_id, int switch_safe_free(sql); sql = switch_mprintf("delete from sip_registrations where call_id='%q' %s", call_id, sqlextra); - sofia_glue_execute_sql_now(profile, &sql, SWITCH_FALSE); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); switch_safe_free(sqlextra); switch_safe_free(sql); @@ -705,84 +705,80 @@ void sofia_reg_expire_call_id(sofia_profile_t *profile, const char *call_id, int void sofia_reg_check_expire(sofia_profile_t *profile, time_t now, int reboot) { - char sql[1024]; - - + char *sql; if (now) { - switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires" + sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires" ",user_agent,server_user,server_host,profile_name,network_ip" ",%d from sip_registrations where expires > 0 and expires <= %ld", reboot, (long) now); } else { - switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires" + sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires" ",user_agent,server_user,server_host,profile_name,network_ip" ",%d from sip_registrations where expires > 0", reboot); } sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile); if (now) { - switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'", + sql = switch_mprintf("delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'", (long) now, mod_sofia_globals.hostname); } else { - switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); + sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); } - - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); + if (now) { - switch_snprintfv(sql, sizeof(sql), "select call_id from sip_shared_appearance_dialogs where hostname='%q' " + sql = switch_mprintf("select call_id from sip_shared_appearance_dialogs where hostname='%q' " "and profile_name='%s' and expires <= %ld", mod_sofia_globals.hostname, profile->name, (long) now); sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_sla_dialog_del_callback, profile); - switch_snprintfv(sql, sizeof(sql), "delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld", + sql = switch_mprintf("delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld", mod_sofia_globals.hostname, (long) now); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); } if (now) { - switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'", + sql = switch_mprintf("delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'", (long) now, mod_sofia_globals.hostname); } else { - switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); + sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); } - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); if (now) { - switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'", + sql = switch_mprintf("delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'", (long) now, mod_sofia_globals.hostname); } else { - switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); + sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); } - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - - sofia_presence_check_subscriptions(profile, now); + sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); if (now) { - switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'", + sql = switch_mprintf("delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'", (long) now, mod_sofia_globals.hostname); } else { - switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname); + sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname); } - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); if (now) { if (sofia_test_pflag(profile, PFLAG_ALL_REG_OPTIONS_PING)) { - switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid," + sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid," "expires,user_agent,server_user,server_host,profile_name" " from sip_registrations where hostname='%s' and " "profile_name='%s'", mod_sofia_globals.hostname, profile->name); sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_nat_callback, profile); } else if (sofia_test_pflag(profile, PFLAG_NAT_OPTIONS_PING)) { - switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid," + sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid," "expires,user_agent,server_user,server_host,profile_name" " from sip_registrations where (status like '%%NAT%%' " "or contact like '%%fs_nat=yes%%') and hostname='%s' " @@ -846,37 +842,36 @@ void sofia_reg_check_call_id(sofia_profile_t *profile, const char *call_id) void sofia_reg_check_sync(sofia_profile_t *profile) { - char sql[1024]; + char *sql; - - switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires" + sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires" ",user_agent,server_user,server_host,profile_name,network_ip" " from sip_registrations where expires > 0"); sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile); - switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); - switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); - switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); - switch_snprintfv(sql, sizeof(sql), "delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sql = switch_mprintf("delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); - switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); + sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); } char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const char *host, char *val, switch_size_t len) { struct callback_t cbt = { 0 }; - char sql[512] = ""; + char *sql; if (!user) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n"); @@ -887,10 +882,10 @@ char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const c cbt.len = len; if (host) { - switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')", + sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')", user, host, host); } else { - switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user); + sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user); } @@ -908,7 +903,7 @@ char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const c switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *profile, const char *user, const char *host) { struct callback_t cbt = { 0 }; - char sql[512] = ""; + char *sql; if (!user) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n"); @@ -916,10 +911,10 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *p } if (host) { - switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')", + sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')", user, host, host); } else { - switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user); + sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user); } @@ -932,7 +927,7 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *p switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_multi(sofia_profile_t *profile, const char *user, const char *host) { struct callback_t cbt = { 0 }; - char sql[512] = ""; + char *sql; if (!user) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n"); @@ -940,10 +935,10 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_mu } if (host) { - switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')", + sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')", user, host, host); } else { - switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q'", user); + sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q'", user); } sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_find_reg_with_positive_expires_callback, &cbt); @@ -973,8 +968,7 @@ void sofia_reg_auth_challenge(sofia_profile_t *profile, nua_handle_t *nh, sofia_ (long) switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : DEFAULT_NONCE_TTL), profile->name, mod_sofia_globals.hostname); switch_assert(sql != NULL); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); auth_str = switch_mprintf("Digest realm=\"%q\", nonce=\"%q\",%s algorithm=MD5, qop=\"auth\"", realm, uuid_str, stale ? " stale=true," : ""); @@ -2802,8 +2796,7 @@ auth_res_t sofia_reg_parse_auth(sofia_profile_t *profile, switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : exptime + 10), ncl, nonce); switch_assert(sql != NULL); - sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex); - switch_safe_free(sql); + sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); if (ret == AUTH_OK) ret = AUTH_RENEWED; diff --git a/src/switch_core_session.c b/src/switch_core_session.c index 4c16fadc51..4148267da6 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -1530,25 +1530,25 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th } if (check_status == SWITCH_STATUS_SUCCESS) { - switch_core_session_t *session = (switch_core_session_t *) pop; - switch_size_t id; + switch_thread_data_t *td = (switch_thread_data_t *) pop; - if (!session) break; + if (!td) break; - id = session->id; - switch_mutex_lock(session_manager.mutex); session_manager.busy++; switch_mutex_unlock(session_manager.mutex); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing session %"SWITCH_SIZE_T_FMT" %s\n", - (long) thread, id, switch_core_session_get_name(session)); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread); - switch_core_session_thread(thread, (void *) session); + + td->func(thread, td->obj); + + if (td->alloc) { + free(td); + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing session %"SWITCH_SIZE_T_FMT"\n", - (long) thread, id); - switch_mutex_lock(session_manager.mutex); session_manager.busy--; switch_mutex_unlock(session_manager.mutex); @@ -1656,11 +1656,27 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t return NULL; } +SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + switch_thread_data_t *td; + + switch_assert(tdp); + + td = *tdp; + *tdp = NULL; + + switch_queue_push(session_manager.thread_queue, td); + check_queue(); + + return status; +} SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session) { switch_status_t status = SWITCH_STATUS_INUSE; - + switch_thread_data_t *td; + switch_mutex_lock(session->mutex); if (switch_test_flag(session, SSF_THREAD_RUNNING)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot double-launch thread!\n"); @@ -1670,7 +1686,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co status = SWITCH_STATUS_SUCCESS; switch_set_flag(session, SSF_THREAD_RUNNING); switch_set_flag(session, SSF_THREAD_STARTED); - switch_queue_push(session_manager.thread_queue, session); + td = switch_core_session_alloc(session, sizeof(*td)); + td->obj = session; + td->func = switch_core_session_thread; + switch_queue_push(session_manager.thread_queue, td); check_queue(); } switch_mutex_unlock(session->mutex); diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 80ab68c110..2ebfd5b464 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -1214,14 +1214,19 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj); struct switch_sql_queue_manager { + const char *name; switch_cache_db_handle_t *event_db; switch_queue_t **sql_queue; + uint32_t *pre_written; + uint32_t *written; + int *sizes; uint32_t numq; char *dsn; switch_thread_t *thread; int thread_running; switch_thread_cond_t *cond; switch_mutex_t *cond_mutex; + switch_mutex_t *mutex; char *pre_trans_execute; char *post_trans_execute; char *inner_pre_trans_execute; @@ -1229,12 +1234,15 @@ struct switch_sql_queue_manager { switch_memory_pool_t *pool; }; -static void qm_wake(switch_sql_queue_manager_t *qm) +static int qm_wake(switch_sql_queue_manager_t *qm) { if (switch_mutex_trylock(qm->cond_mutex) == SWITCH_STATUS_SUCCESS) { switch_thread_cond_signal(qm->cond); switch_mutex_unlock(qm->cond_mutex); + return 1; } + + return 0; } static uint32_t qm_ttl(switch_sql_queue_manager_t *qm) @@ -1335,15 +1343,59 @@ SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_ } +SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup) +{ + int want, size, x = 0, sanity = 0; + uint32_t written; + + if (!qm->thread_running) { + return SWITCH_STATUS_FALSE; + } + + if (sql_manager.thread_running != 1) { + return SWITCH_STATUS_FALSE; + } + + if (pos > qm->numq - 1) { + pos = 0; + } + + switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql); + + switch_mutex_lock(qm->mutex); + written = qm->written[pos]; + size = qm->sizes[pos]; + want = written + size; + switch_mutex_unlock(qm->mutex); + + qm_wake(qm); + + while((qm->written[pos] < want) || (qm->written[pos] >= written && want < written && qm->written[pos] > want)) { + switch_yield(5000); + + if (++x == 200) { + qm_wake(qm); + x = 0; + if (++sanity == 20) { + break; + } + } + } + + return SWITCH_STATUS_SUCCESS; +} -SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp, - uint32_t numq, const char *dsn, - const char *pre_trans_execute, - const char *post_trans_execute, - const char *inner_pre_trans_execute, - const char *inner_post_trans_execute) + + +SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name, + switch_sql_queue_manager_t **qmp, + uint32_t numq, const char *dsn, + const char *pre_trans_execute, + const char *post_trans_execute, + const char *inner_pre_trans_execute, + const char *inner_post_trans_execute) { switch_memory_pool_t *pool; switch_sql_queue_manager_t *qm; @@ -1357,11 +1409,16 @@ SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_ qm->pool = pool; qm->numq = numq; qm->dsn = switch_core_strdup(qm->pool, dsn); + qm->name = switch_core_strdup(qm->pool, name); switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool); + switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_thread_cond_create(&qm->cond, qm->pool); qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq); + qm->sizes = switch_core_alloc(qm->pool, sizeof(int) * numq); + qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq); + qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq); for (i = 0; i < qm->numq; i++) { switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool); @@ -1400,13 +1457,13 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, while (!qm->event_db) { if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db) break; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name); switch_yield(500000); sanity--; } if (!qm->event_db) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name); return NULL; } @@ -1431,14 +1488,19 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, while (qm->thread_running == 1) { int proceed = !!save_sql; + int pindex = -1; if (!proceed) { for (i = 0; i < qm->numq; i++) { if (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) { if (sql_manager.thread_running != 1) { - free(pop); - pop = NULL; + if (pop) { + switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL); + free(pop); + pop = NULL; + } } else { + pindex = i; proceed = 1; break; } @@ -1470,7 +1532,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { for (i = 0; i < qm->numq; i++) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "REALLOC QUEUE %ld %d %d\n", + "%s REALLOC QUEUE %ld %d %d\n", + qm->name, (long int)sql_len, i, switch_queue_size(qm->sql_queue[i])); @@ -1478,7 +1541,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, } } if (!(tmp = realloc(sqlbuf, sql_len))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name); abort(); break; } @@ -1487,7 +1550,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { for (i = 0; i < qm->numq; i++) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "SAVE QUEUE %d %d\n", + "%s SAVE QUEUE %d %d\n", + qm->name, i, switch_queue_size(qm->sql_queue[i])); @@ -1499,6 +1563,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, goto skip; } } + + switch_mutex_lock(qm->mutex); + qm->pre_written[pindex]++; + switch_mutex_unlock(qm->mutex); iterations++; sprintf(sqlbuf + len, "%s;\n", sql); @@ -1506,7 +1574,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, free(sql); sql = NULL; } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name); break; } } @@ -1519,14 +1587,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, auto_pause = 1; switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); auto_pause = 1; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc); } } else { if (auto_pause && lc < 1000) { auto_pause = 0; switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); auto_pause = 0; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name); } } @@ -1535,14 +1603,23 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, wrote = 0; if (trans && iterations && (iterations > target || !lc)) { + if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { + char line[128] = ""; + int l; + + switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name); + for (i = 0; i < qm->numq; i++) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "RUN QUEUE %d %d %d\n", - i, - switch_queue_size(qm->sql_queue[i]), - iterations); + l = strlen(line); + switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i])); } + + l = strlen(line); + switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line); + } if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1, qm->pre_trans_execute, @@ -1550,13 +1627,12 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, qm->inner_pre_trans_execute, qm->inner_post_trans_execute ) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name); } if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name); } - iterations = 0; trans = 0; len = 0; @@ -1572,6 +1648,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, lc = qm_ttl(qm); + switch_mutex_lock(qm->mutex); + for (i = 0; i < qm->numq; i++) { + qm->sizes[i] = switch_queue_size(qm->sql_queue[i]); + qm->written[i] += qm->pre_written[i]; + qm->pre_written[i] = 0; + } + switch_mutex_unlock(qm->mutex); + if (!lc) { switch_thread_cond_wait(qm->cond, qm->cond_mutex); } else if (wrote) { @@ -1587,7 +1671,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, for(i = 0; i < qm->numq; i++) { while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) { - switch_safe_free(pop); + if (pop) { + switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL); + free(pop); + } } }