fifo tweaks

This commit is contained in:
Anthony Minessale 2010-07-21 19:40:13 -05:00
parent d3108020c5
commit 7518c86a93
1 changed files with 53 additions and 52 deletions

View File

@ -1149,10 +1149,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
if (node) { if (node) {
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
//node->busy = switch_epoch_time_now(NULL) + 600; node->busy = 0;
node->ring_consumer_count = 1; node->ring_consumer_count = 1;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
} else { } else {
goto end; goto end;
} }
@ -1319,7 +1319,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, " char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
"outbound_fail_count=outbound_fail_count+1, " "outbound_fail_count=outbound_fail_count+1, "
"outbound_fail_total_count = outbound_fail_total_count+1, " "outbound_fail_total_count = outbound_fail_total_count+1, "
"next_avail=%ld + lag where uuid='%q' and ring_count > 0", "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
(long) switch_epoch_time_now(NULL), h->uuid); (long) switch_epoch_time_now(NULL), h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql); switch_safe_free(sql);
@ -1389,6 +1389,14 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
cbh->ready = 1; cbh->ready = 1;
if (node) {
switch_thread_rwlock_wrlock(node->rwlock);
node->ring_consumer_count = 0;
node->busy = 0;
switch_thread_rwlock_unlock(node->rwlock);
}
for (i = 0; i < cbh->rowcount; i++) { for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i]; struct call_helper *h = cbh->rows[i];
del_consumer_outbound_call(h->uuid); del_consumer_outbound_call(h->uuid);
@ -1404,14 +1412,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
if (pop_dup) { if (pop_dup) {
switch_event_destroy(&pop_dup); switch_event_destroy(&pop_dup);
} }
if (node) {
switch_mutex_lock(node->mutex);
node->ring_consumer_count = 0;
//node->busy = switch_epoch_time_now(NULL) + connected;
switch_mutex_unlock(node->mutex);
}
switch_core_destroy_memory_pool(&cbh->pool); switch_core_destroy_memory_pool(&cbh->pool);
return NULL; return NULL;
@ -1439,10 +1440,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
if (node) { if (node) {
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
node->ring_consumer_count++; node->ring_consumer_count++;
//node->busy = switch_epoch_time_now(NULL) + 600; node->busy = 0;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
} }
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS); switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
@ -1485,7 +1486,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
if (status != SWITCH_STATUS_SUCCESS) { if (status != SWITCH_STATUS_SUCCESS) {
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, " sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
"outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%q' and use_count > 0", "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q' and use_count > 0",
(long) switch_epoch_time_now(NULL), h->uuid); (long) switch_epoch_time_now(NULL), h->uuid);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql); switch_safe_free(sql);
@ -1539,12 +1540,12 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
switch_event_destroy(&ovars); switch_event_destroy(&ovars);
if (node) { if (node) {
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
if (node->ring_consumer_count-- < 0) { if (node->ring_consumer_count-- < 0) {
node->ring_consumer_count = 0; node->ring_consumer_count = 0;
} }
//node->busy = switch_epoch_time_now(NULL) + connected; node->busy = 0;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
} }
switch_core_destroy_memory_pool(&h->pool); switch_core_destroy_memory_pool(&h->pool);
@ -1652,15 +1653,10 @@ static void find_consumers(fifo_node_t *node)
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh); fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
if (cbh->rowcount) { if (cbh->rowcount) {
int sanity = 40;
switch_threadattr_create(&thd_attr, cbh->pool); switch_threadattr_create(&thd_attr, cbh->pool);
switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool); switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
while(--sanity > 0 && !cbh->ready) {
switch_yield(100000);
}
} }
} }
@ -1689,8 +1685,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) { if ((node = (fifo_node_t *) val)) {
switch_mutex_lock(node->mutex); if (node->has_outbound && node->ready && !node->busy) {
if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) {
ppl_waiting = node_consumer_wait_count(node); ppl_waiting = node_consumer_wait_count(node);
consumer_total = node->consumer_count; consumer_total = node->consumer_count;
idle_consumers = node_idle_consumers(node); idle_consumers = node_idle_consumers(node);
@ -1704,7 +1699,6 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
switch_yield(1000000); switch_yield(1000000);
} }
} }
switch_mutex_unlock(node->mutex);
} }
} }
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
@ -1922,8 +1916,9 @@ static void dec_use_count(switch_channel_t *channel)
if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) { if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
del_bridge_call(outbound_id); del_bridge_call(outbound_id);
sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'", sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'",
now, now, outbound_id); now, now, outbound_id);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql); switch_safe_free(sql);
} }
@ -1963,9 +1958,15 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
return; return;
} }
if (switch_true(switch_channel_get_variable(channel, "fifo_track_call"))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s trying to double-track call!\n", switch_channel_get_name(channel));
return;
}
add_bridge_call(data); add_bridge_call(data);
switch_channel_set_variable(channel, "fifo_outbound_uuid", data); switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
switch_channel_set_variable(channel, "fifo_track_call", "true");
if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) { if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
col1 = "manual_calls_in_count"; col1 = "manual_calls_in_count";
@ -1978,6 +1979,7 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'", sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'",
(long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data); (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql); switch_safe_free(sql);
@ -2218,7 +2220,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_answer(channel); switch_channel_answer(channel);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
node->caller_count++; node->caller_count++;
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) { if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
@ -2257,7 +2259,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_set_variable(channel, "fifo_priority", tmp); switch_channel_set_variable(channel, "fifo_priority", tmp);
} }
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
ts = switch_micro_time_now(); ts = switch_micro_time_now();
switch_time_exp_lt(&tm, ts); switch_time_exp_lt(&tm, ts);
@ -2342,6 +2344,8 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_clear_app_flag(channel, CF_APP_TAGGED); switch_channel_clear_app_flag(channel, CF_APP_TAGGED);
abort: abort:
fifo_caller_del(switch_core_session_get_uuid(session));
if (!aborted && switch_channel_ready(channel)) { if (!aborted && switch_channel_ready(channel)) {
switch_channel_set_state(channel, CS_HIBERNATE); switch_channel_set_state(channel, CS_HIBERNATE);
@ -2361,10 +2365,10 @@ SWITCH_STANDARD_APP(fifo_function)
} }
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
node_remove_uuid(node, uuid); node_remove_uuid(node, uuid);
node->caller_count--; node->caller_count--;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
send_presence(node); send_presence(node);
check_cancel(node); check_cancel(node);
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
@ -2588,15 +2592,10 @@ SWITCH_STANDARD_APP(fifo_function)
} }
} }
if (pop) {
fifo_caller_del(switch_str_nil(switch_event_get_header(pop, "unique-id")));
}
if (pop && !node_consumer_wait_count(node)) { if (pop && !node_consumer_wait_count(node)) {
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
node->start_waiting = 0; node->start_waiting = 0;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
} }
} }
@ -2710,9 +2709,9 @@ SWITCH_STANDARD_APP(fifo_function)
const char *arg = switch_channel_get_variable(other_channel, "current_application_data"); const char *arg = switch_channel_get_variable(other_channel, "current_application_data");
switch_caller_extension_t *extension = NULL; switch_caller_extension_t *extension = NULL;
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
node->caller_count--; node->caller_count--;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
send_presence(node); send_presence(node);
check_cancel(node); check_cancel(node);
@ -2786,6 +2785,8 @@ SWITCH_STANDARD_APP(fifo_function)
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'", sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'",
switch_epoch_time_now(NULL), outbound_id); switch_epoch_time_now(NULL), outbound_id);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql); switch_safe_free(sql);
} }
@ -2817,7 +2818,7 @@ SWITCH_STANDARD_APP(fifo_function)
sql = switch_mprintf("update fifo_outbound set stop_time=%ld, use_count=use_count-1, " sql = switch_mprintf("update fifo_outbound set stop_time=%ld, use_count=use_count-1, "
"outbound_call_total_count=outbound_call_total_count+1, " "outbound_call_total_count=outbound_call_total_count+1, "
"outbound_call_count=outbound_call_count+1, next_avail=%ld + lag where uuid='%s' and use_count > 0", "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0",
now, now, outbound_id); now, now, outbound_id);
fifo_execute_sql(sql, globals.sql_mutex); fifo_execute_sql(sql, globals.sql_mutex);
@ -2876,9 +2877,9 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_set_variable(other_channel, "fifo_status", "DONE"); switch_channel_set_variable(other_channel, "fifo_status", "DONE");
switch_channel_set_variable(other_channel, "fifo_timestamp", date); switch_channel_set_variable(other_channel, "fifo_timestamp", date);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
node->caller_count--; node->caller_count--;
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
send_presence(node); send_presence(node);
check_cancel(node); check_cancel(node);
switch_core_session_rwunlock(other_session); switch_core_session_rwunlock(other_session);
@ -3523,9 +3524,9 @@ SWITCH_STANDARD_API(fifo_api_function)
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
node = (fifo_node_t *) val; node = (fifo_node_t *) val;
len = node_consumer_wait_count(node); len = node_consumer_wait_count(node);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node->caller_count, len); stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node->caller_count, len);
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
x++; x++;
} }
@ -3534,9 +3535,9 @@ SWITCH_STANDARD_API(fifo_api_function)
} }
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
len = node_consumer_wait_count(node); len = node_consumer_wait_count(node);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len);
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
} else { } else {
stream->write_function(stream, "none\n"); stream->write_function(stream, "none\n");
} }
@ -3546,9 +3547,9 @@ SWITCH_STANDARD_API(fifo_api_function)
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
node = (fifo_node_t *) val; node = (fifo_node_t *) val;
len = node_consumer_wait_count(node); len = node_consumer_wait_count(node);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound); stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
x++; x++;
} }
@ -3557,9 +3558,9 @@ SWITCH_STANDARD_API(fifo_api_function)
} }
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
len = node_consumer_wait_count(node); len = node_consumer_wait_count(node);
switch_mutex_lock(node->mutex); switch_thread_rwlock_wrlock(node->rwlock);
stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound); stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock);
} else { } else {
stream->write_function(stream, "none\n"); stream->write_function(stream, "none\n");
} }