let mod_fifo use sql_queue_manager

This commit is contained in:
Anthony Minessale 2012-11-28 23:11:31 -06:00
parent 827b680afc
commit f1a89fb016

View File

@ -594,6 +594,11 @@ static struct {
switch_thread_t *node_thread; switch_thread_t *node_thread;
int debug; int debug;
struct fifo_node *nodes; struct fifo_node *nodes;
char *pre_trans_execute;
char *post_trans_execute;
char *inner_pre_trans_execute;
char *inner_post_trans_execute;
switch_sql_queue_manager_t *qm;
} globals; } globals;
@ -742,7 +747,29 @@ switch_cache_db_handle_t *fifo_get_db_handle(void)
return dbh; return dbh;
} }
static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_already_dynamic, switch_bool_t block)
{
int index = 1;
char *sql;
switch_assert(sqlp && *sqlp);
sql = *sqlp;
if (switch_stristr("insert", sql)) {
index = 0;
}
switch_sql_queue_manager_push(globals.qm, sql, index, !sql_already_dynamic);
if (sql_already_dynamic) {
*sqlp = NULL;
}
return SWITCH_STATUS_SUCCESS;
}
#if 0
static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex) static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
{ {
switch_cache_db_handle_t *dbh = NULL; switch_cache_db_handle_t *dbh = NULL;
@ -771,6 +798,7 @@ static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
return status; return status;
} }
#endif
static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata) static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
{ {
@ -937,9 +965,7 @@ static void do_unbridge(switch_core_session_t *consumer_session, switch_core_ses
switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session)); sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session));
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING"); switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING");
@ -1025,8 +1051,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
switch_str_nil(msg->string_array_arg[0]), switch_str_nil(msg->string_array_arg[0]),
switch_str_nil(msg->string_array_arg[1]), switch_str_nil(msg->string_array_arg[1]),
switch_core_session_get_uuid(session)); switch_core_session_get_uuid(session));
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
goto end; goto end;
default: default:
goto end; goto end;
@ -1124,8 +1149,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
); );
} }
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
epoch_start = (long)switch_epoch_time_now(NULL); epoch_start = (long)switch_epoch_time_now(NULL);
@ -1381,8 +1405,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
struct call_helper *h = cbh->rows[i]; struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid); char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
} }
@ -1424,8 +1447,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
struct call_helper *h = cbh->rows[i]; struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 " char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 "
"where uuid='%q' and ring_count > 0", h->uuid); "where uuid='%q' and ring_count > 0", h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
} }
} }
@ -1439,8 +1461,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
"outbound_fail_total_count = outbound_fail_total_count+1, " "outbound_fail_total_count = outbound_fail_total_count+1, "
"next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0", "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
(long) switch_epoch_time_now(NULL), h->uuid); (long) switch_epoch_time_now(NULL), h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
} }
} }
@ -1497,8 +1518,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
for (i = 0; i < cbh->rowcount; i++) { for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i]; struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid); char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
} }
end: end:
@ -1608,8 +1628,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid); sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL); status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL);
free(originate_string); free(originate_string);
@ -1619,8 +1638,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, " sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
"outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'", "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'",
(long) switch_epoch_time_now(NULL), h->uuid); (long) switch_epoch_time_now(NULL), h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
@ -2123,15 +2141,12 @@ static void dec_use_count(switch_core_session_t *session, switch_bool_t send_eve
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session)); sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
del_bridge_call(outbound_id); del_bridge_call(outbound_id);
sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'", sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'",
now, now, outbound_id); now, now, outbound_id);
fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
} }
if (send_event) { if (send_event) {
@ -2198,9 +2213,7 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'", sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'",
(long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data); (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) { if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
@ -2235,8 +2248,7 @@ static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session)
switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")), switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")),
switch_epoch_time_now(NULL)); switch_epoch_time_now(NULL));
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
} }
static void fifo_caller_del(const char *uuid) static void fifo_caller_del(const char *uuid)
@ -2249,8 +2261,7 @@ static void fifo_caller_del(const char *uuid)
sql = switch_mprintf("delete from fifo_callers"); sql = switch_mprintf("delete from fifo_callers");
} }
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
} }
@ -3018,8 +3029,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_epoch_time_now(NULL), outbound_id); switch_epoch_time_now(NULL), outbound_id);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
} }
add_bridge_call(switch_core_session_get_uuid(other_session)); add_bridge_call(switch_core_session_get_uuid(other_session));
@ -3038,8 +3048,7 @@ SWITCH_STANDARD_APP(fifo_function)
); );
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session)); switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session));
@ -3055,8 +3064,7 @@ SWITCH_STANDARD_APP(fifo_function)
"outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0", "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0",
now, now, outbound_id); now, now, outbound_id);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
del_bridge_call(outbound_id); del_bridge_call(outbound_id);
@ -3088,8 +3096,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start); switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session)); sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_safe_free(sql);
switch_core_media_bug_pause(session); switch_core_media_bug_pause(session);
@ -4009,6 +4016,14 @@ static switch_status_t load_config(int reload, int del_all)
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
} }
} else if (!strcasecmp(var, "db-pre-trans-execute") && !zstr(val)) {
globals.pre_trans_execute = switch_core_strdup(globals.pool, val);
} else if (!strcasecmp(var, "db-post-trans-execute") && !zstr(val)) {
globals.post_trans_execute = switch_core_strdup(globals.pool, val);
} else if (!strcasecmp(var, "db-inner-pre-trans-execute") && !zstr(val)) {
globals.inner_pre_trans_execute = switch_core_strdup(globals.pool, val);
} else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) {
globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val);
} else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) { } else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
delete_all_outbound_member_on_startup = switch_true(val); delete_all_outbound_member_on_startup = switch_true(val);
} }
@ -4019,6 +4034,18 @@ static switch_status_t load_config(int reload, int del_all)
globals.dbname = "fifo"; globals.dbname = "fifo";
} }
switch_sql_queue_manager_init_name("fifo",
&globals.qm,
2,
globals.odbc_dsn ? globals.odbc_dsn : globals.dbname,
SWITCH_MAX_TRANS,
globals.pre_trans_execute,
globals.post_trans_execute,
globals.inner_pre_trans_execute,
globals.inner_post_trans_execute);
switch_sql_queue_manager_start(globals.qm);
if (!(dbh = fifo_get_db_handle())) { if (!(dbh = fifo_get_db_handle())) {
@ -4036,8 +4063,8 @@ static switch_status_t load_config(int reload, int del_all)
switch_cache_db_release_db_handle(&dbh); switch_cache_db_release_db_handle(&dbh);
if (!reload) { if (!reload) {
fifo_execute_sql("update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0," char *sql= "update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0";
"outbound_call_count=0,outbound_fail_count=0 where static=0", globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_FALSE, SWITCH_TRUE);
} }
if (reload) { if (reload) {
@ -4060,8 +4087,7 @@ static switch_status_t load_config(int reload, int del_all)
sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname); sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
} }
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_safe_free(sql);
if (!(node = switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME))) { if (!(node = switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME))) {
node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex); node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex);
@ -4214,8 +4240,7 @@ static switch_status_t load_config(int reload, int del_all)
(long) switch_epoch_time_now(NULL)); (long) switch_epoch_time_now(NULL));
switch_assert(sql); switch_assert(sql);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(sql);
free(name_dup); free(name_dup);
node->has_outbound = 1; node->has_outbound = 1;
node->member_count++; node->member_count++;
@ -4270,8 +4295,7 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest); sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest);
switch_assert(sql); switch_assert(sql);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(sql);
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
@ -4293,8 +4317,7 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls, digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls,
(long)switch_epoch_time_now(NULL)); (long)switch_epoch_time_now(NULL));
switch_assert(sql); switch_assert(sql);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(sql);
free(name_dup); free(name_dup);
cbt.buf = outbound_count; cbt.buf = outbound_count;
@ -4329,8 +4352,7 @@ static void fifo_member_del(char *fifo_name, char *originate_string)
sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname); sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname);
switch_assert(sql); switch_assert(sql);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(sql);
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) { if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {