diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index 660de3da34..d44a7bc947 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -198,6 +198,7 @@ typedef enum { PFLAG_MANUAL_REDIRECT, PFLAG_AUTO_NAT, PFLAG_SIPCOMPACT, + PFLAG_SQL_IN_TRANS, /* No new flags below this line */ PFLAG_MAX } PFLAGS; diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 5d5832dce8..89cb6ce963 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -881,6 +881,7 @@ end: } } +#define SQLLEN 1024 * 64 void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj) { sofia_profile_t *profile = (sofia_profile_t *) obj; @@ -889,7 +890,14 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread int loops = 0; uint32_t qsize; void *pop; - + int loop_count = 0; + switch_size_t sql_len = SQLLEN; + char *tmp, *sqlbuf = NULL; + + if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { + sqlbuf = (char *) malloc(sql_len); + } + ireg_loops = IREG_SECONDS; gateway_loops = GATEWAY_SECONDS; @@ -900,15 +908,57 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread qsize = switch_queue_size(profile->sql_queue); while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) { - if (qsize) { - switch_mutex_lock(profile->ireg_mutex); - while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { - sofia_glue_actually_execute_sql(profile, SWITCH_TRUE, (char *) pop, NULL); - free(pop); - } - switch_mutex_unlock(profile->ireg_mutex); - } + if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { + if (qsize > 0 && (qsize >= 500 || ++loop_count >= 500)) { + switch_size_t newlen; + uint32_t itterations = 0; + switch_size_t len = 0; + switch_mutex_lock(profile->ireg_mutex); + + sprintf(sqlbuf + len, "begin;\n"); + len += 7; + + while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + char *sql = (char *) pop; + + newlen = strlen(sql) + 2; + + if (newlen + 10 < SQLLEN) { + itterations++; + if (len + newlen + 10 > sql_len) { + sql_len = len + 10 + SQLLEN; + if (!(tmp = realloc(sqlbuf, sql_len))) { + abort(); + break; + } + sqlbuf = tmp; + } + sprintf(sqlbuf + len, "%s;\n", sql); + len += newlen; + } + + free(pop); + } + + sprintf(sqlbuf + len, "commit;\n"); + + //printf("TRANS:\n%s\n", sqlbuf); + sofia_glue_actually_execute_sql(profile, SWITCH_TRUE, sqlbuf, NULL); + switch_mutex_unlock(profile->ireg_mutex); + loop_count = 0; + } + } else { + if (qsize) { + switch_mutex_lock(profile->ireg_mutex); + while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + sofia_glue_actually_execute_sql(profile, SWITCH_TRUE, (char *) pop, NULL); + free(pop); + } + switch_mutex_unlock(profile->ireg_mutex); + } + } + if (++loops >= 100) { if (++ireg_loops >= IREG_SECONDS) { sofia_reg_check_expire(profile, switch_epoch_time_now(NULL), 0); @@ -935,6 +985,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread switch_mutex_unlock(profile->ireg_mutex); sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING); + switch_safe_free(sqlbuf); return NULL; } @@ -2718,6 +2769,12 @@ switch_status_t config_sofia(int reload, char *profile_name) if (switch_true(val)) { sofia_set_pflag(profile, PFLAG_SIPCOMPACT); } + } else if (!strcasecmp(var, "sql-in-transactions")) { + if (switch_true(val)) { + sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS); + } else { + sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS); + } } else if (!strcasecmp(var, "bitpacking")) { if (!strcasecmp(val, "aal2")) { profile->codec_flags = SWITCH_CODEC_FLAG_AAL2;