diff --git a/src/mod/endpoints/mod_skinny/mod_skinny.c b/src/mod/endpoints/mod_skinny/mod_skinny.c index cea04acad9..8712ce68b7 100644 --- a/src/mod/endpoints/mod_skinny/mod_skinny.c +++ b/src/mod/endpoints/mod_skinny/mod_skinny.c @@ -59,10 +59,10 @@ typedef enum { } GFLAGS; static struct { - /* prefs */ + /* prefs */ int debug; char *ip; - int port; + unsigned int port; char *dialplan; char *codec_string; char *codec_order[SWITCH_MAX_CODECS]; @@ -72,11 +72,13 @@ static struct { int codec_rates_last; int keep_alive; char *date_format; - /* data */ - switch_event_node_t *node; + /* data */ + switch_event_node_t *heartbeat_node; unsigned int flags; int calls; switch_mutex_t *mutex; + switch_mutex_t *listener_mutex; + int listener_threads; } globals; struct private_object { @@ -101,6 +103,63 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_global_codec_string, globals.codec_string) SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_global_codec_rates_string, globals.codec_rates_string); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_global_date_format, globals.date_format); +/*****************************************************************************/ +/* SKINNY TYPES */ +/*****************************************************************************/ +#define SKINNY_MESSAGE_FIELD_SIZE 4 /* 4-bytes field */ +#define SKINNY_MESSAGE_HEADERSIZE 12 /* three 4-bytes fields */ +#define SKINNY_MESSAGE_MAXSIZE 1000 + +union skinny_data { + void *raw; +}; + +struct skinny_message { + int length; + int reserved; + int type; + union skinny_data data; +}; +typedef struct skinny_message skinny_message_t; + +/*****************************************************************************/ +/* LISTENERS TYPES */ +/*****************************************************************************/ + +typedef enum { + LFLAG_RUNNING = (1 << 0), +} event_flag_t; + +struct listener { + switch_socket_t *sock; + switch_memory_pool_t *pool; + switch_core_session_t *session; + switch_thread_rwlock_t *rwlock; + switch_sockaddr_t *sa; + char remote_ip[50]; + switch_mutex_t *flag_mutex; + uint32_t flags; + switch_port_t remote_port; + uint32_t id; + time_t expire_time; + struct listener *next; +}; + +typedef struct listener listener_t; + +typedef switch_status_t (*skinny_listener_callback_func_t) (listener_t *listener); + +static struct { + switch_socket_t *sock; + switch_mutex_t *sock_mutex; + listener_t *listeners; + uint8_t ready; +} listen_list; + +/*****************************************************************************/ +/* CHANNEL FUNCTIONS */ +/*****************************************************************************/ + static switch_status_t channel_on_init(switch_core_session_t *session); static switch_status_t channel_on_hangup(switch_core_session_t *session); static switch_status_t channel_on_destroy(switch_core_session_t *session); @@ -259,7 +318,7 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int switch_clear_flag_locked(tech_pvt, TFLAG_IO); switch_clear_flag_locked(tech_pvt, TFLAG_VOICE); switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING); - //switch_thread_cond_signal(tech_pvt->cond); + //switch_thread_cond_sigpnal(tech_pvt->cond); break; case SWITCH_SIG_BREAK: switch_set_flag_locked(tech_pvt, TFLAG_BREAK); @@ -505,6 +564,429 @@ switch_io_routines_t skinny_io_routines = { /*.receive_event */ channel_receive_event }; +/*****************************************************************************/ +/* SKINNY FUNCTIONS */ +/*****************************************************************************/ + +static switch_status_t skinny_read_packet(listener_t *listener, skinny_message_t **req, uint32_t timeout) +{ + skinny_message_t *request; + uint32_t elapsed = 0; + time_t start = 0; + switch_size_t mlen, bytes = 0; + char mbuf[SKINNY_MESSAGE_MAXSIZE] = ""; + char *ptr; + switch_status_t status = SWITCH_STATUS_SUCCESS; + + request = switch_core_alloc(module_pool, SKINNY_MESSAGE_MAXSIZE); + + if (!request) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to allocate memory.\n"); + return SWITCH_STATUS_MEMERR; + } + + if (!running) { + return SWITCH_STATUS_FALSE; + } + + start = switch_epoch_time_now(NULL); + ptr = mbuf; + + while (listener->sock && running) { + uint8_t do_sleep = 1; + if(bytes < SKINNY_MESSAGE_FIELD_SIZE) { + /* We have nothing yet, get length header field */ + mlen = SKINNY_MESSAGE_FIELD_SIZE - bytes; + } else { + /* We now know the message size */ + mlen = request->length + 2*SKINNY_MESSAGE_FIELD_SIZE - bytes; + } + + status = switch_socket_recv(listener->sock, ptr, &mlen); + + if (!running || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Socket break.\n"); + return SWITCH_STATUS_FALSE; + } + + if(mlen) { + bytes += mlen; + + if(bytes >= SKINNY_MESSAGE_FIELD_SIZE) { + do_sleep = 0; + ptr += mlen; + memcpy(request, mbuf, bytes); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, + "Got request: length=%d,reserved=%d,type=%d\n", + request->length,request->reserved,request->type); + if(request->length < SKINNY_MESSAGE_FIELD_SIZE) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "Skinny client sent invalid data. Length should be greated than 4 but got %d.\n", + request->length); + return SWITCH_STATUS_FALSE; + } + if(bytes >= request->length + 2*SKINNY_MESSAGE_FIELD_SIZE) { + /* Message body */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, + "Got complete request: length=%d,reserved=%d,type=%d,data=%d\n", + request->length,request->reserved,request->type,request->data.as_char); + *req = request; + return SWITCH_STATUS_SUCCESS; + } + } + } + if (timeout) { + elapsed = (uint32_t) (switch_epoch_time_now(NULL) - start); + if (elapsed >= timeout) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Socket timed out.\n"); + switch_clear_flag_locked(listener, LFLAG_RUNNING); + return SWITCH_STATUS_FALSE; + } + } + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t skinny_parse_request(listener_t *listener, skinny_message_t *request, skinny_message_t **rep) +{ + skinny_message_t *reply; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Parsing request.\n"); + reply = NULL; + switch(request->type) { + /* TODO */ + default: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "Unknown request type: %d.\n", request->type); + } + *rep = reply; + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t skinny_send_reply(listener_t *listener, skinny_message_t *reply) +{ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Sending reply.\n"); + //TODO switch_socket_send(listener->sock, buf, &len); + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t skinny_free_message(skinny_message_t *message) +{ + if(message) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Freeing message\n"); + /* TODO */ + } + return SWITCH_STATUS_SUCCESS; +} + +/*****************************************************************************/ +/* LISTENER FUNCTIONS */ +/*****************************************************************************/ + +static void add_listener(listener_t *listener) +{ + switch_mutex_lock(globals.listener_mutex); + listener->next = listen_list.listeners; + listen_list.listeners = listener; + switch_mutex_unlock(globals.listener_mutex); +} + +static void remove_listener(listener_t *listener) +{ + listener_t *l, *last = NULL; + + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { + if (l == listener) { + if (last) { + last->next = l->next; + } else { + listen_list.listeners = l->next; + } + } + last = l; + } + switch_mutex_unlock(globals.listener_mutex); +} + + +static void walk_listeners(skinny_listener_callback_func_t callback) +{ + listener_t *l; + + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { + callback(l); + } + switch_mutex_unlock(globals.listener_mutex); + +} + +static void flush_listener(listener_t *listener, switch_bool_t flush_log, switch_bool_t flush_events) +{ + + /* TODO */ +} + +static switch_status_t expire_listener(listener_t *listener) +{ + if (!listener->expire_time) { + listener->expire_time = switch_epoch_time_now(NULL); + } + + if (switch_thread_rwlock_trywrlock(listener->rwlock) != SWITCH_STATUS_SUCCESS) { + return SWITCH_STATUS_FALSE; + } + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_CRIT, "Stateful Listener %u has expired\n", listener->id); + + flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE); + + switch_thread_rwlock_unlock(listener->rwlock); + switch_core_destroy_memory_pool(&listener->pool); + + return SWITCH_STATUS_SUCCESS; +} + +static void close_socket(switch_socket_t **sock) +{ + switch_mutex_lock(listen_list.sock_mutex); + if (*sock) { + switch_socket_shutdown(*sock, SWITCH_SHUTDOWN_READWRITE); + switch_socket_close(*sock); + *sock = NULL; + } + switch_mutex_unlock(listen_list.sock_mutex); +} + +static switch_status_t kill_listener(listener_t *listener) +{ + switch_clear_flag(listener, LFLAG_RUNNING); + close_socket(&listener->sock); + return SWITCH_STATUS_SUCCESS; +} + +static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) +{ + listener_t *listener = (listener_t *) obj; + switch_status_t status; + switch_core_session_t *session = NULL; + switch_channel_t *channel = NULL; + skinny_message_t *request = NULL; + skinny_message_t *reply = NULL; + + switch_mutex_lock(globals.listener_mutex); + globals.listener_threads++; + switch_mutex_unlock(globals.listener_mutex); + + switch_assert(listener != NULL); + + if ((session = listener->session)) { + if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { + goto done; + } + } + + switch_socket_opt_set(listener->sock, SWITCH_SO_TCP_NODELAY, TRUE); + switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE); + + if (globals.debug > 0) { + if (zstr(listener->remote_ip)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open\n"); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open from %s:%d\n", listener->remote_ip, listener->remote_port); + } + } + + switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE); + switch_set_flag_locked(listener, LFLAG_RUNNING); + add_listener(listener); + + + while (running && switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) { + status = skinny_read_packet(listener, &request, 30); + + if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Socket Error!\n"); + switch_clear_flag_locked(listener, LFLAG_RUNNING); + break; + } + + if (!request) { + continue; + } + + if (skinny_parse_request(listener, request, &reply) != SWITCH_STATUS_SUCCESS) { + switch_clear_flag_locked(listener, LFLAG_RUNNING); + break; + } + + skinny_free_message(request); + + if (reply != NULL) { + skinny_send_reply(listener, reply); + } + skinny_free_message(reply); + } + + done: + + skinny_free_message(request); + skinny_free_message(reply); + + remove_listener(listener); + + if (globals.debug > 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); + } + + switch_thread_rwlock_wrlock(listener->rwlock); + flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE); + + if (listener->session) { + channel = switch_core_session_get_channel(listener->session); + } + + if (listener->sock) { + close_socket(&listener->sock); + } + + switch_thread_rwlock_unlock(listener->rwlock); + + if (globals.debug > 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Closed\n"); + } + + if (listener->session) { + switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); + //TODO switch_clear_flag_locked(listener, LFLAG_SESSION); + switch_core_session_rwunlock(listener->session); + } else if (listener->pool) { + switch_memory_pool_t *pool = listener->pool; + switch_core_destroy_memory_pool(&pool); + } + + switch_mutex_lock(globals.listener_mutex); + globals.listener_threads--; + switch_mutex_unlock(globals.listener_mutex); + + return NULL; +} + +/* Create a thread for the socket and launch it */ +static void launch_listener_thread(listener_t *listener) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + + switch_threadattr_create(&thd_attr, listener->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool); +} + +int skinny_socket_create_and_bind() +{ + switch_status_t rv; + switch_sockaddr_t *sa; + switch_socket_t *inbound_socket = NULL; + listener_t *listener; + switch_memory_pool_t *pool = NULL, *listener_pool = NULL; + uint32_t errs = 0; + + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); + return SWITCH_STATUS_TERM; + } + + while(running) { + rv = switch_sockaddr_info_get(&sa, globals.ip, SWITCH_INET, globals.port, 0, pool); + if (rv) + goto fail; + rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool); + if (rv) + goto sock_fail; + rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1); + if (rv) + goto sock_fail; + rv = switch_socket_bind(listen_list.sock, sa); + if (rv) + goto sock_fail; + rv = switch_socket_listen(listen_list.sock, 5); + if (rv) + goto sock_fail; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", globals.ip, globals.port); + + break; + sock_fail: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", globals.ip, globals.port); + switch_yield(100000); + } + + listen_list.ready = 1; + + while(running) { + + if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); + goto fail; + } + + if ((rv = switch_socket_accept(&inbound_socket, listen_list.sock, listener_pool))) { + if (!running) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n"); + goto end; + } else { + /* I wish we could use strerror_r here but its not defined everywhere =/ */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno)); + if (++errs > 100) { + goto end; + } + } + } else { + errs = 0; + } + + + if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + break; + } + + switch_thread_rwlock_create(&listener->rwlock, listener_pool); + + listener->sock = inbound_socket; + listener->pool = listener_pool; + listener_pool = NULL; + + switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); + + switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock); + switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa); + listener->remote_port = switch_sockaddr_get_port(listener->sa); + launch_listener_thread(listener); + + } + + end: + + close_socket(&listen_list.sock); + + if (pool) { + switch_core_destroy_memory_pool(&pool); + } + + if (listener_pool) { + switch_core_destroy_memory_pool(&listener_pool); + } + + + fail: + return SWITCH_STATUS_TERM; +} + +/*****************************************************************************/ +/* MODULE FUNCTIONS */ +/*****************************************************************************/ static switch_status_t load_skinny_config(void) { char *cf = "skinny.conf"; @@ -562,10 +1044,8 @@ static switch_status_t load_skinny_config(void) static void event_handler(switch_event_t *event) { - if (event->event_id == SWITCH_EVENT_RELOADXML) { - if (load_skinny_config() != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to reload config file\n"); - } + if (event->event_id == SWITCH_EVENT_HEARTBEAT) { + walk_listeners(expire_listener); } } @@ -578,6 +1058,16 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_skinny_load) load_skinny_config(); + switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); + + memset(&listen_list, 0, sizeof(listen_list)); + switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool); + + if ((switch_event_bind_removable(modname, SWITCH_EVENT_HEARTBEAT, NULL, event_handler, NULL, &globals.heartbeat_node) != SWITCH_STATUS_SUCCESS)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind our heartbeat handler!\n"); + /* Not such severe to prevent loading */ + } + /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); skinny_endpoint_interface = switch_loadable_module_create_interface(*module_interface, SWITCH_ENDPOINT_INTERFACE); @@ -585,11 +1075,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_skinny_load) skinny_endpoint_interface->io_routines = &skinny_io_routines; skinny_endpoint_interface->state_handler = &skinny_state_handlers; - if ((switch_event_bind_removable(modname, SWITCH_EVENT_RELOADXML, NULL, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind our reloadxml handler!\n"); - /* Not such severe to prevent loading */ - } - /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; @@ -597,25 +1082,29 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_skinny_load) SWITCH_MODULE_RUNTIME_FUNCTION(mod_skinny_runtime) { - /* TODO: create listeners */ - return SWITCH_STATUS_TERM; + return skinny_socket_create_and_bind(); } SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_skinny_shutdown) { - switch_event_unbind(&globals.node); + int sanity = 0; - int x = 0; + switch_event_unbind(&globals.heartbeat_node); - running = -1; + running = 0; - while (running) { - if (x++ > 100) { + walk_listeners(kill_listener); + + close_socket(&listen_list.sock); + + while (globals.listener_threads) { + switch_yield(100000); + walk_listeners(kill_listener); + if (++sanity >= 200) { break; } - switch_yield(20000); } - + /* Free dynamically allocated strings */ switch_safe_free(globals.ip); switch_safe_free(globals.dialplan);