diff --git a/libs/esl/src/esl_event.c b/libs/esl/src/esl_event.c index 0999f90e0f..2e1d8a302a 100644 --- a/libs/esl/src/esl_event.c +++ b/libs/esl/src/esl_event.c @@ -307,7 +307,7 @@ static esl_status_t esl_event_base_add_header(esl_event_t *event, esl_stack_t st header = ALLOC(sizeof(*header)); esl_assert(header); - if ((event->flags & EF_UNIQ_HEADERS)) { + if ((event->flags & ESL_UNIQ_HEADERS)) { esl_event_del_header(event, header_name); } diff --git a/libs/esl/src/include/esl_event.h b/libs/esl/src/include/esl_event.h index 7df4c2175b..cf3793a5d9 100644 --- a/libs/esl/src/include/esl_event.h +++ b/libs/esl/src/include/esl_event.h @@ -172,7 +172,7 @@ struct esl_event { }; typedef enum { - EF_UNIQ_HEADERS = (1 << 0) + ESL_UNIQ_HEADERS = (1 << 0) } esl_event_flag_t; diff --git a/src/mod/applications/mod_hash/mod_hash.c b/src/mod/applications/mod_hash/mod_hash.c index ef72b04add..c6faf1d390 100644 --- a/src/mod/applications/mod_hash/mod_hash.c +++ b/src/mod/applications/mod_hash/mod_hash.c @@ -34,12 +34,14 @@ */ #include +#include "esl.h" #define LIMIT_HASH_CLEANUP_INTERVAL 900 SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load); +SWITCH_MODULE_RUNTIME_FUNCTION(mod_hash_runtime); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown); -SWITCH_MODULE_DEFINITION(mod_hash, mod_hash_load, mod_hash_shutdown, NULL); +SWITCH_MODULE_DEFINITION(mod_hash, mod_hash_load, mod_hash_shutdown, mod_hash_runtime); /* CORE STUFF */ static struct { @@ -48,6 +50,8 @@ static struct { switch_hash_t *limit_hash; switch_thread_rwlock_t *db_hash_rwlock; switch_hash_t *db_hash; + switch_thread_rwlock_t *remote_hash_rwlock; + switch_hash_t *remote_hash; } globals; typedef struct { @@ -70,6 +74,33 @@ typedef struct { switch_hash_t *hash; } limit_hash_private_t; +typedef enum { + REMOTE_OFF = 0, /* Thread not running */ + REMOTE_DOWN, /* Cannot connect to remote instance */ + REMOTE_UP /* All good */ +} limit_remote_state_t; + +typedef struct { + const char *name; + const char *host; + const char *username; + const char *password; + int port; + + int interval; + + esl_handle_t handle; + + switch_hash_t *index; + switch_thread_rwlock_t *rwlock; + switch_memory_pool_t *pool; + + switch_bool_t running; + switch_thread_t *thread; + + limit_remote_state_t state; +} limit_remote_t; + /* \brief Enforces limit_hash restrictions * \param session current session * \param realm limit realm @@ -440,8 +471,201 @@ SWITCH_STANDARD_API(hash_api_function) return SWITCH_STATUS_SUCCESS; } -/* INIT/DEINIT STUFF */ +#define HASH_DUMP_SYNTAX "all|limit|db" +SWITCH_STANDARD_API(hash_dump_function) +{ + int mode; + switch_hash_index_t *hi; + + if (zstr(cmd)) { + stream->write_function(stream, "Usage: "HASH_DUMP_SYNTAX"\n"); + return SWITCH_STATUS_SUCCESS; + } + + if (!strcmp(cmd, "all")) { + mode = 3; + } else if (!strcmp(cmd, "limit")) { + mode = 2; + } else if (!strcmp(cmd, "db")) { + mode = 1; + } else { + stream->write_function(stream, "Usage: "HASH_DUMP_SYNTAX"\n"); + return SWITCH_STATUS_SUCCESS; + } + + + if (mode & 1) { + switch_thread_rwlock_rdlock(globals.limit_hash_rwlock); + for (hi = switch_hash_first(NULL, globals.limit_hash); hi; hi = switch_hash_next(hi)) { + void *val = NULL; + const void *key; + switch_ssize_t keylen; + limit_hash_item_t *item; + switch_hash_this(hi, &key, &keylen, &val); + + item = (limit_hash_item_t *)val; + stream->write_function(stream, "L/%s/%d/%d/%d/%d\n", key, item->total_usage, item->rate_usage, item->interval, item->last_check); + } + switch_thread_rwlock_unlock(globals.limit_hash_rwlock); + } + + if (mode & 2) { + switch_thread_rwlock_rdlock(globals.db_hash_rwlock); + for (hi = switch_hash_first(NULL, globals.db_hash); hi; hi = switch_hash_next(hi)) { + void *val = NULL; + const void *key; + switch_ssize_t keylen; + switch_hash_this(hi, &key, &keylen, &val); + + stream->write_function(stream, "D/%s/%s\n", key, (char*)val); + } + switch_thread_rwlock_unlock(globals.db_hash_rwlock); + } + + + return SWITCH_STATUS_SUCCESS; +} + +limit_remote_t *limit_remote_create(const char *name, const char *host, uint16_t port, const char *username, const char *password, int interval) +{ + limit_remote_t *r; + switch_memory_pool_t *pool; + + switch_thread_rwlock_rdlock(globals.remote_hash_rwlock); + if (switch_core_hash_find(globals.remote_hash, name)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Already have a remote instance named %s\n", name); + switch_thread_rwlock_unlock(globals.remote_hash_rwlock); + return NULL; + } + switch_thread_rwlock_unlock(globals.remote_hash_rwlock); + + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { + return NULL; + } + + r = switch_core_alloc(pool, sizeof(limit_remote_t)); + r->pool = pool; + r->name = switch_core_strdup(r->pool, name); + r->host = switch_core_strdup(r->pool, host); + r->port = port; + r->username = switch_core_strdup(r->pool, username); + r->password = switch_core_strdup(r->pool, password); + r->interval = interval; + + switch_thread_rwlock_create(&r->rwlock, pool); + switch_core_hash_init(&r->index, pool); + + switch_thread_rwlock_rdlock(globals.remote_hash_rwlock); + switch_core_hash_insert(globals.remote_hash, name, r); + switch_thread_rwlock_unlock(globals.remote_hash_rwlock); + + return r; +} + +void limit_remote_destroy(limit_remote_t **r) +{ + if (r && *r) { + switch_hash_index_t *hi; + + (*r)->state = REMOTE_OFF; + + if ((*r)->thread) { + switch_thread_join(NULL, (*r)->thread); + } + + switch_thread_rwlock_wrlock((*r)->rwlock); + + /* Free hashtable data */ + for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) { + void *val; + const void *key; + switch_ssize_t keylen; + switch_hash_this(hi, &key, &keylen, &val); + + free(val); + } + + switch_thread_rwlock_unlock((*r)->rwlock); + switch_thread_rwlock_destroy((*r)->rwlock); + + switch_core_destroy_memory_pool(&((*r)->pool)); + *r = NULL; + } +} + +static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, void *obj) +{ + limit_remote_t *remote = (limit_remote_t*)obj; + while (remote->state > REMOTE_OFF) { + if (remote->state == REMOTE_OFF) { + if (esl_connect(&remote->handle, remote->host, remote->port, remote->username, remote->password) == ESL_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected to remote FreeSWITCH at %s:%d\n", + remote->host, remote->port); + + remote->state = REMOTE_UP; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't connect to remote FreeSWITCH at %s;%d\n", + remote->host, remote->port); + } + } else { + if (esl_send_recv(&remote->handle, "api hash_dump limit") != SWITCH_STATUS_SUCCESS) { + esl_disconnect(&remote->handle); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Disconnected from remote FreeSWITCH at %s:%d\n", + remote->host, remote->port); + memset(&remote->handle, 0, sizeof(remote->handle)); + } else { + const char *data = remote->handle.last_sr_reply; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "RECV: %s\n", data); + } + } + + switch_yield(remote->interval * 1000); + } + + remote->thread = NULL; + + return NULL; +} + +static void do_config() +{ + switch_xml_t xml = NULL, x_lists = NULL, x_list = NULL, cfg = NULL; + if ((xml = switch_xml_open_cfg("hash.conf", &cfg, NULL))) { + if ((x_lists = switch_xml_child(cfg, "remotes"))) { + for (x_list = switch_xml_child(x_lists, "remote"); x_list; x_list = x_list->next) { + const char *name = switch_xml_attr(x_list, "name"); + const char *host = switch_xml_attr(x_list, "host"); + const char *szport = switch_xml_attr(x_list, "port"); + const char *username = switch_xml_attr(x_list, "username"); + const char *password = switch_xml_attr(x_list, "password"); + const char *szinterval = switch_xml_attr(x_list, "interval"); + int port = 0, interval = 0; + limit_remote_t *remote; + switch_threadattr_t *thd_attr = NULL; + + if (!zstr(szport)) { + port = atoi(szport); + } + + if (!zstr(szinterval)) { + interval = atoi(szinterval); + } + + remote = limit_remote_create(name, host, port, username, password, interval); + + remote->state = REMOTE_DOWN; + + switch_threadattr_create(&thd_attr, remote->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&remote->thread, thd_attr, limit_remote_thread, remote, remote->pool); + } + } + } +} + +/* INIT/DEINIT STUFF */ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load) { switch_application_interface_t *app_interface; @@ -460,8 +684,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load) switch_thread_rwlock_create(&globals.limit_hash_rwlock, globals.pool); switch_thread_rwlock_create(&globals.db_hash_rwlock, globals.pool); + switch_thread_rwlock_create(&globals.remote_hash_rwlock, globals.pool); switch_core_hash_init(&globals.limit_hash, pool); switch_core_hash_init(&globals.db_hash, pool); + switch_core_hash_init(&globals.remote_hash, globals.pool); /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); @@ -474,13 +700,16 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load) SWITCH_ADD_APP(app_interface, "hash", "Insert into the hashtable", HASH_DESC, hash_function, HASH_USAGE, SAF_SUPPORT_NOMEDIA) SWITCH_ADD_API(commands_api_interface, "hash", "hash get/set", hash_api_function, "[insert|delete|select]///"); + SWITCH_ADD_API(commands_api_interface, "hash_dump", "dump hash/limit_hash data (used for synchronization)", hash_dump_function, HASH_DUMP_SYNTAX); + switch_console_set_complete("add hash insert"); switch_console_set_complete("add hash delete"); switch_console_set_complete("add hash select"); + + do_config(); /* indicate that the module should continue to be loaded */ - return SWITCH_STATUS_SUCCESS; - + return SWITCH_STATUS_SUCCESS; }