diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index 8ae06ea38d..1cdb5e90bc 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -47,7 +47,8 @@ typedef enum { LFLAG_FULL = (1 << 4), LFLAG_MYEVENTS = (1 << 5), LFLAG_SESSION = (1 << 6), - LFLAG_ASYNC = (1 << 7) + LFLAG_ASYNC = (1 << 7), + LFLAG_STATEFUL = (1 << 8) } event_flag_t; typedef enum { @@ -72,6 +73,9 @@ struct listener { int lost_events; int lost_logs; int hup; + time_t last_flush; + uint32_t timeout; + uint32_t id; switch_sockaddr_t *sa; char remote_ip[50]; switch_port_t remote_port; @@ -99,8 +103,21 @@ static struct { int threads; char *acl[MAX_ACL]; uint32_t acl_count; + uint32_t id; } prefs; + +static void remove_listener(listener_t *listener); + +static uint32_t next_id(void) +{ + uint32_t id; + switch_mutex_lock(listen_list.mutex); + id = ++prefs.id; + switch_mutex_unlock(listen_list.mutex); + return id; +} + SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password); @@ -165,6 +182,14 @@ static void event_handler(switch_event_t *event) continue; } + if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && switch_timestamp(NULL) - l->last_flush > l->timeout) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Stateful Listener %u has expired\n", l->id); + remove_listener(l); + switch_thread_rwlock_unlock(l->rwlock); + switch_core_hash_destroy(&l->event_hash); + switch_core_destroy_memory_pool(&l->pool); + } + if (l->event_list[SWITCH_EVENT_ALL]) { send = 1; } else if ((l->event_list[event->event_id])) { @@ -347,18 +372,6 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown) return SWITCH_STATUS_SUCCESS; } -SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) -{ - switch_application_interface_t *app_interface; - - /* connect my internal structure to the blank pointer passed to me */ - *module_interface = switch_loadable_module_create_module_interface(pool, modname); - SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "[:]", SAF_SUPPORT_NOMEDIA); - - /* indicate that the module should continue to be loaded */ - return SWITCH_STATUS_SUCCESS; -} - static void add_listener(listener_t *listener) { /* add me to the listeners so I get events */ @@ -386,6 +399,24 @@ static void remove_listener(listener_t *listener) switch_mutex_unlock(listen_list.mutex); } + +static listener_t *find_listener(uint32_t id) +{ + listener_t *l, *r = NULL; + + switch_mutex_lock(listen_list.mutex); + for (l = listen_list.listeners; l; l = l->next) { + if (l->id && l->id == id) { + if (switch_thread_rwlock_tryrdlock(l->rwlock) == SWITCH_STATUS_SUCCESS) { + r = l; + } + break; + } + } + switch_mutex_unlock(listen_list.mutex); + return r; +} + static void strip_cr(char *s) { char *p; @@ -394,6 +425,242 @@ static void strip_cr(char *s) } } + +static void xmlize_listener(listener_t *listener, switch_stream_handle_t *stream) +{ + stream->write_function(stream, " \n"); + stream->write_function(stream, " %u\n", listener->id); + stream->write_function(stream, " %s\n", listener->format == EVENT_FORMAT_XML ? "xml" : "plain"); + stream->write_function(stream, " %u\n", listener->timeout); + stream->write_function(stream, " \n"); +} + +SWITCH_STANDARD_API(event_manager_function) +{ + char *http = NULL; + char *wcmd = NULL; + char *format = NULL; + listener_t *listener = NULL; + + if (stream->param_event) { + http = switch_event_get_header(stream->param_event, "http-host"); + wcmd = switch_event_get_header(stream->param_event, "command"); + format = switch_event_get_header(stream->param_event, "format"); + } + + if (!http) { + stream->write_function(stream, "This is a web application.!\n"); + return SWITCH_STATUS_SUCCESS; + } + stream->write_function(stream, "Content-Type: text/xml\n\n"); + + stream->write_function(stream, "\n"); + stream->write_function(stream, "\n"); + + if (!wcmd) { + stream->write_function(stream, "Missing command parameter!\n"); + goto end; + } + + if (!format) { + format = "xml"; + } + + + + if (!strcasecmp(wcmd, "create-listener")) { + char *events = switch_event_get_header(stream->param_event, "events"); + switch_memory_pool_t *pool; + char *next, *cur; + uint32_t count = 0, key_count = 0; + uint8_t custom = 0; + char *edup; + + if (switch_strlen_zero(events)) { + stream->write_function(stream, "Missing parameter!\n"); + goto end; + } + + switch_core_new_memory_pool(&pool); + listener = switch_core_alloc(pool, sizeof(*listener)); + listener->pool = pool; + listener->format = EVENT_FORMAT_PLAIN; + switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_core_hash_init(&listener->event_hash, listener->pool); + switch_set_flag(listener, LFLAG_AUTHED); + switch_set_flag(listener, LFLAG_STATEFUL); + switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener->pool); + switch_thread_rwlock_create(&listener->rwlock, listener->pool); + listener->id = next_id(); + listener->timeout = 60; + listener->last_flush = switch_timestamp(NULL); + + if (switch_stristr("xml", format)) { + listener->format = EVENT_FORMAT_XML; + } else { + listener->format = EVENT_FORMAT_PLAIN; + } + + edup = strdup(events); + + for (cur = edup; cur; count++) { + switch_event_types_t type; + + if ((next = strchr(cur, ' '))) { + *next++ = '\0'; + } + + if (custom) { + switch_core_hash_insert(listener->event_hash, cur, MARKER); + } else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) { + key_count++; + if (type == SWITCH_EVENT_ALL) { + uint32_t x = 0; + for (x = 0; x < SWITCH_EVENT_ALL; x++) { + listener->event_list[x] = 1; + } + } + if (type <= SWITCH_EVENT_ALL) { + listener->event_list[type] = 1; + } + if (type == SWITCH_EVENT_CUSTOM) { + custom++; + } + } + + cur = next; + } + + + switch_safe_free(edup); + + if (!key_count) { + switch_core_hash_destroy(&listener->event_hash); + switch_core_destroy_memory_pool(&listener->pool); + stream->write_function(stream, "No keywords supplied\n"); + goto end; + } + + + switch_set_flag_locked(listener, LFLAG_EVENTS); + add_listener(listener); + stream->write_function(stream, "\n"); + stream->write_function(stream, " Listener %u Created\n", listener->id); + xmlize_listener(listener, stream); + stream->write_function(stream, "\n"); + + goto end; + } else if (!strcasecmp(wcmd, "destroy-listener")) { + char *id = switch_event_get_header(stream->param_event, "listen-id"); + uint32_t idl = (uint32_t) atol(id); + + if ((listener = find_listener(idl))) { + remove_listener(listener); + stream->write_function(stream, "\n listener %u destroyed\n", listener->id); + xmlize_listener(listener, stream); + stream->write_function(stream, "\n"); + switch_thread_rwlock_unlock(listener->rwlock); + switch_core_hash_destroy(&listener->event_hash); + switch_core_destroy_memory_pool(&listener->pool); + goto end; + } else { + stream->write_function(stream, "Can't find listener\n"); + goto end; + } + + } else if (!strcasecmp(wcmd, "check-listener")) { + char *id = switch_event_get_header(stream->param_event, "listen-id"); + uint32_t idl = (uint32_t) atol(id); + void *pop; + switch_event_t *pevent; + + if (!(listener = find_listener(idl))) { + stream->write_function(stream, "Can't find listener\n"); + goto end; + } + + listener->last_flush = switch_timestamp(NULL); + stream->write_function(stream, "\n Current Events Follow\n"); + xmlize_listener(listener, stream); + stream->write_function(stream, "\n"); + + while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + pevent = (switch_event_t *) pop; + char *etype; + + if (listener->format == EVENT_FORMAT_PLAIN) { + etype = "plain"; + switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE); + stream->write_function(stream, "\n%s", listener->ebuf); + } else { + switch_xml_t xml; + etype = "xml"; + + if ((xml = switch_event_xmlize(pevent, "%s", ""))) { + listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE); + switch_xml_free(xml); + } else { + stream->write_function(stream, "-ERR XML Error\n"); + break; + } + + stream->write_function(stream, "%s\n", listener->ebuf); + } + + switch_safe_free(listener->ebuf); + switch_event_destroy(&pevent); + } + + stream->write_function(stream, " \n\n"); + + if (pevent) { + switch_event_destroy(&pevent); + } + + switch_thread_rwlock_unlock(listener->rwlock); + } else if (!strcasecmp(wcmd, "exec-fsapi")) { + char *api_command = switch_event_get_header(stream->param_event, "fsapi-command"); + char *api_args = switch_event_get_header(stream->param_event, "fsapi-args"); + switch_event_t *event, *oevent; + + if (!(api_command)) { + stream->write_function(stream, "INVALID API COMMAND!\n"); + goto end; + } + + stream->write_function(stream, "\n Execute API Command\n\n"); + switch_event_create(&event, SWITCH_EVENT_REQUEST_PARAMS); + oevent = stream->param_event; + stream->param_event = event; + switch_api_execute(api_command, api_args, NULL, stream); + stream->param_event = oevent; + stream->write_function(stream, " \n"); + } else { + stream->write_function(stream, "INVALID COMMAND!write_function(stream, "\n\n"); + + return SWITCH_STATUS_SUCCESS; +} + + +SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) +{ + switch_application_interface_t *app_interface; + switch_api_interface_t *api_interface; + + /* connect my internal structure to the blank pointer passed to me */ + *module_interface = switch_loadable_module_create_module_interface(pool, modname); + SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "[:]", SAF_SUPPORT_NOMEDIA); + SWITCH_ADD_API(api_interface, "event_manager", "event_manager", event_manager_function, ""); + + /* indicate that the module should continue to be loaded */ + return SWITCH_STATUS_SUCCESS; +} + static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout) { switch_size_t mlen, bytes = 0;