FS-5975 --resolve
This commit is contained in:
parent
a75a0ab919
commit
e398ede28b
|
@ -176,66 +176,58 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *
|
|||
ei_x_encode_atom(rbuf, "error");
|
||||
ei_x_encode_atom(rbuf, "badarg");
|
||||
} else {
|
||||
/* TODO - maybe use a rwlock instead */
|
||||
if ((p = switch_core_hash_find_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex))) {
|
||||
/* try to lock the mutex, so no other responder can */
|
||||
if (switch_mutex_trylock(p->mutex) == SWITCH_STATUS_SUCCESS) {
|
||||
if (p->state == reply_waiting) {
|
||||
/* alright, we've got the lock and we're the first to reply */
|
||||
|
||||
/* reply mutex is locked */
|
||||
if ((p = find_fetch_reply(uuid_str))) {
|
||||
switch (p->state) {
|
||||
case reply_waiting:
|
||||
{
|
||||
/* clone the reply so it doesn't get destroyed on us */
|
||||
ei_x_buff *nbuf = malloc(sizeof(*nbuf));
|
||||
nbuf->buff = malloc(buf->buffsz);
|
||||
memcpy(nbuf->buff, buf->buff, buf->buffsz);
|
||||
nbuf->index = buf->index;
|
||||
nbuf->buffsz = buf->buffsz;
|
||||
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str);
|
||||
|
||||
|
||||
/* copy info into the reply struct */
|
||||
p->state = reply_found;
|
||||
p->reply = nbuf;
|
||||
strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
|
||||
|
||||
|
||||
/* signal waiting thread that its time to wake up */
|
||||
switch_thread_cond_signal(p->ready_or_found);
|
||||
|
||||
/* reply OK */
|
||||
ei_x_encode_tuple_header(rbuf, 2);
|
||||
ei_x_encode_atom(rbuf, "ok");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
|
||||
/* unlock */
|
||||
switch_mutex_unlock(p->mutex);
|
||||
} else {
|
||||
if (p->state == reply_found) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 3);
|
||||
ei_x_encode_atom(rbuf, "error");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
ei_x_encode_atom(rbuf, "duplicate_response");
|
||||
} else if (p->state == reply_timeout) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 3);
|
||||
ei_x_encode_atom(rbuf, "error");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
ei_x_encode_atom(rbuf, "timeout");
|
||||
} else if (p->state == reply_not_ready) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 3);
|
||||
ei_x_encode_atom(rbuf, "error");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
ei_x_encode_atom(rbuf, "not_ready");
|
||||
}
|
||||
switch_mutex_unlock(p->mutex);
|
||||
}
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not lock mutex for reply %s\n", uuid_str);
|
||||
break;
|
||||
};
|
||||
case reply_found:
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 3);
|
||||
ei_x_encode_atom(rbuf, "error");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
ei_x_encode_atom(rbuf, "duplicate_response");
|
||||
break;
|
||||
case reply_timeout:
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 3);
|
||||
ei_x_encode_atom(rbuf, "error");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
ei_x_encode_atom(rbuf, "timeout");
|
||||
break;
|
||||
case reply_not_ready:
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 3);
|
||||
ei_x_encode_atom(rbuf, "error");
|
||||
_ei_x_encode_string(rbuf, uuid_str);
|
||||
ei_x_encode_atom(rbuf, "not_ready");
|
||||
break;
|
||||
}
|
||||
|
||||
switch_mutex_unlock(p->mutex);
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str);
|
||||
ei_x_encode_tuple_header(rbuf, 2);
|
||||
|
|
|
@ -377,6 +377,58 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
|
|||
return session;
|
||||
}
|
||||
|
||||
static fetch_reply_t *new_fetch_reply(const char *uuid_str)
|
||||
{
|
||||
fetch_reply_t *reply = NULL;
|
||||
switch_memory_pool_t *pool = NULL;
|
||||
|
||||
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
|
||||
abort();
|
||||
}
|
||||
switch_assert(pool != NULL);
|
||||
reply = switch_core_alloc(pool, sizeof(*reply));
|
||||
switch_assert(reply != NULL);
|
||||
memset(reply, 0, sizeof(*reply));
|
||||
|
||||
reply->uuid_str = switch_core_strdup(pool, uuid_str);
|
||||
reply->pool = pool;
|
||||
switch_thread_cond_create(&reply->ready_or_found, pool);
|
||||
switch_mutex_init(&reply->mutex, SWITCH_MUTEX_UNNESTED, pool);
|
||||
reply->state = reply_not_ready;
|
||||
reply->reply = NULL;
|
||||
switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, reply, globals.fetch_reply_mutex);
|
||||
reply->state = reply_waiting;
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
static void destroy_fetch_reply(fetch_reply_t *reply)
|
||||
{
|
||||
switch_core_hash_delete_locked(globals.fetch_reply_hash, reply->uuid_str, globals.fetch_reply_mutex);
|
||||
/* lock so nothing can have it while we delete it */
|
||||
switch_mutex_lock(reply->mutex);
|
||||
switch_mutex_unlock(reply->mutex);
|
||||
|
||||
switch_mutex_destroy(reply->mutex);
|
||||
switch_thread_cond_destroy(reply->ready_or_found);
|
||||
switch_safe_free(reply->reply);
|
||||
switch_core_destroy_memory_pool(&(reply->pool));
|
||||
}
|
||||
|
||||
fetch_reply_t *find_fetch_reply(const char *uuid)
|
||||
{
|
||||
fetch_reply_t *reply = NULL;
|
||||
|
||||
switch_mutex_lock(globals.fetch_reply_mutex);
|
||||
if ((reply = switch_core_hash_find(globals.fetch_reply_hash, uuid))) {
|
||||
if (switch_mutex_trylock(reply->mutex) != SWITCH_STATUS_SUCCESS) {
|
||||
reply = NULL;
|
||||
}
|
||||
}
|
||||
switch_mutex_unlock(globals.fetch_reply_mutex);
|
||||
return reply;
|
||||
}
|
||||
|
||||
static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value,
|
||||
switch_event_t *params, void *user_data)
|
||||
|
@ -437,15 +489,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
|||
}
|
||||
|
||||
if (!p) {
|
||||
/* Create a new fetch object. */
|
||||
p = malloc(sizeof(*p));
|
||||
switch_thread_cond_create(&p->ready_or_found, module_pool);
|
||||
/* TODO module pool */
|
||||
switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool);
|
||||
p->state = reply_not_ready;
|
||||
p->reply = NULL;
|
||||
switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex);
|
||||
p->state = reply_waiting;
|
||||
p = new_fetch_reply(uuid_str);
|
||||
now = switch_micro_time_now();
|
||||
}
|
||||
/* We don't need to lock here because everybody is waiting
|
||||
|
@ -517,14 +561,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
|||
/* cleanup */
|
||||
cleanup:
|
||||
if (p) {
|
||||
/* lock so nothing can have it while we delete it */
|
||||
switch_mutex_lock(p->mutex);
|
||||
switch_core_hash_delete_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex);
|
||||
switch_mutex_unlock(p->mutex);
|
||||
switch_mutex_destroy(p->mutex);
|
||||
switch_thread_cond_destroy(p->ready_or_found);
|
||||
switch_safe_free(p->reply);
|
||||
switch_safe_free(p);
|
||||
destroy_fetch_reply(p);
|
||||
}
|
||||
|
||||
return xml;
|
||||
|
|
|
@ -59,6 +59,8 @@ enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout };
|
|||
|
||||
struct fetch_reply_struct
|
||||
{
|
||||
const char *uuid_str;
|
||||
switch_memory_pool_t *pool;
|
||||
switch_thread_cond_t *ready_or_found;
|
||||
switch_mutex_t *mutex;
|
||||
enum reply_state state;
|
||||
|
@ -267,6 +269,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
|
|||
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid);
|
||||
void put_reply_unlock(fetch_reply_t *p, char *uuid_str);
|
||||
|
||||
fetch_reply_t *find_fetch_reply(const char *uuid);
|
||||
|
||||
/* For Emacs:
|
||||
* Local Variables:
|
||||
* mode:c
|
||||
|
|
Loading…
Reference in New Issue