From 4e6b56c53d83c643b2d590da100c70c02dc3f993 Mon Sep 17 00:00:00 2001 From: Tamas Cseke Date: Thu, 24 May 2012 10:57:28 +0200 Subject: [PATCH] add listener event r/w locking FS-3432 --- .../mod_erlang_event/handle_msg.c | 24 +++++++++++++------ .../mod_erlang_event/mod_erlang_event.c | 17 ++++++++----- .../mod_erlang_event/mod_erlang_event.h | 1 + 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 0b083cd0c0..8e6ba46a62 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -28,6 +28,7 @@ * Rob Charlton * Darren Schreiber * Mike Jerris + * Tamas Cseke * * * handle_msg.c -- handle messages received from erlang nodes @@ -286,7 +287,8 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu switch_set_flag_locked(listener, LFLAG_EVENTS); } - /* TODO - listener write lock */ + switch_thread_rwlock_wrlock(listener->event_rwlock); + for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { @@ -312,6 +314,8 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom); } } + switch_thread_rwlock_unlock(listener->event_rwlock); + ei_x_encode_atom(rbuf, "ok"); } return SWITCH_STATUS_SUCCESS; @@ -382,7 +386,8 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x int i = 0; switch_event_types_t type; - /* TODO listener write lock */ + switch_thread_rwlock_wrlock(listener->event_rwlock); + for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { @@ -410,6 +415,8 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x } } } + + switch_thread_rwlock_unlock(listener->event_rwlock); ei_x_encode_atom(rbuf, "ok"); } return SWITCH_STATUS_SUCCESS; @@ -522,11 +529,11 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg } } /* update the event subscriptions with the new ones */ + switch_thread_rwlock_wrlock(listener->event_rwlock); memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1)); - /* wipe the old hash, and point the pointer at the new one */ - /* TODO make thread safe */ switch_core_hash_destroy(&listener->event_hash); listener->event_hash = event_hash; + switch_thread_rwlock_unlock(listener->event_rwlock); /* TODO - we should flush any non-matching events from the queue */ ei_x_encode_atom(rbuf, "ok"); @@ -1031,13 +1038,16 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e if (switch_test_flag(listener, LFLAG_EVENTS)) { uint8_t x = 0; switch_clear_flag_locked(listener, LFLAG_EVENTS); + + switch_thread_rwlock_wrlock(listener->event_rwlock); for (x = 0; x <= SWITCH_EVENT_ALL; x++) { listener->event_list[x] = 0; } - /* wipe the hash */ - /* TODO make thread safe*/ - switch_core_hash_destroy(&listener->event_hash); + + switch_core_hash_delete_multi(listener->event_hash, NULL, NULL); switch_core_hash_init(&listener->event_hash, listener->pool); + + switch_thread_rwlock_unlock(listener->event_rwlock); ei_x_encode_atom(rbuf, "ok"); } else { ei_x_encode_tuple_header(rbuf, 2); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index c2f204ef0a..f690fe6453 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -26,6 +26,7 @@ * Anthony Minessale II * Andrew Thompson * Rob Charlton + * Tamas Cseke * * * mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket @@ -200,6 +201,8 @@ static void event_handler(switch_event_t *event) continue; } + switch_thread_rwlock_rdlock(l->event_rwlock); + if (l->event_list[SWITCH_EVENT_ALL]) { send = 1; } else if ((l->event_list[event->event_id])) { @@ -208,6 +211,7 @@ static void event_handler(switch_event_t *event) } } + switch_thread_rwlock_unlock(l->event_rwlock); if (send) { if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { @@ -815,14 +819,12 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) uint8_t x = 0; switch_clear_flag_locked(listener, LFLAG_EVENTS); + switch_thread_rwlock_wrlock(listener->event_rwlock); for (x = 0; x <= SWITCH_EVENT_ALL; x++) { listener->event_list[x] = 0; } - /* wipe the hash */ - /* XXX this needs to be locked */ - /* TODO switch_core_hash_delete_multi_locked */ - switch_core_hash_destroy(&listener->event_hash); - switch_core_hash_init(&listener->event_hash, listener->pool); + switch_core_hash_delete_multi(listener->event_hash, NULL, NULL); + switch_thread_rwlock_unlock(listener->event_rwlock); } } } @@ -1177,7 +1179,6 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) } memset(listener, 0, sizeof(*listener)); - switch_thread_rwlock_create(&listener->rwlock, pool); switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool); @@ -1188,7 +1189,11 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) listener->level = SWITCH_LOG_DEBUG; switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); + + switch_thread_rwlock_create(&listener->rwlock, pool); + switch_thread_rwlock_create(&listener->event_rwlock, pool); switch_thread_rwlock_create(&listener->session_rwlock, listener->pool); + switch_core_hash_init(&listener->event_hash, listener->pool); switch_core_hash_init(&listener->sessions, listener->pool); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 016734dd80..5383922782 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -129,6 +129,7 @@ struct listener { uint8_t event_list[SWITCH_EVENT_ALL + 1]; switch_hash_t *event_hash; switch_thread_rwlock_t *rwlock; + switch_thread_rwlock_t *event_rwlock; switch_thread_rwlock_t *session_rwlock; //session_elem_t *session_list; switch_hash_t *sessions;