FSCORE-668
This commit is contained in:
parent
e3eff8165e
commit
68d1c32ad1
|
@ -35,6 +35,8 @@
|
|||
#include <switch.h>
|
||||
#include "private/switch_core_pvt.h"
|
||||
|
||||
#define SQLLEN 32768
|
||||
|
||||
static struct {
|
||||
switch_cache_db_handle_t *event_db;
|
||||
switch_queue_t *sql_queue[2];
|
||||
|
@ -537,7 +539,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
|
|||
switch (dbh->type) {
|
||||
default:
|
||||
{
|
||||
status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err);
|
||||
status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, SQLLEN, err);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -845,7 +847,7 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand
|
|||
|
||||
|
||||
|
||||
#define SQLLEN 1024 * 1024
|
||||
|
||||
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
|
||||
{
|
||||
void *pop;
|
||||
|
@ -853,13 +855,14 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
|
|||
uint8_t trans = 0, nothing_in_queue = 0;
|
||||
uint32_t target = 100000;
|
||||
switch_size_t len = 0, sql_len = SQLLEN;
|
||||
char *tmp, *sqlbuf = (char *) malloc(sql_len);
|
||||
char *sql;
|
||||
char *sqlbuf = (char *) malloc(sql_len);
|
||||
char *sql = NULL;
|
||||
switch_size_t newlen;
|
||||
int lc = 0;
|
||||
uint32_t loops = 0, sec = 0;
|
||||
uint32_t l1 = 1000;
|
||||
uint32_t sanity = 120;
|
||||
int item_remained = 0;
|
||||
|
||||
switch_assert(sqlbuf);
|
||||
|
||||
|
@ -897,9 +900,16 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
|
|||
continue;
|
||||
}
|
||||
|
||||
if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
|
||||
//printf("SIZE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
|
||||
|
||||
if (item_remained || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
|
||||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||
|
||||
if (item_remained) {
|
||||
item_remained = 0;
|
||||
} else {
|
||||
sql = (char *) pop;
|
||||
}
|
||||
|
||||
if (sql) {
|
||||
newlen = strlen(sql) + 2;
|
||||
|
@ -911,20 +921,18 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
|
|||
/* ignore abnormally large strings sql strings as potential buffer overflow */
|
||||
if (newlen < SQLLEN) {
|
||||
itterations++;
|
||||
if (len + newlen > sql_len) {
|
||||
sql_len = len + SQLLEN;
|
||||
if (!(tmp = realloc(sqlbuf, sql_len))) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
|
||||
abort();
|
||||
break;
|
||||
}
|
||||
sqlbuf = tmp;
|
||||
}
|
||||
|
||||
if (len + newlen < sql_len) {
|
||||
sprintf(sqlbuf + len, "%s;\n", sql);
|
||||
len += newlen;
|
||||
|
||||
} else {
|
||||
item_remained = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!item_remained) {
|
||||
free(sql);
|
||||
}
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
|
||||
break;
|
||||
|
@ -934,7 +942,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
|
|||
}
|
||||
|
||||
|
||||
if (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500))) {
|
||||
if ((item_remained || (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500)))) &&
|
||||
(sql_manager.event_db->native_handle.core_db_dbh)) {
|
||||
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue