mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-06-12 18:27:06 +00:00
Switching to using a hash instead of a linked list for outbound sessions
This commit is contained in:
parent
63b6144285
commit
64a28ee974
@ -142,15 +142,14 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
|
|||||||
{
|
{
|
||||||
char *uuid = switch_event_get_header(event, "unique-id");
|
char *uuid = switch_event_get_header(event, "unique-id");
|
||||||
switch_event_t *clone = NULL;
|
switch_event_t *clone = NULL;
|
||||||
session_elem_t *s;
|
session_elem_t *s = NULL;
|
||||||
|
|
||||||
if (!uuid)
|
if (!uuid) {
|
||||||
return;
|
return;
|
||||||
switch_mutex_lock(listener->session_mutex);
|
}
|
||||||
for (s = listener->session_list; s; s = s->next) {
|
|
||||||
/* check the event uuid against the uuid of each session */
|
if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
|
||||||
if (!strcmp(uuid, s->uuid_str)) {
|
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session %s\n",
|
||||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session for %s\n",
|
|
||||||
switch_event_name(event->event_id), s->uuid_str);
|
switch_event_name(event->event_id), s->uuid_str);
|
||||||
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
|
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
|
||||||
/* add the event to the queue for this session */
|
/* add the event to the queue for this session */
|
||||||
@ -163,8 +162,6 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void event_handler(switch_event_t *event)
|
static void event_handler(switch_event_t *event)
|
||||||
{
|
{
|
||||||
@ -304,33 +301,13 @@ static listener_t *find_listener(char *nodename)
|
|||||||
|
|
||||||
static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element)
|
static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element)
|
||||||
{
|
{
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_core_hash_insert_locked(listener->sessions, session_element->uuid_str, (void*) session_element, listener->session_mutex);
|
||||||
session_element->next = listener->session_list;
|
|
||||||
listener->session_list = session_element;
|
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element)
|
static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element)
|
||||||
{
|
{
|
||||||
session_elem_t *s, *last = NULL;
|
switch_core_hash_delete(listener->sessions, session_element->uuid_str);
|
||||||
|
|
||||||
if (!session_element)
|
|
||||||
return;
|
|
||||||
|
|
||||||
for (s = listener->session_list; s; s = s->next) {
|
|
||||||
if (s == session_element) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_DEBUG, "Removing session element for %s\n",
|
|
||||||
session_element->uuid_str);
|
|
||||||
if (last) {
|
|
||||||
last->next = s->next;
|
|
||||||
} else {
|
|
||||||
listener->session_list = s->next;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
last = s;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroy_session_elem(session_elem_t *session_element)
|
static void destroy_session_elem(session_elem_t *session_element)
|
||||||
@ -357,17 +334,23 @@ static void remove_session_elem_from_listener_locked(listener_t *listener, sessi
|
|||||||
|
|
||||||
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
|
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
|
||||||
{
|
{
|
||||||
session_elem_t *s = NULL;
|
switch_hash_index_t *iter = NULL;
|
||||||
|
const void *key = NULL;
|
||||||
|
void *val = NULL;
|
||||||
|
session_elem_t *session = NULL;
|
||||||
|
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_mutex_lock(listener->session_mutex);
|
||||||
for (s = listener->session_list; s; s = s->next) {
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
if (s->process.type == ERLANG_PID && !ei_compare_pids(pid, &s->process.pid)) {
|
switch_hash_this(iter, &key, NULL, &val);
|
||||||
break;
|
session = (session_elem_t*)val;
|
||||||
|
if (session->process.type == ERLANG_PID && !ei_compare_pids(pid, &session->process.pid)) {
|
||||||
|
switch_mutex_unlock(listener->session_mutex);
|
||||||
|
return session;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_mutex_unlock(listener->session_mutex);
|
||||||
|
|
||||||
return s;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -567,21 +550,26 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t *
|
|||||||
|
|
||||||
static switch_status_t check_attached_sessions(listener_t *listener)
|
static switch_status_t check_attached_sessions(listener_t *listener)
|
||||||
{
|
{
|
||||||
session_elem_t *last, *sp, *removed;
|
session_elem_t *sp;
|
||||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
||||||
void *pop;
|
void *pop;
|
||||||
|
const void *key;
|
||||||
|
void * value;
|
||||||
|
switch_hash_index_t *iter;
|
||||||
|
switch_event_t *event = NULL;
|
||||||
|
switch_event_header_t *header = NULL;
|
||||||
|
switch_event_create_subclass(&event, SWITCH_EVENT_CLONE, NULL);
|
||||||
|
switch_assert(event);
|
||||||
/* check up on all the attached sessions -
|
/* check up on all the attached sessions -
|
||||||
if they have not yet sent an initial call event to the associated erlang process then do so
|
if they have not yet sent an initial call event to the associated erlang process then do so
|
||||||
if they have pending events in their queues then send them
|
if they have pending events in their queues then send them
|
||||||
if the session has finished then clean it up
|
if the session has finished then clean it up
|
||||||
*/
|
*/
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_mutex_lock(listener->session_mutex);
|
||||||
sp = listener->session_list;
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
last = NULL;
|
switch_hash_this(iter, &key, NULL, &value);
|
||||||
while (sp) {
|
sp = (session_elem_t*)value;
|
||||||
removed = NULL;
|
|
||||||
if (switch_test_flag(sp, LFLAG_WAITING_FOR_PID)) {
|
if (switch_test_flag(sp, LFLAG_WAITING_FOR_PID)) {
|
||||||
sp = sp->next;
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -589,11 +577,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
|
|||||||
status = notify_new_session(listener, sp);
|
status = notify_new_session(listener, sp);
|
||||||
if (status != SWITCH_STATUS_SUCCESS) {
|
if (status != SWITCH_STATUS_SUCCESS) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(sp->uuid_str), SWITCH_LOG_DEBUG, "Notifying new session failed\n");
|
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(sp->uuid_str), SWITCH_LOG_DEBUG, "Notifying new session failed\n");
|
||||||
removed = sp;
|
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "delete", (const char *) key);
|
||||||
sp = removed->next;
|
|
||||||
|
|
||||||
remove_session_elem_from_listener(listener, removed);
|
|
||||||
destroy_session_elem(removed);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
switch_set_flag(sp, LFLAG_OUTBOUND_INIT);
|
switch_set_flag(sp, LFLAG_OUTBOUND_INIT);
|
||||||
@ -633,11 +617,8 @@ static switch_status_t check_attached_sessions(listener_t *listener)
|
|||||||
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf);
|
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf);
|
||||||
switch_mutex_unlock(listener->sock_mutex);
|
switch_mutex_unlock(listener->sock_mutex);
|
||||||
ei_x_free(&ebuf);
|
ei_x_free(&ebuf);
|
||||||
removed = sp;
|
|
||||||
sp = removed->next;
|
|
||||||
|
|
||||||
remove_session_elem_from_listener(listener, removed);
|
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "delete", (const char *) key);
|
||||||
destroy_session_elem(removed);
|
|
||||||
continue;
|
continue;
|
||||||
} else if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
|
} else if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
|
||||||
@ -663,8 +644,15 @@ static switch_status_t check_attached_sessions(listener_t *listener)
|
|||||||
|
|
||||||
switch_event_destroy(&pevent);
|
switch_event_destroy(&pevent);
|
||||||
}
|
}
|
||||||
sp = sp->next;
|
|
||||||
}
|
}
|
||||||
|
/* do the deferred remove */
|
||||||
|
for (header = event->headers; header; header = header->next) {
|
||||||
|
if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) {
|
||||||
|
remove_session_elem_from_listener(listener, sp);
|
||||||
|
destroy_session_elem(sp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_mutex_unlock(listener->session_mutex);
|
||||||
if (prefs.done) {
|
if (prefs.done) {
|
||||||
return SWITCH_STATUS_FALSE; /* we're shutting down */
|
return SWITCH_STATUS_FALSE; /* we're shutting down */
|
||||||
@ -953,7 +941,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
|
|||||||
{
|
{
|
||||||
listener_t *listener = (listener_t *) obj;
|
listener_t *listener = (listener_t *) obj;
|
||||||
session_elem_t *s;
|
session_elem_t *s;
|
||||||
switch_core_session_t *session;
|
const void *key;
|
||||||
|
void *value;
|
||||||
|
switch_hash_index_t *iter;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_mutex_lock(globals.listener_mutex);
|
||||||
prefs.threads++;
|
prefs.threads++;
|
||||||
@ -992,15 +982,12 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
|
|||||||
remove_binding(listener, NULL);
|
remove_binding(listener, NULL);
|
||||||
|
|
||||||
/* clean up all the attached sessions */
|
/* clean up all the attached sessions */
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_mutex_lock(listener->session_mutex); // TODO is iterating thread safe?
|
||||||
/* TODO destroy memory pools since they're not children of the listener's pool */
|
/* TODO destroy memory pools since they're not children of the listener's pool */
|
||||||
for (s = listener->session_list; s; s = s->next) {
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
if ((session = switch_core_session_locate(s->uuid_str))) {
|
switch_hash_this(iter, &key, NULL, &value);
|
||||||
switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED);
|
s = (session_elem_t*)value;
|
||||||
switch_core_session_rwunlock(session);
|
destroy_session_elem(s);
|
||||||
}
|
|
||||||
/* this allows the application threads to exit */
|
|
||||||
switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE);
|
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_mutex_unlock(listener->session_mutex);
|
||||||
|
|
||||||
@ -1135,6 +1122,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
|
|||||||
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
||||||
switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
||||||
switch_core_hash_init(&listener->event_hash, listener->pool);
|
switch_core_hash_init(&listener->event_hash, listener->pool);
|
||||||
|
switch_core_hash_init(&listener->sessions, listener->pool);
|
||||||
|
|
||||||
return listener;
|
return listener;
|
||||||
}
|
}
|
||||||
@ -1189,6 +1177,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t
|
|||||||
switch_memory_pool_t *session_elem_pool;
|
switch_memory_pool_t *session_elem_pool;
|
||||||
session_elem_t *session_element; /* = malloc(sizeof(*session_element)); */
|
session_elem_t *session_element; /* = malloc(sizeof(*session_element)); */
|
||||||
switch_channel_t *channel = switch_core_session_get_channel(session);
|
switch_channel_t *channel = switch_core_session_get_channel(session);
|
||||||
|
int x;
|
||||||
|
|
||||||
if (switch_core_new_memory_pool(&session_elem_pool) != SWITCH_STATUS_SUCCESS) {
|
if (switch_core_new_memory_pool(&session_elem_pool) != SWITCH_STATUS_SUCCESS) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
|
||||||
@ -1205,6 +1194,13 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t
|
|||||||
|
|
||||||
switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_element->pool);
|
switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_element->pool);
|
||||||
switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_element->pool);
|
switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_element->pool);
|
||||||
|
switch_core_hash_init(&session_element->event_hash, session_element->pool);
|
||||||
|
|
||||||
|
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
||||||
|
session_element->event_list[x] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
session_element->event_list[SWITCH_EVENT_ALL] = 1; /* defaults to everything */
|
||||||
|
|
||||||
switch_channel_set_private(channel, "_erlang_session_", session_element);
|
switch_channel_set_private(channel, "_erlang_session_", session_element);
|
||||||
switch_channel_set_private(channel, "_erlang_listener_", listener);
|
switch_channel_set_private(channel, "_erlang_listener_", listener);
|
||||||
@ -1336,16 +1332,12 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
|
|||||||
|
|
||||||
int count_listener_sessions(listener_t *listener)
|
int count_listener_sessions(listener_t *listener)
|
||||||
{
|
{
|
||||||
session_elem_t *last, *sp;
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
switch_hash_index_t *iter;
|
||||||
|
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_mutex_lock(listener->session_mutex);
|
||||||
sp = listener->session_list;
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
last = NULL;
|
|
||||||
while (sp) {
|
|
||||||
count++;
|
count++;
|
||||||
last = sp;
|
|
||||||
sp = sp->next;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_mutex_unlock(listener->session_mutex);
|
||||||
@ -1547,16 +1539,22 @@ SWITCH_STANDARD_API(erlang_cmd)
|
|||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
if (!strcasecmp(l->peer_nodename, argv[1])) {
|
if (!strcasecmp(l->peer_nodename, argv[1])) {
|
||||||
session_elem_t *sp;
|
session_elem_t *sp;
|
||||||
|
switch_hash_index_t *iter;
|
||||||
|
int empty = 1;
|
||||||
|
const void *key;
|
||||||
|
void *value;
|
||||||
|
|
||||||
found = 1;
|
found = 1;
|
||||||
switch_mutex_lock(l->session_mutex);
|
switch_mutex_lock(l->session_mutex);
|
||||||
if ((sp = l->session_list)) {
|
for (iter = switch_hash_first(NULL, l->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
while (sp) {
|
empty = 0;
|
||||||
|
switch_hash_this(iter, &key, NULL, &value);
|
||||||
|
sp = (session_elem_t*)value;
|
||||||
stream->write_function(stream, "Outbound session for %s in state %s\n", sp->uuid_str,
|
stream->write_function(stream, "Outbound session for %s in state %s\n", sp->uuid_str,
|
||||||
switch_channel_state_name(sp->channel_state));
|
switch_channel_state_name(sp->channel_state));
|
||||||
sp = sp->next;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
|
if (empty) {
|
||||||
stream->write_function(stream, "No active sessions for %s\n", argv[1]);
|
stream->write_function(stream, "No active sessions for %s\n", argv[1]);
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(l->session_mutex);
|
switch_mutex_unlock(l->session_mutex);
|
||||||
|
@ -63,7 +63,9 @@ struct session_elem {
|
|||||||
switch_queue_t *event_queue;
|
switch_queue_t *event_queue;
|
||||||
switch_channel_state_t channel_state;
|
switch_channel_state_t channel_state;
|
||||||
switch_memory_pool_t *pool;
|
switch_memory_pool_t *pool;
|
||||||
struct session_elem *next;
|
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
||||||
|
switch_hash_t *event_hash;
|
||||||
|
//struct session_elem *next;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct session_elem session_elem_t;
|
typedef struct session_elem session_elem_t;
|
||||||
@ -106,7 +108,8 @@ struct listener {
|
|||||||
switch_hash_t *spawn_pid_hash;
|
switch_hash_t *spawn_pid_hash;
|
||||||
switch_thread_rwlock_t *rwlock;
|
switch_thread_rwlock_t *rwlock;
|
||||||
switch_mutex_t *session_mutex;
|
switch_mutex_t *session_mutex;
|
||||||
session_elem_t *session_list;
|
//session_elem_t *session_list;
|
||||||
|
switch_hash_t *sessions;
|
||||||
int lost_events;
|
int lost_events;
|
||||||
int lost_logs;
|
int lost_logs;
|
||||||
time_t last_flush;
|
time_t last_flush;
|
||||||
@ -246,6 +249,7 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec);
|
|||||||
session_elem_t *attach_call_to_registered_process(listener_t *listener, char *reg_name, switch_core_session_t *session);
|
session_elem_t *attach_call_to_registered_process(listener_t *listener, char *reg_name, switch_core_session_t *session);
|
||||||
session_elem_t *attach_call_to_pid(listener_t *listener, erlang_pid * pid, switch_core_session_t *session);
|
session_elem_t *attach_call_to_pid(listener_t *listener, erlang_pid * pid, switch_core_session_t *session);
|
||||||
session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *module, char *function, switch_core_session_t *session);
|
session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *module, char *function, switch_core_session_t *session);
|
||||||
|
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid);
|
||||||
void put_reply_unlock(fetch_reply_t *p, char *uuid_str);
|
void put_reply_unlock(fetch_reply_t *p, char *uuid_str);
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user