diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 7c8c82a4c9..ffd7871f82 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -56,6 +56,7 @@ struct fifo_node { switch_thread_rwlock_t *rwlock; switch_memory_pool_t *pool; int has_outbound; + int ready; }; typedef struct fifo_node fifo_node_t; @@ -509,7 +510,8 @@ static void find_consumers(fifo_node_t *node) int need = node_consumer_wait_count(node); char *sql; - sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname " + sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, " + "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname " "from fifo_outbound where (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) order by outbound_call_count", (long) switch_timestamp(NULL)); switch_assert(sql); @@ -532,7 +534,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o switch_mutex_lock(globals.mutex); for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); - if ((node = (fifo_node_t *) val) && node->has_outbound) { + if ((node = (fifo_node_t *) val) && node->has_outbound && node->ready) { switch_mutex_lock(node->mutex); ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; @@ -1503,7 +1505,7 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose); } -#define FIFO_API_SYNTAX "list|list_verbose|count|importance []" +#define FIFO_API_SYNTAX "list|list_verbose|count|importance []|reparse [del_all]" SWITCH_STANDARD_API(fifo_api_function) { int len = 0; @@ -1535,6 +1537,7 @@ SWITCH_STANDARD_API(fifo_api_function) if (!strcasecmp(argv[0], "reparse")) { load_config(1, argv[1] && !strcasecmp(argv[1], "del_all")); + goto done; } if (!strcasecmp(argv[0], "list") || verbose) { @@ -1600,8 +1603,12 @@ SWITCH_STANDARD_API(fifo_api_function) } else { stream->write_function(stream, "none\n"); } + } else { + stream->write_function(stream, "-ERR Usage: %s\n", FIFO_API_SYNTAX); } + done: + switch_mutex_unlock(globals.mutex); return SWITCH_STATUS_SUCCESS; } @@ -1633,6 +1640,7 @@ static switch_status_t load_config(int reload, int del_all) char *odbc_pass = NULL; switch_core_db_t *db; switch_status_t status = SWITCH_STATUS_SUCCESS; + char *sql; gethostname(globals.hostname, sizeof(globals.hostname)); @@ -1703,6 +1711,29 @@ static switch_status_t load_config(int reload, int del_all) } #endif + if (reload) { + switch_hash_index_t *hi; + fifo_node_t *node; + void *val; + for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { + switch_hash_this(hi, NULL, NULL, &val); + if ((node = (fifo_node_t *) val)) { + node->ready = 0; + } + } + } + + if (del_all) { + sql = switch_mprintf("delete from fifo_outbound where hostname='%q'", globals.hostname); + } else { + sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname); + } + + fifo_execute_sql(sql, globals.sql_mutex); + switch_safe_free(sql); + + + if ((fifos = switch_xml_child(cfg, "fifos"))) { for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) { const char *name; @@ -1712,10 +1743,9 @@ static switch_status_t load_config(int reload, int del_all) int timeout_i = 60; int lag_i = 10; fifo_node_t *node; - char *sql; name = switch_xml_attr(fifo, "name"); - + if ((importance = switch_xml_attr(fifo, "importance"))) { if ((imp = atoi(importance)) < 0) { imp = 0; @@ -1732,15 +1762,8 @@ static switch_status_t load_config(int reload, int del_all) node = create_node(name, imp); } switch_mutex_unlock(globals.mutex); - - if (del_all) { - sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and hostname='%q'", node->name, globals.hostname); - } else { - sql = switch_mprintf("delete from fifo_outbound where static=1 and fifo_name='%q' and hostname='%q'", node->name, globals.hostname); - } - - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + + switch_assert(node); switch_mutex_lock(node->mutex); @@ -1767,11 +1790,13 @@ static switch_status_t load_config(int reload, int del_all) lag_i = 10; } } - + + sql = switch_mprintf("insert into fifo_outbound " - "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname) " + "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, " + "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname) " "values ('%q','%q','%q',%d,%d,%d,%d,0,0,1,0,0,'%q')", - digest, node->name, member->txt, simo_i, 0, timeout_i, lag_i, 0, 0, globals.hostname + digest, node->name, member->txt, simo_i, 0, timeout_i, lag_i, globals.hostname ); switch_assert(sql); fifo_execute_sql(sql, globals.sql_mutex); @@ -1780,9 +1805,10 @@ static switch_status_t load_config(int reload, int del_all) node->has_outbound = 1; } + node->ready = 1; switch_mutex_unlock(node->mutex); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s started\n", node->name); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s configured\n", node->name); } } @@ -1790,6 +1816,39 @@ static switch_status_t load_config(int reload, int del_all) done: + if (reload) { + switch_hash_index_t *hi; + void *val, *pop; + fifo_node_t *node; + for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { + int x = 0; + switch_hash_this(hi, NULL, NULL, &val); + if (!(node = (fifo_node_t *) val) || node->ready) { + continue; + } + + + if (node_consumer_wait_count(node) || node->consumer_count || node_idle_consumers(node)) { + node->ready = 1; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s not removed, still in use.\n", node->name); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); + switch_thread_rwlock_wrlock(node->rwlock); + for (x = 0; x < MAX_PRI; x++) { + while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) { + free(pop); + } + } + + switch_core_hash_destroy(&node->caller_hash); + switch_core_hash_destroy(&node->consumer_hash); + switch_thread_rwlock_unlock(node->rwlock); + } + } + } + + + return status; }