From e398ede28bbb9de88210b2a482f03861bb56b15f Mon Sep 17 00:00:00 2001 From: Tamas Cseke Date: Wed, 26 Feb 2014 13:34:50 +0100 Subject: [PATCH] FS-5975 --resolve --- .../mod_erlang_event/handle_msg.c | 66 ++++++++--------- .../mod_erlang_event/mod_erlang_event.c | 71 ++++++++++++++----- .../mod_erlang_event/mod_erlang_event.h | 4 ++ 3 files changed, 87 insertions(+), 54 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index fdd0aaad26..ae6da81cae 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -176,66 +176,58 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * ei_x_encode_atom(rbuf, "error"); ei_x_encode_atom(rbuf, "badarg"); } else { - /* TODO - maybe use a rwlock instead */ - if ((p = switch_core_hash_find_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex))) { - /* try to lock the mutex, so no other responder can */ - if (switch_mutex_trylock(p->mutex) == SWITCH_STATUS_SUCCESS) { - if (p->state == reply_waiting) { - /* alright, we've got the lock and we're the first to reply */ + /* reply mutex is locked */ + if ((p = find_fetch_reply(uuid_str))) { + switch (p->state) { + case reply_waiting: + { /* clone the reply so it doesn't get destroyed on us */ ei_x_buff *nbuf = malloc(sizeof(*nbuf)); nbuf->buff = malloc(buf->buffsz); memcpy(nbuf->buff, buf->buff, buf->buffsz); nbuf->index = buf->index; nbuf->buffsz = buf->buffsz; - + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str); - + /* copy info into the reply struct */ p->state = reply_found; p->reply = nbuf; strncpy(p->winner, listener->peer_nodename, MAXNODELEN); - + /* signal waiting thread that its time to wake up */ switch_thread_cond_signal(p->ready_or_found); - /* reply OK */ ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "ok"); _ei_x_encode_string(rbuf, uuid_str); - - /* unlock */ - switch_mutex_unlock(p->mutex); - } else { - if (p->state == reply_found) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str); - ei_x_encode_tuple_header(rbuf, 3); - ei_x_encode_atom(rbuf, "error"); - _ei_x_encode_string(rbuf, uuid_str); - ei_x_encode_atom(rbuf, "duplicate_response"); - } else if (p->state == reply_timeout) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str); - ei_x_encode_tuple_header(rbuf, 3); - ei_x_encode_atom(rbuf, "error"); - _ei_x_encode_string(rbuf, uuid_str); - ei_x_encode_atom(rbuf, "timeout"); - } else if (p->state == reply_not_ready) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str); - ei_x_encode_tuple_header(rbuf, 3); - ei_x_encode_atom(rbuf, "error"); - _ei_x_encode_string(rbuf, uuid_str); - ei_x_encode_atom(rbuf, "not_ready"); - } - switch_mutex_unlock(p->mutex); - } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not lock mutex for reply %s\n", uuid_str); + break; + }; + case reply_found: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str); ei_x_encode_tuple_header(rbuf, 3); ei_x_encode_atom(rbuf, "error"); _ei_x_encode_string(rbuf, uuid_str); ei_x_encode_atom(rbuf, "duplicate_response"); + break; + case reply_timeout: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str); + ei_x_encode_tuple_header(rbuf, 3); + ei_x_encode_atom(rbuf, "error"); + _ei_x_encode_string(rbuf, uuid_str); + ei_x_encode_atom(rbuf, "timeout"); + break; + case reply_not_ready: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str); + ei_x_encode_tuple_header(rbuf, 3); + ei_x_encode_atom(rbuf, "error"); + _ei_x_encode_string(rbuf, uuid_str); + ei_x_encode_atom(rbuf, "not_ready"); + break; } + + switch_mutex_unlock(p->mutex); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str); ei_x_encode_tuple_header(rbuf, 2); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index fd248b4dc6..f7c943c396 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -377,6 +377,58 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) return session; } +static fetch_reply_t *new_fetch_reply(const char *uuid_str) +{ + fetch_reply_t *reply = NULL; + switch_memory_pool_t *pool = NULL; + + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); + abort(); + } + switch_assert(pool != NULL); + reply = switch_core_alloc(pool, sizeof(*reply)); + switch_assert(reply != NULL); + memset(reply, 0, sizeof(*reply)); + + reply->uuid_str = switch_core_strdup(pool, uuid_str); + reply->pool = pool; + switch_thread_cond_create(&reply->ready_or_found, pool); + switch_mutex_init(&reply->mutex, SWITCH_MUTEX_UNNESTED, pool); + reply->state = reply_not_ready; + reply->reply = NULL; + switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, reply, globals.fetch_reply_mutex); + reply->state = reply_waiting; + + return reply; +} + +static void destroy_fetch_reply(fetch_reply_t *reply) +{ + switch_core_hash_delete_locked(globals.fetch_reply_hash, reply->uuid_str, globals.fetch_reply_mutex); + /* lock so nothing can have it while we delete it */ + switch_mutex_lock(reply->mutex); + switch_mutex_unlock(reply->mutex); + + switch_mutex_destroy(reply->mutex); + switch_thread_cond_destroy(reply->ready_or_found); + switch_safe_free(reply->reply); + switch_core_destroy_memory_pool(&(reply->pool)); +} + +fetch_reply_t *find_fetch_reply(const char *uuid) +{ + fetch_reply_t *reply = NULL; + + switch_mutex_lock(globals.fetch_reply_mutex); + if ((reply = switch_core_hash_find(globals.fetch_reply_hash, uuid))) { + if (switch_mutex_trylock(reply->mutex) != SWITCH_STATUS_SUCCESS) { + reply = NULL; + } + } + switch_mutex_unlock(globals.fetch_reply_mutex); + return reply; +} static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value, switch_event_t *params, void *user_data) @@ -437,15 +489,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c } if (!p) { - /* Create a new fetch object. */ - p = malloc(sizeof(*p)); - switch_thread_cond_create(&p->ready_or_found, module_pool); - /* TODO module pool */ - switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool); - p->state = reply_not_ready; - p->reply = NULL; - switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex); - p->state = reply_waiting; + p = new_fetch_reply(uuid_str); now = switch_micro_time_now(); } /* We don't need to lock here because everybody is waiting @@ -517,14 +561,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /* cleanup */ cleanup: if (p) { - /* lock so nothing can have it while we delete it */ - switch_mutex_lock(p->mutex); - switch_core_hash_delete_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex); - switch_mutex_unlock(p->mutex); - switch_mutex_destroy(p->mutex); - switch_thread_cond_destroy(p->ready_or_found); - switch_safe_free(p->reply); - switch_safe_free(p); + destroy_fetch_reply(p); } return xml; diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index f4e6b2b28d..70b85e716b 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -59,6 +59,8 @@ enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout }; struct fetch_reply_struct { + const char *uuid_str; + switch_memory_pool_t *pool; switch_thread_cond_t *ready_or_found; switch_mutex_t *mutex; enum reply_state state; @@ -267,6 +269,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid); void put_reply_unlock(fetch_reply_t *p, char *uuid_str); +fetch_reply_t *find_fetch_reply(const char *uuid); + /* For Emacs: * Local Variables: * mode:c