fix bottleneck in sql thread
git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10858 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
parent
3212a526a3
commit
58cae7723f
|
@ -86,7 +86,7 @@ SWITCH_DECLARE(int) switch_core_db_exec(switch_core_db_t *db, const char *sql, s
|
||||||
if (ret == SQLITE_BUSY || ret == SQLITE_LOCKED) {
|
if (ret == SQLITE_BUSY || ret == SQLITE_LOCKED) {
|
||||||
if (sane > 1) {
|
if (sane > 1) {
|
||||||
switch_safe_free(err);
|
switch_safe_free(err);
|
||||||
switch_yield(20000);
|
switch_cond_next();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
static struct {
|
static struct {
|
||||||
switch_core_db_t *db;
|
switch_core_db_t *db;
|
||||||
switch_core_db_t *event_db;
|
switch_core_db_t *event_db;
|
||||||
switch_queue_t *sql_queue;
|
switch_queue_t *sql_queue[2];
|
||||||
switch_memory_pool_t *memory_pool;
|
switch_memory_pool_t *memory_pool;
|
||||||
int thread_running;
|
int thread_running;
|
||||||
} sql_manager;
|
} sql_manager;
|
||||||
|
@ -61,7 +61,7 @@ again:
|
||||||
while (begin_retries > 0) {
|
while (begin_retries > 0) {
|
||||||
again = 0;
|
again = 0;
|
||||||
|
|
||||||
switch_core_db_exec(db, "begin transaction", NULL, NULL, &errmsg);
|
switch_core_db_exec(db, "BEGIN", NULL, NULL, &errmsg);
|
||||||
|
|
||||||
if (errmsg) {
|
if (errmsg) {
|
||||||
begin_retries--;
|
begin_retries--;
|
||||||
|
@ -74,7 +74,7 @@ again:
|
||||||
errmsg = NULL;
|
errmsg = NULL;
|
||||||
|
|
||||||
if (again) {
|
if (again) {
|
||||||
switch_core_db_exec(db, "end transaction", NULL, NULL, NULL);
|
switch_core_db_exec(db, "COMMIT", NULL, NULL, NULL);
|
||||||
goto again;
|
goto again;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ again:
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
|
||||||
switch_core_db_exec(db, "end transaction", NULL, NULL, NULL);
|
switch_core_db_exec(db, "COMMIT", NULL, NULL, NULL);
|
||||||
|
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
@ -151,7 +151,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
|
||||||
void *pop;
|
void *pop;
|
||||||
uint32_t itterations = 0;
|
uint32_t itterations = 0;
|
||||||
uint8_t trans = 0, nothing_in_queue = 0;
|
uint8_t trans = 0, nothing_in_queue = 0;
|
||||||
uint32_t target = 1000;
|
uint32_t target = 50000;
|
||||||
switch_size_t len = 0, sql_len = SQLLEN;
|
switch_size_t len = 0, sql_len = SQLLEN;
|
||||||
char *tmp, *sqlbuf = (char *) malloc(sql_len);
|
char *tmp, *sqlbuf = (char *) malloc(sql_len);
|
||||||
char *sql;
|
char *sql;
|
||||||
|
@ -167,7 +167,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
|
||||||
sql_manager.thread_running = 1;
|
sql_manager.thread_running = 1;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (switch_queue_trypop(sql_manager.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) {
|
if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
|
||||||
|
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||||
sql = (char *) pop;
|
sql = (char *) pop;
|
||||||
|
|
||||||
if (sql) {
|
if (sql) {
|
||||||
|
@ -203,7 +204,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (trans && ((itterations == target) || nothing_in_queue)) {
|
if (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500))) {
|
||||||
if (switch_core_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 100) != SWITCH_STATUS_SUCCESS) {
|
if (switch_core_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 100) != SWITCH_STATUS_SUCCESS) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
|
||||||
}
|
}
|
||||||
|
@ -212,10 +213,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
|
||||||
nothing_in_queue = 0;
|
nothing_in_queue = 0;
|
||||||
len = 0;
|
len = 0;
|
||||||
*sqlbuf = '\0';
|
*sqlbuf = '\0';
|
||||||
}
|
|
||||||
|
|
||||||
if (++lc == 300000) {
|
|
||||||
switch_core_db_exec(sql_manager.db, "vacuum;", NULL, NULL, NULL);
|
|
||||||
lc = 0;
|
lc = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,7 +221,11 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while (switch_queue_trypop(sql_manager.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) {
|
while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
free(pop);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||||
free(pop);
|
free(pop);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,17 +289,20 @@ static void core_event_handler(switch_event_t *event)
|
||||||
sql =
|
sql =
|
||||||
switch_mprintf
|
switch_mprintf
|
||||||
("update channels set read_codec='%q',read_rate='%q',write_codec='%q',write_rate='%q' where uuid='%q'",
|
("update channels set read_codec='%q',read_rate='%q',write_codec='%q',write_rate='%q' where uuid='%q'",
|
||||||
switch_event_get_header_nil(event, "channel-read-codec-name"), switch_event_get_header_nil(event,
|
switch_event_get_header_nil(event, "channel-read-codec-name"),
|
||||||
"channel-read-codec-rate"),
|
switch_event_get_header_nil(event, "channel-read-codec-rate"),
|
||||||
switch_event_get_header_nil(event, "channel-write-codec-name"), switch_event_get_header_nil(event,
|
switch_event_get_header_nil(event, "channel-write-codec-name"),
|
||||||
"channel-write-codec-rate"),
|
switch_event_get_header_nil(event, "channel-write-codec-rate"),
|
||||||
switch_event_get_header_nil(event, "unique-id"));
|
switch_event_get_header_nil(event, "unique-id"));
|
||||||
break;
|
break;
|
||||||
case SWITCH_EVENT_CHANNEL_EXECUTE:
|
case SWITCH_EVENT_CHANNEL_EXECUTE:
|
||||||
sql = switch_mprintf("update channels set application='%q',application_data='%q' where uuid='%q'",
|
sql = switch_mprintf("update channels set application='%q',application_data='%q' where uuid='%q'",
|
||||||
switch_event_get_header_nil(event, "application"),
|
switch_event_get_header_nil(event, "application"),
|
||||||
switch_event_get_header_nil(event, "application-data"), switch_event_get_header_nil(event, "unique-id")
|
switch_event_get_header_nil(event, "application-data"),
|
||||||
|
switch_event_get_header_nil(event, "unique-id")
|
||||||
|
|
||||||
);
|
);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case SWITCH_EVENT_CHANNEL_STATE:
|
case SWITCH_EVENT_CHANNEL_STATE:
|
||||||
{
|
{
|
||||||
|
@ -384,7 +388,11 @@ static void core_event_handler(switch_event_t *event)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sql) {
|
if (sql) {
|
||||||
switch_queue_push(sql_manager.sql_queue, sql);
|
if (switch_stristr("update channels", sql)) {
|
||||||
|
switch_queue_push(sql_manager.sql_queue[1], sql);
|
||||||
|
} else {
|
||||||
|
switch_queue_push(sql_manager.sql_queue[0], sql);
|
||||||
|
}
|
||||||
sql = NULL;
|
sql = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -442,7 +450,7 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool)
|
||||||
" read_rate VARCHAR(255),\n"
|
" read_rate VARCHAR(255),\n"
|
||||||
" write_codec VARCHAR(255),\n"
|
" write_codec VARCHAR(255),\n"
|
||||||
" write_rate VARCHAR(255)\n"
|
" write_rate VARCHAR(255)\n"
|
||||||
");\n";
|
");\ncreate index uuindex on channels (uuid);\n";
|
||||||
char create_calls_sql[] =
|
char create_calls_sql[] =
|
||||||
"CREATE TABLE calls (\n"
|
"CREATE TABLE calls (\n"
|
||||||
" created VARCHAR(255),\n"
|
" created VARCHAR(255),\n"
|
||||||
|
@ -458,7 +466,9 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool)
|
||||||
" callee_dest_num VARCHAR(255),\n"
|
" callee_dest_num VARCHAR(255),\n"
|
||||||
" callee_chan_name VARCHAR(255),\n"
|
" callee_chan_name VARCHAR(255),\n"
|
||||||
" callee_uuid VARCHAR(255)\n"
|
" callee_uuid VARCHAR(255)\n"
|
||||||
");\n";
|
");\n"
|
||||||
|
"create index eruuindex on calls (caller_uuid);\n"
|
||||||
|
"create index eeuuindex on calls (callee_uuid);\n";
|
||||||
char create_interfaces_sql[] =
|
char create_interfaces_sql[] =
|
||||||
"CREATE TABLE interfaces (\n"
|
"CREATE TABLE interfaces (\n"
|
||||||
" type VARCHAR(255),\n"
|
" type VARCHAR(255),\n"
|
||||||
|
@ -488,6 +498,17 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool)
|
||||||
switch_core_db_test_reactive(sql_manager.db, "select sticky from aliases", "DROP TABLE aliases", create_alias_sql);
|
switch_core_db_test_reactive(sql_manager.db, "select sticky from aliases", "DROP TABLE aliases", create_alias_sql);
|
||||||
switch_core_db_exec(sql_manager.db, "delete from complete where sticky=0", NULL, NULL, NULL);
|
switch_core_db_exec(sql_manager.db, "delete from complete where sticky=0", NULL, NULL, NULL);
|
||||||
switch_core_db_exec(sql_manager.db, "delete from aliases where sticky=0", NULL, NULL, NULL);
|
switch_core_db_exec(sql_manager.db, "delete from aliases where sticky=0", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists alias1 on aliases (alias)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete1 on complete (a1)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete2 on complete (a2)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete3 on complete (a3)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete4 on complete (a4)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete5 on complete (a5)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete6 on complete (a6)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete7 on complete (a7)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete8 on complete (a8)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete9 on complete (a9)", NULL, NULL, NULL);
|
||||||
|
switch_core_db_exec(sql_manager.db, "create index if not exists complete10 on complete (a10)", NULL, NULL, NULL);
|
||||||
switch_core_db_exec(sql_manager.db, create_channels_sql, NULL, NULL, NULL);
|
switch_core_db_exec(sql_manager.db, create_channels_sql, NULL, NULL, NULL);
|
||||||
switch_core_db_exec(sql_manager.db, create_calls_sql, NULL, NULL, NULL);
|
switch_core_db_exec(sql_manager.db, create_calls_sql, NULL, NULL, NULL);
|
||||||
switch_core_db_exec(sql_manager.db, create_interfaces_sql, NULL, NULL, NULL);
|
switch_core_db_exec(sql_manager.db, create_interfaces_sql, NULL, NULL, NULL);
|
||||||
|
@ -497,12 +518,14 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_queue_create(&sql_manager.sql_queue, SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
|
switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
|
||||||
|
switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
|
||||||
|
|
||||||
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
|
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
|
||||||
switch_threadattr_detach_set(thd_attr, 1);
|
switch_threadattr_detach_set(thd_attr, 1);
|
||||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||||
switch_thread_create(&thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
|
switch_thread_create(&thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
|
||||||
|
|
||||||
while (!sql_manager.thread_running) {
|
while (!sql_manager.thread_running) {
|
||||||
switch_yield(10000);
|
switch_yield(10000);
|
||||||
}
|
}
|
||||||
|
@ -510,10 +533,14 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool)
|
||||||
|
|
||||||
void switch_core_sqldb_stop(void)
|
void switch_core_sqldb_stop(void)
|
||||||
{
|
{
|
||||||
switch_queue_push(sql_manager.sql_queue, NULL);
|
switch_queue_push(sql_manager.sql_queue[0], NULL);
|
||||||
|
switch_queue_push(sql_manager.sql_queue[1], NULL);
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
|
||||||
while (switch_queue_size(sql_manager.sql_queue) > 0) {
|
while (switch_queue_size(sql_manager.sql_queue[0]) > 0) {
|
||||||
|
switch_yield(10000);
|
||||||
|
}
|
||||||
|
while (switch_queue_size(sql_manager.sql_queue[1]) > 0) {
|
||||||
switch_yield(10000);
|
switch_yield(10000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue