mirror of
https://github.com/asterisk/asterisk.git
synced 2025-10-12 15:45:18 +00:00
stasis_cache: Prune stasis_subscription_change messages
The stasis cache provides a way to reconstruct the current state of topic subscribers. Unfortunately, since every subscribe and unsubscribe is cached, the cache continues to grow unabated while asterisk is running. This patch removes subscribe messages from the cache when the corresponding unsubscribe is received. This patch also registers the cache containers with ao2 so that if AO2_DEBUG is turned on, you can list the container and get its stats from the CLI. ASTERISK-27121 Change-Id: I3d18905e477f3721815da91f30da8d3fbb2d4f56
This commit is contained in:
@@ -48,6 +48,7 @@ struct stasis_cache {
|
||||
snapshot_get_id id_fn;
|
||||
cache_aggregate_calc_fn aggregate_calc_fn;
|
||||
cache_aggregate_publish_fn aggregate_publish_fn;
|
||||
int registered;
|
||||
};
|
||||
|
||||
/*! \internal */
|
||||
@@ -69,6 +70,8 @@ static void stasis_caching_topic_dtor(void *obj)
|
||||
* be bad. */
|
||||
ast_assert(stasis_subscription_is_done(caching_topic->sub));
|
||||
|
||||
ao2_container_unregister(stasis_topic_name(caching_topic->topic));
|
||||
|
||||
ao2_cleanup(caching_topic->sub);
|
||||
caching_topic->sub = NULL;
|
||||
ao2_cleanup(caching_topic->cache);
|
||||
@@ -813,7 +816,31 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
|
||||
}
|
||||
|
||||
msg_type = stasis_message_type(message);
|
||||
if (stasis_cache_clear_type() == msg_type) {
|
||||
|
||||
if (stasis_subscription_change_type() == msg_type) {
|
||||
struct stasis_subscription_change *change = stasis_message_data(message);
|
||||
|
||||
/*
|
||||
* If this change type is an unsubscribe, we need to find the original
|
||||
* subscribe and remove it from the cache otherwise the cache will
|
||||
* continue to grow unabated.
|
||||
*/
|
||||
if (strcmp(change->description, "Unsubscribe") == 0) {
|
||||
struct stasis_cache_entry *sub;
|
||||
|
||||
ao2_wrlock(caching_topic->cache->entries);
|
||||
sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
|
||||
if (sub) {
|
||||
cache_remove(caching_topic->cache->entries, sub, stasis_message_eid(message));
|
||||
ao2_cleanup(sub);
|
||||
}
|
||||
ao2_unlock(caching_topic->cache->entries);
|
||||
ao2_cleanup(caching_topic_needs_unref);
|
||||
return;
|
||||
}
|
||||
msg_put = message;
|
||||
msg = message;
|
||||
} else if (stasis_cache_clear_type() == msg_type) {
|
||||
/* Cache clear event. */
|
||||
msg_put = NULL;
|
||||
msg = stasis_message_data(message);
|
||||
@@ -866,6 +893,17 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
|
||||
ao2_cleanup(caching_topic_needs_unref);
|
||||
}
|
||||
|
||||
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
|
||||
{
|
||||
struct stasis_cache_entry *entry = v_obj;
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
|
||||
entry->key.id, entry->key.hash);
|
||||
}
|
||||
|
||||
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
|
||||
{
|
||||
struct stasis_caching_topic *caching_topic;
|
||||
@@ -886,15 +924,24 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
|
||||
}
|
||||
|
||||
caching_topic->topic = stasis_topic_create(new_name);
|
||||
ast_free(new_name);
|
||||
if (caching_topic->topic == NULL) {
|
||||
ao2_ref(caching_topic, -1);
|
||||
ast_free(new_name);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_ref(cache, +1);
|
||||
caching_topic->cache = cache;
|
||||
if (!cache->registered) {
|
||||
if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
|
||||
ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
|
||||
cache->entries, new_name);
|
||||
} else {
|
||||
cache->registered = 1;
|
||||
}
|
||||
}
|
||||
ast_free(new_name);
|
||||
|
||||
caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
|
||||
if (caching_topic->sub == NULL) {
|
||||
|
Reference in New Issue
Block a user