From f66ac8906bb41b67772051dcf06a6fbab707079c Mon Sep 17 00:00:00 2001 From: Mathieu Rene Date: Sat, 17 Jul 2010 01:14:11 -0400 Subject: [PATCH] mod_hash: remote sync in working state --- src/mod/applications/mod_hash/mod_hash.c | 159 +++++++++++++++++++++-- 1 file changed, 146 insertions(+), 13 deletions(-) diff --git a/src/mod/applications/mod_hash/mod_hash.c b/src/mod/applications/mod_hash/mod_hash.c index 782c980085..5e21e33108 100644 --- a/src/mod/applications/mod_hash/mod_hash.c +++ b/src/mod/applications/mod_hash/mod_hash.c @@ -54,10 +54,11 @@ static struct { } globals; typedef struct { - uint32_t total_usage; - uint32_t rate_usage; - time_t last_check; - uint32_t interval; + uint32_t total_usage; /* < Total */ + uint32_t rate_usage; /* < Current rate usage */ + time_t last_check; /* < Last rate check */ + uint32_t interval; /* < Interval used on last rate check */ + uint32_t last_update; /* < Last updated timestamp (rate or total) */ } limit_hash_item_t; struct callback { @@ -74,9 +75,9 @@ typedef struct { } limit_hash_private_t; typedef enum { - REMOTE_OFF = 0, /* Thread not running */ - REMOTE_DOWN, /* Cannot connect to remote instance */ - REMOTE_UP /* All good */ + REMOTE_OFF = 0, /* < Thread not running */ + REMOTE_DOWN, /* 0) { item->interval = interval; if (item->last_check <= (now - interval)) { @@ -164,7 +170,7 @@ SWITCH_LIMIT_INCR(limit_incr_hash) goto end; } } - } else if ((max >= 0) && (item->total_usage + increment > (uint32_t) max)) { + } else if ((max >= 0) && (item->total_usage + increment + remote_usage.total_usage > (uint32_t) max)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s is already at max value (%d)\n", hashkey, item->total_usage); status = SWITCH_STATUS_GENERR; goto end; @@ -225,6 +231,19 @@ SWITCH_HASH_DELETE_FUNC(limit_hash_cleanup_delete_callback) { return SWITCH_FALSE; } +SWITCH_HASH_DELETE_FUNC(limit_hash_remote_cleanup_callback) +{ + limit_hash_item_t *item = (limit_hash_item_t *) val; + switch_time_t now = (switch_time_t)(intptr_t)pData; + + if (item->last_update != now) { + free(item); + return SWITCH_TRUE; + } + + return SWITCH_FALSE; +} + /* !\brief Periodically checks for unused limit entries and frees them */ SWITCH_STANDARD_SCHED_FUNC(limit_hash_cleanup_callback) { @@ -300,14 +319,19 @@ SWITCH_LIMIT_USAGE(limit_usage_hash) char *hash_key = NULL; limit_hash_item_t *item = NULL; int count = 0; + limit_hash_item_t remote_usage; switch_thread_rwlock_rdlock(globals.limit_hash_rwlock); hash_key = switch_mprintf("%s_%s", realm, resource); + remote_usage = get_remote_usage(hash_key); + + count = remote_usage.total_usage; + *rcount = remote_usage.rate_usage; if ((item = switch_core_hash_find(globals.limit_hash, hash_key))) { - count = item->total_usage; - *rcount = item->rate_usage; + count += item->total_usage; + *rcount += item->rate_usage; } switch_safe_free(hash_key); @@ -576,7 +600,7 @@ void limit_remote_destroy(limit_remote_t **r) switch_thread_rwlock_wrlock((*r)->rwlock); /* Free hashtable data */ - for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) { + for (hi = switch_hash_first(NULL, (*r)->index); hi; hi = switch_hash_next(hi)) { void *val; const void *key; switch_ssize_t keylen; @@ -593,6 +617,41 @@ void limit_remote_destroy(limit_remote_t **r) } } +/* Compute the usage sum of a resource on remote boxes */ +static limit_hash_item_t get_remote_usage(const char *key) { + limit_hash_item_t usage = { 0 }; + switch_hash_index_t *hi; + + switch_thread_rwlock_rdlock(globals.remote_hash_rwlock); + for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) { + void *val; + const void *hashkey; + switch_ssize_t keylen; + limit_remote_t *remote; + limit_hash_item_t *item; + switch_hash_this(hi, &hashkey, &keylen, &val); + + remote = (limit_remote_t *)val; + if (remote->state != REMOTE_UP) { + continue; + } + + switch_thread_rwlock_rdlock(remote->rwlock); + if ((item = switch_core_hash_find(remote->index, key))) { + usage.total_usage += item->total_usage; + usage.rate_usage += item->rate_usage; + if (!usage.last_check) { + usage.last_check = item->last_check; + } + } + switch_thread_rwlock_unlock(remote->rwlock); + } + + switch_thread_rwlock_unlock(globals.remote_hash_rwlock); + + return usage; +} + static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, void *obj) { limit_remote_t *remote = (limit_remote_t*)obj; @@ -614,9 +673,58 @@ static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, voi remote->host, remote->port); memset(&remote->handle, 0, sizeof(remote->handle)); remote->state = REMOTE_DOWN; + /* Delete all remote tracking entries */ + switch_thread_rwlock_wrlock(remote->rwlock); + switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, NULL); + switch_thread_rwlock_unlock(remote->rwlock); } else { - const char *data = esl_event_get_header(remote->handle.last_sr_event, "reply-text"); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "RECV: %s\n", data); + if (!zstr(remote->handle.last_sr_event->body)) { + char *data = strdup(remote->handle.last_sr_event->body); + char *p = data, *p2; + switch_time_t now = switch_epoch_time_now(NULL); + while (p && *p) { + /* We are getting the limit data as: + L/a_c/1/0/0/0 + */ + if ((p2 = strchr(p, '\n'))) { + *p2++ = '\0'; + } + + /* Now p points at the beginning of the current line, + p2 at the start of the next one */ + if (*p == 'L') { /* Limit data */ + char *argv[5]; + int argc = switch_split(p+2, '/', argv); + + if (argc < 5) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Protocol error: missing argument in line: %s\n", p); + } else { + limit_hash_item_t *item; + switch_thread_rwlock_wrlock(remote->rwlock); + if (!(item = switch_core_hash_find(remote->index, argv[0]))) { + item = malloc(sizeof(*item)); + switch_core_hash_insert(remote->index, argv[0], item); + } + item->total_usage = atoi(argv[1]); + item->rate_usage = atoi(argv[2]); + item->interval = atoi(argv[3]); + item->last_check = atoi(argv[4]); + item->last_update = now; + switch_thread_rwlock_unlock(remote->rwlock); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Imported key %s %d %d/%d (%d - %d)\n", + argv[0], item->total_usage, item->rate_usage, item->interval, (int)item->last_check, (int)item->last_update); + } + } + + p = p2; + } + free(data); + + /* Now free up anything that wasnt in this update since it means their usage is 0 */ + switch_thread_rwlock_wrlock(remote->rwlock); + switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, (void*)(intptr_t)now); + switch_thread_rwlock_unlock(remote->rwlock); + } } } @@ -717,9 +825,34 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown) { switch_hash_index_t *hi; + switch_bool_t remote_clean = SWITCH_TRUE; switch_scheduler_del_task_group("mod_hash"); + /* Kill remote connections, destroy needs a wrlock so we unlock after finding a pointer */ + while(remote_clean) { + void *val; + const void *key; + switch_ssize_t keylen; + limit_remote_t *item = NULL; + + switch_thread_rwlock_rdlock(globals.remote_hash_rwlock); + if ((hi = switch_hash_first(NULL, globals.remote_hash))) { + switch_hash_this(hi, &key, &keylen, &val); + item = (limit_remote_t *)val; + } + switch_thread_rwlock_unlock(globals.remote_hash_rwlock); + + if (!item) { + remote_clean = SWITCH_FALSE; + } else { + limit_remote_destroy(&item); + switch_thread_rwlock_wrlock(globals.remote_hash_rwlock); + switch_core_hash_delete(globals.remote_hash, key); + switch_thread_rwlock_unlock(globals.remote_hash_rwlock); + } + } + switch_thread_rwlock_wrlock(globals.limit_hash_rwlock); switch_thread_rwlock_wrlock(globals.db_hash_rwlock);