mod_hash: begin working on remote support
This commit is contained in:
parent
10d468a684
commit
c5ad49da17
|
@ -307,7 +307,7 @@ static esl_status_t esl_event_base_add_header(esl_event_t *event, esl_stack_t st
|
|||
header = ALLOC(sizeof(*header));
|
||||
esl_assert(header);
|
||||
|
||||
if ((event->flags & EF_UNIQ_HEADERS)) {
|
||||
if ((event->flags & ESL_UNIQ_HEADERS)) {
|
||||
esl_event_del_header(event, header_name);
|
||||
}
|
||||
|
||||
|
|
|
@ -172,7 +172,7 @@ struct esl_event {
|
|||
};
|
||||
|
||||
typedef enum {
|
||||
EF_UNIQ_HEADERS = (1 << 0)
|
||||
ESL_UNIQ_HEADERS = (1 << 0)
|
||||
} esl_event_flag_t;
|
||||
|
||||
|
||||
|
|
|
@ -34,12 +34,14 @@
|
|||
*/
|
||||
|
||||
#include <switch.h>
|
||||
#include "esl.h"
|
||||
|
||||
#define LIMIT_HASH_CLEANUP_INTERVAL 900
|
||||
|
||||
SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load);
|
||||
SWITCH_MODULE_RUNTIME_FUNCTION(mod_hash_runtime);
|
||||
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown);
|
||||
SWITCH_MODULE_DEFINITION(mod_hash, mod_hash_load, mod_hash_shutdown, NULL);
|
||||
SWITCH_MODULE_DEFINITION(mod_hash, mod_hash_load, mod_hash_shutdown, mod_hash_runtime);
|
||||
|
||||
/* CORE STUFF */
|
||||
static struct {
|
||||
|
@ -48,6 +50,8 @@ static struct {
|
|||
switch_hash_t *limit_hash;
|
||||
switch_thread_rwlock_t *db_hash_rwlock;
|
||||
switch_hash_t *db_hash;
|
||||
switch_thread_rwlock_t *remote_hash_rwlock;
|
||||
switch_hash_t *remote_hash;
|
||||
} globals;
|
||||
|
||||
typedef struct {
|
||||
|
@ -70,6 +74,33 @@ typedef struct {
|
|||
switch_hash_t *hash;
|
||||
} limit_hash_private_t;
|
||||
|
||||
typedef enum {
|
||||
REMOTE_OFF = 0, /* Thread not running */
|
||||
REMOTE_DOWN, /* Cannot connect to remote instance */
|
||||
REMOTE_UP /* All good */
|
||||
} limit_remote_state_t;
|
||||
|
||||
typedef struct {
|
||||
const char *name;
|
||||
const char *host;
|
||||
const char *username;
|
||||
const char *password;
|
||||
int port;
|
||||
|
||||
int interval;
|
||||
|
||||
esl_handle_t handle;
|
||||
|
||||
switch_hash_t *index;
|
||||
switch_thread_rwlock_t *rwlock;
|
||||
switch_memory_pool_t *pool;
|
||||
|
||||
switch_bool_t running;
|
||||
switch_thread_t *thread;
|
||||
|
||||
limit_remote_state_t state;
|
||||
} limit_remote_t;
|
||||
|
||||
/* \brief Enforces limit_hash restrictions
|
||||
* \param session current session
|
||||
* \param realm limit realm
|
||||
|
@ -440,8 +471,201 @@ SWITCH_STANDARD_API(hash_api_function)
|
|||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/* INIT/DEINIT STUFF */
|
||||
#define HASH_DUMP_SYNTAX "all|limit|db"
|
||||
SWITCH_STANDARD_API(hash_dump_function)
|
||||
{
|
||||
int mode;
|
||||
switch_hash_index_t *hi;
|
||||
|
||||
if (zstr(cmd)) {
|
||||
stream->write_function(stream, "Usage: "HASH_DUMP_SYNTAX"\n");
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
if (!strcmp(cmd, "all")) {
|
||||
mode = 3;
|
||||
} else if (!strcmp(cmd, "limit")) {
|
||||
mode = 2;
|
||||
} else if (!strcmp(cmd, "db")) {
|
||||
mode = 1;
|
||||
} else {
|
||||
stream->write_function(stream, "Usage: "HASH_DUMP_SYNTAX"\n");
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
if (mode & 1) {
|
||||
switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);
|
||||
for (hi = switch_hash_first(NULL, globals.limit_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val = NULL;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
limit_hash_item_t *item;
|
||||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
|
||||
item = (limit_hash_item_t *)val;
|
||||
|
||||
stream->write_function(stream, "L/%s/%d/%d/%d/%d\n", key, item->total_usage, item->rate_usage, item->interval, item->last_check);
|
||||
}
|
||||
switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
|
||||
}
|
||||
|
||||
if (mode & 2) {
|
||||
switch_thread_rwlock_rdlock(globals.db_hash_rwlock);
|
||||
for (hi = switch_hash_first(NULL, globals.db_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val = NULL;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
|
||||
stream->write_function(stream, "D/%s/%s\n", key, (char*)val);
|
||||
}
|
||||
switch_thread_rwlock_unlock(globals.db_hash_rwlock);
|
||||
}
|
||||
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
limit_remote_t *limit_remote_create(const char *name, const char *host, uint16_t port, const char *username, const char *password, int interval)
|
||||
{
|
||||
limit_remote_t *r;
|
||||
switch_memory_pool_t *pool;
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
|
||||
if (switch_core_hash_find(globals.remote_hash, name)) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Already have a remote instance named %s\n", name);
|
||||
switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
|
||||
return NULL;
|
||||
}
|
||||
switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
|
||||
|
||||
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
r = switch_core_alloc(pool, sizeof(limit_remote_t));
|
||||
r->pool = pool;
|
||||
r->name = switch_core_strdup(r->pool, name);
|
||||
r->host = switch_core_strdup(r->pool, host);
|
||||
r->port = port;
|
||||
r->username = switch_core_strdup(r->pool, username);
|
||||
r->password = switch_core_strdup(r->pool, password);
|
||||
r->interval = interval;
|
||||
|
||||
switch_thread_rwlock_create(&r->rwlock, pool);
|
||||
switch_core_hash_init(&r->index, pool);
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
|
||||
switch_core_hash_insert(globals.remote_hash, name, r);
|
||||
switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
void limit_remote_destroy(limit_remote_t **r)
|
||||
{
|
||||
if (r && *r) {
|
||||
switch_hash_index_t *hi;
|
||||
|
||||
(*r)->state = REMOTE_OFF;
|
||||
|
||||
if ((*r)->thread) {
|
||||
switch_thread_join(NULL, (*r)->thread);
|
||||
}
|
||||
|
||||
switch_thread_rwlock_wrlock((*r)->rwlock);
|
||||
|
||||
/* Free hashtable data */
|
||||
for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
|
||||
free(val);
|
||||
}
|
||||
|
||||
switch_thread_rwlock_unlock((*r)->rwlock);
|
||||
switch_thread_rwlock_destroy((*r)->rwlock);
|
||||
|
||||
switch_core_destroy_memory_pool(&((*r)->pool));
|
||||
*r = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, void *obj)
|
||||
{
|
||||
limit_remote_t *remote = (limit_remote_t*)obj;
|
||||
while (remote->state > REMOTE_OFF) {
|
||||
if (remote->state == REMOTE_OFF) {
|
||||
if (esl_connect(&remote->handle, remote->host, remote->port, remote->username, remote->password) == ESL_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected to remote FreeSWITCH at %s:%d\n",
|
||||
remote->host, remote->port);
|
||||
|
||||
remote->state = REMOTE_UP;
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't connect to remote FreeSWITCH at %s;%d\n",
|
||||
remote->host, remote->port);
|
||||
}
|
||||
} else {
|
||||
if (esl_send_recv(&remote->handle, "api hash_dump limit") != SWITCH_STATUS_SUCCESS) {
|
||||
esl_disconnect(&remote->handle);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Disconnected from remote FreeSWITCH at %s:%d\n",
|
||||
remote->host, remote->port);
|
||||
memset(&remote->handle, 0, sizeof(remote->handle));
|
||||
} else {
|
||||
const char *data = remote->handle.last_sr_reply;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "RECV: %s\n", data);
|
||||
}
|
||||
}
|
||||
|
||||
switch_yield(remote->interval * 1000);
|
||||
}
|
||||
|
||||
remote->thread = NULL;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void do_config()
|
||||
{
|
||||
switch_xml_t xml = NULL, x_lists = NULL, x_list = NULL, cfg = NULL;
|
||||
if ((xml = switch_xml_open_cfg("hash.conf", &cfg, NULL))) {
|
||||
if ((x_lists = switch_xml_child(cfg, "remotes"))) {
|
||||
for (x_list = switch_xml_child(x_lists, "remote"); x_list; x_list = x_list->next) {
|
||||
const char *name = switch_xml_attr(x_list, "name");
|
||||
const char *host = switch_xml_attr(x_list, "host");
|
||||
const char *szport = switch_xml_attr(x_list, "port");
|
||||
const char *username = switch_xml_attr(x_list, "username");
|
||||
const char *password = switch_xml_attr(x_list, "password");
|
||||
const char *szinterval = switch_xml_attr(x_list, "interval");
|
||||
int port = 0, interval = 0;
|
||||
limit_remote_t *remote;
|
||||
switch_threadattr_t *thd_attr = NULL;
|
||||
|
||||
if (!zstr(szport)) {
|
||||
port = atoi(szport);
|
||||
}
|
||||
|
||||
if (!zstr(szinterval)) {
|
||||
interval = atoi(szinterval);
|
||||
}
|
||||
|
||||
remote = limit_remote_create(name, host, port, username, password, interval);
|
||||
|
||||
remote->state = REMOTE_DOWN;
|
||||
|
||||
switch_threadattr_create(&thd_attr, remote->pool);
|
||||
switch_threadattr_detach_set(thd_attr, 1);
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
switch_thread_create(&remote->thread, thd_attr, limit_remote_thread, remote, remote->pool);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* INIT/DEINIT STUFF */
|
||||
SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)
|
||||
{
|
||||
switch_application_interface_t *app_interface;
|
||||
|
@ -460,8 +684,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)
|
|||
|
||||
switch_thread_rwlock_create(&globals.limit_hash_rwlock, globals.pool);
|
||||
switch_thread_rwlock_create(&globals.db_hash_rwlock, globals.pool);
|
||||
switch_thread_rwlock_create(&globals.remote_hash_rwlock, globals.pool);
|
||||
switch_core_hash_init(&globals.limit_hash, pool);
|
||||
switch_core_hash_init(&globals.db_hash, pool);
|
||||
switch_core_hash_init(&globals.remote_hash, globals.pool);
|
||||
|
||||
/* connect my internal structure to the blank pointer passed to me */
|
||||
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
|
||||
|
@ -474,13 +700,16 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)
|
|||
|
||||
SWITCH_ADD_APP(app_interface, "hash", "Insert into the hashtable", HASH_DESC, hash_function, HASH_USAGE, SAF_SUPPORT_NOMEDIA)
|
||||
SWITCH_ADD_API(commands_api_interface, "hash", "hash get/set", hash_api_function, "[insert|delete|select]/<realm>/<key>/<value>");
|
||||
SWITCH_ADD_API(commands_api_interface, "hash_dump", "dump hash/limit_hash data (used for synchronization)", hash_dump_function, HASH_DUMP_SYNTAX);
|
||||
|
||||
switch_console_set_complete("add hash insert");
|
||||
switch_console_set_complete("add hash delete");
|
||||
switch_console_set_complete("add hash select");
|
||||
|
||||
do_config();
|
||||
|
||||
/* indicate that the module should continue to be loaded */
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue