diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 73a8fc8f17..bc8b403961 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2431,14 +2431,14 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name, - switch_sql_queue_manager_t **qmp, - uint32_t numq, const char *dsn, - const char *pre_trans_execute, - const char *post_trans_execute, - const char *inner_pre_trans_execute, - const char *inner_post_trans_execute); + switch_sql_queue_manager_t **qmp, + uint32_t numq, const char *dsn, uint32_t max_trans, + const char *pre_trans_execute, + const char *post_trans_execute, + const char *inner_pre_trans_execute, + const char *inner_post_trans_execute); -#define switch_sql_queue_manager_init(_q, _n, _d, _p1, _p2, _ip1, _ip2) switch_sql_queue_manager_init_name(__FILE__, _q, _n, _d, _p1, _p2, _ip1, _ip2) +#define switch_sql_queue_manager_init(_q, _n, _d, _m, _p1, _p2, _ip1, _ip2) switch_sql_queue_manager_init_name(__FILE__, _q, _n, _d, _m, _p1, _p2, _ip1, _ip2) SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm); diff --git a/src/include/switch_types.h b/src/include/switch_types.h index b766d1db40..25e2525e4a 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -213,6 +213,7 @@ SWITCH_BEGIN_EXTERN_C #define SWITCH_BITS_PER_BYTE 8 #define SWITCH_DEFAULT_FILE_BUFFER_LEN 65536 #define SWITCH_DTMF_LOG_LEN 1000 +#define SWITCH_MAX_TRANS 2000 typedef uint8_t switch_byte_t; /*! diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 9f467059fb..4550c5e716 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1638,7 +1638,7 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj if (pop) { sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop; sofia_process_dispatch_event(&de); - switch_os_yield(); + switch_cond_next(); } else { break; } @@ -1959,12 +1959,7 @@ void sofia_event_callback(nua_event_t event, end: - if (profile->pres_type) { - switch_cond_next(); - } else { - switch_os_yield(); - } - + switch_cond_next(); return; } @@ -2532,13 +2527,14 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name); switch_sql_queue_manager_init_name(qname, - &profile->qm, - 1, - profile->odbc_dsn ? profile->odbc_dsn : profile->dbname, - profile->pre_trans_execute, - profile->post_trans_execute, - profile->inner_pre_trans_execute, - profile->inner_post_trans_execute); + &profile->qm, + 1, + profile->odbc_dsn ? profile->odbc_dsn : profile->dbname, + SWITCH_MAX_TRANS, + profile->pre_trans_execute, + profile->post_trans_execute, + profile->inner_pre_trans_execute, + profile->inner_post_trans_execute); switch_sql_queue_manager_start(profile->qm); if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) { diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 728f650bb7..b677829edf 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -1215,6 +1215,7 @@ struct switch_sql_queue_manager { char *inner_pre_trans_execute; char *inner_post_trans_execute; switch_memory_pool_t *pool; + uint32_t max_trans; }; static int qm_wake(switch_sql_queue_manager_t *qm) @@ -1410,12 +1411,12 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name, - switch_sql_queue_manager_t **qmp, - uint32_t numq, const char *dsn, - const char *pre_trans_execute, - const char *post_trans_execute, - const char *inner_pre_trans_execute, - const char *inner_post_trans_execute) + switch_sql_queue_manager_t **qmp, + uint32_t numq, const char *dsn, uint32_t max_trans, + const char *pre_trans_execute, + const char *post_trans_execute, + const char *inner_pre_trans_execute, + const char *inner_post_trans_execute) { switch_memory_pool_t *pool; switch_sql_queue_manager_t *qm; @@ -1430,6 +1431,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n qm->numq = numq; qm->dsn = switch_core_strdup(qm->pool, dsn); qm->name = switch_core_strdup(qm->pool, name); + qm->max_trans = max_trans; switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool); @@ -1458,6 +1460,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n static uint32_t do_trans(switch_cache_db_handle_t *dbh, switch_queue_t *q, switch_mutex_t *mutex, + uint32_t max, const char *pre_trans_execute, const char *post_trans_execute, const char *inner_pre_trans_execute, @@ -1467,11 +1470,22 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh, void *pop; switch_status_t status; uint32_t ttl = 0; + switch_mutex_t *io_mutex = dbh->io_mutex; if (!switch_queue_size(q)) { return 0; } + if (io_mutex) switch_mutex_lock(io_mutex); + + if (!zstr(pre_trans_execute)) { + switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg); + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg); + free(errmsg); + } + } + switch(dbh->type) { case SCDB_TYPE_CORE_DB: { @@ -1509,7 +1523,15 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh, } - for(;;) { + if (!zstr(inner_pre_trans_execute)) { + switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg); + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg); + free(errmsg); + } + } + + while(max == 0 || ttl <= max) { if (mutex) switch_mutex_lock(mutex); status = switch_queue_trypop(q, &pop); if (mutex) switch_mutex_unlock(mutex); @@ -1524,6 +1546,15 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh, if (status != SWITCH_STATUS_SUCCESS) break; } + if (!zstr(inner_post_trans_execute)) { + switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg); + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg); + free(errmsg); + } + } + + end: switch(dbh->type) { @@ -1548,6 +1579,15 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh, } + if (!zstr(post_trans_execute)) { + switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg); + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg); + free(errmsg); + } + } + + if (io_mutex) switch_mutex_unlock(io_mutex); return ttl; } @@ -1557,7 +1597,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, uint32_t sanity = 120; switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj; - uint32_t i; + uint32_t i, countdown = 0; while (!qm->event_db) { if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db) @@ -1605,17 +1645,23 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, } for (i = 0; i < qm->numq; i++) { - uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex, - qm->pre_trans_execute, - qm->post_trans_execute, - qm->inner_pre_trans_execute, - qm->inner_post_trans_execute); + while(switch_queue_size(qm->sql_queue[i])) { + uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex, qm->max_trans, + qm->pre_trans_execute, + qm->post_trans_execute, + qm->inner_pre_trans_execute, + qm->inner_post_trans_execute); - iterations += written; + iterations += written; - switch_mutex_lock(qm->mutex); - qm->written[i] += written; - switch_mutex_unlock(qm->mutex); + switch_mutex_lock(qm->mutex); + qm->written[i] += written; + switch_mutex_unlock(qm->mutex); + + if (written < qm->max_trans) { + break; + } + } } if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { @@ -1638,12 +1684,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, check: - lc = qm_ttl(qm); + countdown = 40; - if (!lc) { - switch_thread_cond_wait(qm->cond, qm->cond_mutex); - } else if (lc < 2000) { - switch_yield(200000); + while (--countdown && (lc = qm_ttl(qm)) < qm->max_trans / 4) { + if (lc == 0) { + switch_thread_cond_wait(qm->cond, qm->cond_mutex); + break; + } + switch_yield(5000); } } @@ -3047,6 +3095,7 @@ static void switch_core_sqldb_start_thread(void) &sql_manager.qm, 4, dbname, + SWITCH_MAX_TRANS, runtime.core_db_pre_trans_execute, runtime.core_db_post_trans_execute, runtime.core_db_inner_pre_trans_execute,