finish off sql optimization

This commit is contained in:
Anthony Minessale 2012-10-27 12:27:00 -05:00
parent 60f7849cbe
commit 96550e893e
4 changed files with 89 additions and 43 deletions

View File

@ -2431,14 +2431,14 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name, SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name,
switch_sql_queue_manager_t **qmp, switch_sql_queue_manager_t **qmp,
uint32_t numq, const char *dsn, uint32_t numq, const char *dsn, uint32_t max_trans,
const char *pre_trans_execute, const char *pre_trans_execute,
const char *post_trans_execute, const char *post_trans_execute,
const char *inner_pre_trans_execute, const char *inner_pre_trans_execute,
const char *inner_post_trans_execute); const char *inner_post_trans_execute);
#define switch_sql_queue_manager_init(_q, _n, _d, _p1, _p2, _ip1, _ip2) switch_sql_queue_manager_init_name(__FILE__, _q, _n, _d, _p1, _p2, _ip1, _ip2) #define switch_sql_queue_manager_init(_q, _n, _d, _m, _p1, _p2, _ip1, _ip2) switch_sql_queue_manager_init_name(__FILE__, _q, _n, _d, _m, _p1, _p2, _ip1, _ip2)
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);

View File

@ -213,6 +213,7 @@ SWITCH_BEGIN_EXTERN_C
#define SWITCH_BITS_PER_BYTE 8 #define SWITCH_BITS_PER_BYTE 8
#define SWITCH_DEFAULT_FILE_BUFFER_LEN 65536 #define SWITCH_DEFAULT_FILE_BUFFER_LEN 65536
#define SWITCH_DTMF_LOG_LEN 1000 #define SWITCH_DTMF_LOG_LEN 1000
#define SWITCH_MAX_TRANS 2000
typedef uint8_t switch_byte_t; typedef uint8_t switch_byte_t;
/*! /*!

View File

@ -1638,7 +1638,7 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj
if (pop) { if (pop) {
sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop; sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
sofia_process_dispatch_event(&de); sofia_process_dispatch_event(&de);
switch_os_yield(); switch_cond_next();
} else { } else {
break; break;
} }
@ -1959,12 +1959,7 @@ void sofia_event_callback(nua_event_t event,
end: end:
if (profile->pres_type) { switch_cond_next();
switch_cond_next();
} else {
switch_os_yield();
}
return; return;
} }
@ -2532,13 +2527,14 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name); switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
switch_sql_queue_manager_init_name(qname, switch_sql_queue_manager_init_name(qname,
&profile->qm, &profile->qm,
1, 1,
profile->odbc_dsn ? profile->odbc_dsn : profile->dbname, profile->odbc_dsn ? profile->odbc_dsn : profile->dbname,
profile->pre_trans_execute, SWITCH_MAX_TRANS,
profile->post_trans_execute, profile->pre_trans_execute,
profile->inner_pre_trans_execute, profile->post_trans_execute,
profile->inner_post_trans_execute); profile->inner_pre_trans_execute,
profile->inner_post_trans_execute);
switch_sql_queue_manager_start(profile->qm); switch_sql_queue_manager_start(profile->qm);
if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) { if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) {

View File

@ -1215,6 +1215,7 @@ struct switch_sql_queue_manager {
char *inner_pre_trans_execute; char *inner_pre_trans_execute;
char *inner_post_trans_execute; char *inner_post_trans_execute;
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
uint32_t max_trans;
}; };
static int qm_wake(switch_sql_queue_manager_t *qm) static int qm_wake(switch_sql_queue_manager_t *qm)
@ -1410,12 +1411,12 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name, SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name,
switch_sql_queue_manager_t **qmp, switch_sql_queue_manager_t **qmp,
uint32_t numq, const char *dsn, uint32_t numq, const char *dsn, uint32_t max_trans,
const char *pre_trans_execute, const char *pre_trans_execute,
const char *post_trans_execute, const char *post_trans_execute,
const char *inner_pre_trans_execute, const char *inner_pre_trans_execute,
const char *inner_post_trans_execute) const char *inner_post_trans_execute)
{ {
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
switch_sql_queue_manager_t *qm; switch_sql_queue_manager_t *qm;
@ -1430,6 +1431,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
qm->numq = numq; qm->numq = numq;
qm->dsn = switch_core_strdup(qm->pool, dsn); qm->dsn = switch_core_strdup(qm->pool, dsn);
qm->name = switch_core_strdup(qm->pool, name); qm->name = switch_core_strdup(qm->pool, name);
qm->max_trans = max_trans;
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
@ -1458,6 +1460,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
static uint32_t do_trans(switch_cache_db_handle_t *dbh, static uint32_t do_trans(switch_cache_db_handle_t *dbh,
switch_queue_t *q, switch_queue_t *q,
switch_mutex_t *mutex, switch_mutex_t *mutex,
uint32_t max,
const char *pre_trans_execute, const char *pre_trans_execute,
const char *post_trans_execute, const char *post_trans_execute,
const char *inner_pre_trans_execute, const char *inner_pre_trans_execute,
@ -1467,11 +1470,22 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh,
void *pop; void *pop;
switch_status_t status; switch_status_t status;
uint32_t ttl = 0; uint32_t ttl = 0;
switch_mutex_t *io_mutex = dbh->io_mutex;
if (!switch_queue_size(q)) { if (!switch_queue_size(q)) {
return 0; return 0;
} }
if (io_mutex) switch_mutex_lock(io_mutex);
if (!zstr(pre_trans_execute)) {
switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg);
free(errmsg);
}
}
switch(dbh->type) { switch(dbh->type) {
case SCDB_TYPE_CORE_DB: case SCDB_TYPE_CORE_DB:
{ {
@ -1509,7 +1523,15 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh,
} }
for(;;) { if (!zstr(inner_pre_trans_execute)) {
switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg);
free(errmsg);
}
}
while(max == 0 || ttl <= max) {
if (mutex) switch_mutex_lock(mutex); if (mutex) switch_mutex_lock(mutex);
status = switch_queue_trypop(q, &pop); status = switch_queue_trypop(q, &pop);
if (mutex) switch_mutex_unlock(mutex); if (mutex) switch_mutex_unlock(mutex);
@ -1524,6 +1546,15 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh,
if (status != SWITCH_STATUS_SUCCESS) break; if (status != SWITCH_STATUS_SUCCESS) break;
} }
if (!zstr(inner_post_trans_execute)) {
switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg);
free(errmsg);
}
}
end: end:
switch(dbh->type) { switch(dbh->type) {
@ -1548,6 +1579,15 @@ static uint32_t do_trans(switch_cache_db_handle_t *dbh,
} }
if (!zstr(post_trans_execute)) {
switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg);
free(errmsg);
}
}
if (io_mutex) switch_mutex_unlock(io_mutex);
return ttl; return ttl;
} }
@ -1557,7 +1597,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
uint32_t sanity = 120; uint32_t sanity = 120;
switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj; switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
uint32_t i; uint32_t i, countdown = 0;
while (!qm->event_db) { while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db) if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
@ -1605,17 +1645,23 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
} }
for (i = 0; i < qm->numq; i++) { for (i = 0; i < qm->numq; i++) {
uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex, while(switch_queue_size(qm->sql_queue[i])) {
qm->pre_trans_execute, uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex, qm->max_trans,
qm->post_trans_execute, qm->pre_trans_execute,
qm->inner_pre_trans_execute, qm->post_trans_execute,
qm->inner_post_trans_execute); qm->inner_pre_trans_execute,
qm->inner_post_trans_execute);
iterations += written; iterations += written;
switch_mutex_lock(qm->mutex); switch_mutex_lock(qm->mutex);
qm->written[i] += written; qm->written[i] += written;
switch_mutex_unlock(qm->mutex); switch_mutex_unlock(qm->mutex);
if (written < qm->max_trans) {
break;
}
}
} }
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
@ -1638,12 +1684,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
check: check:
lc = qm_ttl(qm); countdown = 40;
if (!lc) { while (--countdown && (lc = qm_ttl(qm)) < qm->max_trans / 4) {
switch_thread_cond_wait(qm->cond, qm->cond_mutex); if (lc == 0) {
} else if (lc < 2000) { switch_thread_cond_wait(qm->cond, qm->cond_mutex);
switch_yield(200000); break;
}
switch_yield(5000);
} }
} }
@ -3047,6 +3095,7 @@ static void switch_core_sqldb_start_thread(void)
&sql_manager.qm, &sql_manager.qm,
4, 4,
dbname, dbname,
SWITCH_MAX_TRANS,
runtime.core_db_pre_trans_execute, runtime.core_db_pre_trans_execute,
runtime.core_db_post_trans_execute, runtime.core_db_post_trans_execute,
runtime.core_db_inner_pre_trans_execute, runtime.core_db_inner_pre_trans_execute,