diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 0ebad27e33..99bde11fbe 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -605,20 +605,23 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e } else if (!strncmp(atom, "nolog", MAXATOMLEN)) { if (switch_test_flag(listener, LFLAG_LOG)) { + void *pop; + /*purge the log queue */ + while (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS); switch_clear_flag_locked(listener, LFLAG_LOG); } ei_x_encode_atom(rbuf, "ok"); } else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) { ei_link(listener, ei_self(listener->ec), &msg->from); listener->log_process.type = ERLANG_PID; - listener->log_process.pid = msg->from; + memcpy(&listener->log_process.pid, &msg->from, sizeof(erlang_pid)); listener->level = SWITCH_LOG_DEBUG; switch_set_flag(listener, LFLAG_LOG); ei_x_encode_atom(rbuf, "ok"); } else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) { ei_link(listener, ei_self(listener->ec), &msg->from); listener->event_process.type = ERLANG_PID; - listener->event_process.pid = msg->from; + memcpy(&listener->event_process.pid, &msg->from, sizeof(erlang_pid)); if (!switch_test_flag(listener, LFLAG_EVENTS)) { switch_set_flag_locked(listener, LFLAG_EVENTS); } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 81523a4380..a5223d4b88 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -190,12 +190,13 @@ static void event_handler(switch_event_t *event) /* test all of the sessions attached to this event in case one of them should receive it as well */ + send_event_to_attached_sessions(l,event); if (!switch_test_flag(l, LFLAG_EVENTS)) { continue; } - + if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && switch_epoch_time_now(NULL) - l->last_flush > l->timeout) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Stateful Listener %u has expired\n", l->id); remove_listener(l); @@ -210,6 +211,7 @@ static void event_handler(switch_event_t *event) send = 1; } } + if (send) { if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { @@ -397,7 +399,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /*int index = 3;*/ while (!(rep = (ei_x_buff *) switch_core_hash_find(ptr->listener->fetch_reply_hash, uuid_str))) { if (i > 50) { /* half a second timeout */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "timed out!\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for XML fetch response!\n"); return NULL; } i++; @@ -603,10 +605,11 @@ static void check_log_queue(listener_t *listener) static void check_event_queue(listener_t *listener) { void* pop; + /* send out any pending crap in the event queue */ if (switch_test_flag(listener, LFLAG_EVENTS)) { if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { - + switch_event_t *pevent = (switch_event_t *) pop; ei_x_buff ebuf; @@ -615,7 +618,7 @@ static void check_event_queue(listener_t *listener) ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &ebuf); + ei_sendto(listener->ec, listener->sockfd, &listener->event_process, &ebuf); switch_mutex_unlock(listener->sock_mutex); ei_x_free(&ebuf); @@ -624,6 +627,50 @@ static void check_event_queue(listener_t *listener) } } +static void handle_exit(listener_t *listener, erlang_pid *pid) +{ + session_elem_t *s; + + remove_binding(NULL, pid); /* TODO - why don't we pass the listener as the first argument? */ + if ((s = find_session_elem_by_pid(listener, pid))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Outbound session for %s exited unexpectedly!\n", + switch_core_session_get_uuid(s->session)); + /* TODO - if a spawned process that was handling an outbound call fails.. what do we do with the call? */ + remove_session_elem_from_listener(listener, s); + } + + if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) { + void *pop; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Log handler process for node %s exited\n", pid->node); + /*purge the log queue */ + while (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS); + + if (switch_test_flag(listener, LFLAG_LOG)) { + switch_clear_flag_locked(listener, LFLAG_LOG); + } + } + + if (listener->event_process.type == ERLANG_PID && !ei_compare_pids(&listener->event_process.pid, pid)) { + void *pop; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Event handler process for node %s exited\n", pid->node); + /*purge the event queue */ + while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS); + + if (switch_test_flag(listener, LFLAG_EVENTS)) { + uint8_t x = 0; + switch_clear_flag_locked(listener, LFLAG_EVENTS); + for (x = 0; x <= SWITCH_EVENT_ALL; x++) { + listener->event_list[x] = 0; + } + /* wipe the hash */ + switch_core_hash_destroy(&listener->event_hash); + switch_core_hash_init(&listener->event_hash, listener->pool); + } + } +} + static void listener_main_loop(listener_t *listener) { int status = 1; @@ -681,10 +728,7 @@ static void listener_main_loop(listener_t *listener) break; case ERL_EXIT : switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_exit from %s <%d.%d.%d>\n", msg.from.node, msg.from.creation, msg.from.num, msg.from.serial); - remove_binding(NULL, &msg.from); - /* TODO - if a spawned process that was handling an outbound call fails.. what do we do with the call? */ - remove_session_elem_from_listener(listener, find_session_elem_by_pid(listener, &msg.from)); - /* TODO - check if this linked pid is any of the log/event handler processes and cleanup if it is. */ + handle_exit(listener, &msg.from); break; default : switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "unexpected msg type %d\n", (int)(msg.msgtype)); @@ -1030,7 +1074,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul while (!(pid = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) { if (i > 50) { /* half a second timeout */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "timed out!\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); switch_core_session_rwunlock(session); remove_session_elem_from_listener(listener,session_element); return NULL;