mod_hash: remote sync in working state
This commit is contained in:
parent
83891a11d4
commit
f66ac8906b
|
@ -54,10 +54,11 @@ static struct {
|
|||
} globals;
|
||||
|
||||
typedef struct {
|
||||
uint32_t total_usage;
|
||||
uint32_t rate_usage;
|
||||
time_t last_check;
|
||||
uint32_t interval;
|
||||
uint32_t total_usage; /* < Total */
|
||||
uint32_t rate_usage; /* < Current rate usage */
|
||||
time_t last_check; /* < Last rate check */
|
||||
uint32_t interval; /* < Interval used on last rate check */
|
||||
uint32_t last_update; /* < Last updated timestamp (rate or total) */
|
||||
} limit_hash_item_t;
|
||||
|
||||
struct callback {
|
||||
|
@ -74,9 +75,9 @@ typedef struct {
|
|||
} limit_hash_private_t;
|
||||
|
||||
typedef enum {
|
||||
REMOTE_OFF = 0, /* Thread not running */
|
||||
REMOTE_DOWN, /* Cannot connect to remote instance */
|
||||
REMOTE_UP /* All good */
|
||||
REMOTE_OFF = 0, /* < Thread not running */
|
||||
REMOTE_DOWN, /* <C annot connect to remote instance */
|
||||
REMOTE_UP /* < All good */
|
||||
} limit_remote_state_t;
|
||||
|
||||
typedef struct {
|
||||
|
@ -100,6 +101,8 @@ typedef struct {
|
|||
limit_remote_state_t state;
|
||||
} limit_remote_t;
|
||||
|
||||
static limit_hash_item_t get_remote_usage(const char *key);
|
||||
|
||||
/* \brief Enforces limit_hash restrictions
|
||||
* \param session current session
|
||||
* \param realm limit realm
|
||||
|
@ -117,6 +120,7 @@ SWITCH_LIMIT_INCR(limit_incr_hash)
|
|||
time_t now = switch_epoch_time_now(NULL);
|
||||
limit_hash_private_t *pvt = NULL;
|
||||
uint8_t increment = 1;
|
||||
limit_hash_item_t remote_usage;
|
||||
|
||||
hashkey = switch_core_session_sprintf(session, "%s_%s", realm, resource);
|
||||
|
||||
|
@ -146,6 +150,8 @@ SWITCH_LIMIT_INCR(limit_incr_hash)
|
|||
switch_channel_set_private(channel, "limit_hash", pvt);
|
||||
}
|
||||
|
||||
remote_usage = get_remote_usage(hashkey);
|
||||
|
||||
if (interval > 0) {
|
||||
item->interval = interval;
|
||||
if (item->last_check <= (now - interval)) {
|
||||
|
@ -164,7 +170,7 @@ SWITCH_LIMIT_INCR(limit_incr_hash)
|
|||
goto end;
|
||||
}
|
||||
}
|
||||
} else if ((max >= 0) && (item->total_usage + increment > (uint32_t) max)) {
|
||||
} else if ((max >= 0) && (item->total_usage + increment + remote_usage.total_usage > (uint32_t) max)) {
|
||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s is already at max value (%d)\n", hashkey, item->total_usage);
|
||||
status = SWITCH_STATUS_GENERR;
|
||||
goto end;
|
||||
|
@ -225,6 +231,19 @@ SWITCH_HASH_DELETE_FUNC(limit_hash_cleanup_delete_callback) {
|
|||
return SWITCH_FALSE;
|
||||
}
|
||||
|
||||
SWITCH_HASH_DELETE_FUNC(limit_hash_remote_cleanup_callback)
|
||||
{
|
||||
limit_hash_item_t *item = (limit_hash_item_t *) val;
|
||||
switch_time_t now = (switch_time_t)(intptr_t)pData;
|
||||
|
||||
if (item->last_update != now) {
|
||||
free(item);
|
||||
return SWITCH_TRUE;
|
||||
}
|
||||
|
||||
return SWITCH_FALSE;
|
||||
}
|
||||
|
||||
/* !\brief Periodically checks for unused limit entries and frees them */
|
||||
SWITCH_STANDARD_SCHED_FUNC(limit_hash_cleanup_callback)
|
||||
{
|
||||
|
@ -300,14 +319,19 @@ SWITCH_LIMIT_USAGE(limit_usage_hash)
|
|||
char *hash_key = NULL;
|
||||
limit_hash_item_t *item = NULL;
|
||||
int count = 0;
|
||||
limit_hash_item_t remote_usage;
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);
|
||||
|
||||
hash_key = switch_mprintf("%s_%s", realm, resource);
|
||||
remote_usage = get_remote_usage(hash_key);
|
||||
|
||||
count = remote_usage.total_usage;
|
||||
*rcount = remote_usage.rate_usage;
|
||||
|
||||
if ((item = switch_core_hash_find(globals.limit_hash, hash_key))) {
|
||||
count = item->total_usage;
|
||||
*rcount = item->rate_usage;
|
||||
count += item->total_usage;
|
||||
*rcount += item->rate_usage;
|
||||
}
|
||||
|
||||
switch_safe_free(hash_key);
|
||||
|
@ -576,7 +600,7 @@ void limit_remote_destroy(limit_remote_t **r)
|
|||
switch_thread_rwlock_wrlock((*r)->rwlock);
|
||||
|
||||
/* Free hashtable data */
|
||||
for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) {
|
||||
for (hi = switch_hash_first(NULL, (*r)->index); hi; hi = switch_hash_next(hi)) {
|
||||
void *val;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
|
@ -593,6 +617,41 @@ void limit_remote_destroy(limit_remote_t **r)
|
|||
}
|
||||
}
|
||||
|
||||
/* Compute the usage sum of a resource on remote boxes */
|
||||
static limit_hash_item_t get_remote_usage(const char *key) {
|
||||
limit_hash_item_t usage = { 0 };
|
||||
switch_hash_index_t *hi;
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
|
||||
for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val;
|
||||
const void *hashkey;
|
||||
switch_ssize_t keylen;
|
||||
limit_remote_t *remote;
|
||||
limit_hash_item_t *item;
|
||||
switch_hash_this(hi, &hashkey, &keylen, &val);
|
||||
|
||||
remote = (limit_remote_t *)val;
|
||||
if (remote->state != REMOTE_UP) {
|
||||
continue;
|
||||
}
|
||||
|
||||
switch_thread_rwlock_rdlock(remote->rwlock);
|
||||
if ((item = switch_core_hash_find(remote->index, key))) {
|
||||
usage.total_usage += item->total_usage;
|
||||
usage.rate_usage += item->rate_usage;
|
||||
if (!usage.last_check) {
|
||||
usage.last_check = item->last_check;
|
||||
}
|
||||
}
|
||||
switch_thread_rwlock_unlock(remote->rwlock);
|
||||
}
|
||||
|
||||
switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
|
||||
|
||||
return usage;
|
||||
}
|
||||
|
||||
static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, void *obj)
|
||||
{
|
||||
limit_remote_t *remote = (limit_remote_t*)obj;
|
||||
|
@ -614,9 +673,58 @@ static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, voi
|
|||
remote->host, remote->port);
|
||||
memset(&remote->handle, 0, sizeof(remote->handle));
|
||||
remote->state = REMOTE_DOWN;
|
||||
/* Delete all remote tracking entries */
|
||||
switch_thread_rwlock_wrlock(remote->rwlock);
|
||||
switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, NULL);
|
||||
switch_thread_rwlock_unlock(remote->rwlock);
|
||||
} else {
|
||||
const char *data = esl_event_get_header(remote->handle.last_sr_event, "reply-text");
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "RECV: %s\n", data);
|
||||
if (!zstr(remote->handle.last_sr_event->body)) {
|
||||
char *data = strdup(remote->handle.last_sr_event->body);
|
||||
char *p = data, *p2;
|
||||
switch_time_t now = switch_epoch_time_now(NULL);
|
||||
while (p && *p) {
|
||||
/* We are getting the limit data as:
|
||||
L/a_c/1/0/0/0
|
||||
*/
|
||||
if ((p2 = strchr(p, '\n'))) {
|
||||
*p2++ = '\0';
|
||||
}
|
||||
|
||||
/* Now p points at the beginning of the current line,
|
||||
p2 at the start of the next one */
|
||||
if (*p == 'L') { /* Limit data */
|
||||
char *argv[5];
|
||||
int argc = switch_split(p+2, '/', argv);
|
||||
|
||||
if (argc < 5) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Protocol error: missing argument in line: %s\n", p);
|
||||
} else {
|
||||
limit_hash_item_t *item;
|
||||
switch_thread_rwlock_wrlock(remote->rwlock);
|
||||
if (!(item = switch_core_hash_find(remote->index, argv[0]))) {
|
||||
item = malloc(sizeof(*item));
|
||||
switch_core_hash_insert(remote->index, argv[0], item);
|
||||
}
|
||||
item->total_usage = atoi(argv[1]);
|
||||
item->rate_usage = atoi(argv[2]);
|
||||
item->interval = atoi(argv[3]);
|
||||
item->last_check = atoi(argv[4]);
|
||||
item->last_update = now;
|
||||
switch_thread_rwlock_unlock(remote->rwlock);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Imported key %s %d %d/%d (%d - %d)\n",
|
||||
argv[0], item->total_usage, item->rate_usage, item->interval, (int)item->last_check, (int)item->last_update);
|
||||
}
|
||||
}
|
||||
|
||||
p = p2;
|
||||
}
|
||||
free(data);
|
||||
|
||||
/* Now free up anything that wasnt in this update since it means their usage is 0 */
|
||||
switch_thread_rwlock_wrlock(remote->rwlock);
|
||||
switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, (void*)(intptr_t)now);
|
||||
switch_thread_rwlock_unlock(remote->rwlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -717,9 +825,34 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)
|
|||
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown)
|
||||
{
|
||||
switch_hash_index_t *hi;
|
||||
switch_bool_t remote_clean = SWITCH_TRUE;
|
||||
|
||||
switch_scheduler_del_task_group("mod_hash");
|
||||
|
||||
/* Kill remote connections, destroy needs a wrlock so we unlock after finding a pointer */
|
||||
while(remote_clean) {
|
||||
void *val;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
limit_remote_t *item = NULL;
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
|
||||
if ((hi = switch_hash_first(NULL, globals.remote_hash))) {
|
||||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
item = (limit_remote_t *)val;
|
||||
}
|
||||
switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
|
||||
|
||||
if (!item) {
|
||||
remote_clean = SWITCH_FALSE;
|
||||
} else {
|
||||
limit_remote_destroy(&item);
|
||||
switch_thread_rwlock_wrlock(globals.remote_hash_rwlock);
|
||||
switch_core_hash_delete(globals.remote_hash, key);
|
||||
switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
|
||||
}
|
||||
}
|
||||
|
||||
switch_thread_rwlock_wrlock(globals.limit_hash_rwlock);
|
||||
switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
|
||||
|
||||
|
|
Loading…
Reference in New Issue