diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h index b43c5ee943..933092fe60 100644 --- a/include/asterisk/astobj2.h +++ b/include/asterisk/astobj2.h @@ -510,6 +510,25 @@ void *__ao2_alloc(size_t data_size, ao2_destructor_fn destructor_fn, unsigned in #endif +/*! + * \since 12 + * \brief Bump refcount on an AO2 object by one, returning the object. + * + * This is useful for inlining a ref bump, and you don't care about the ref + * count. Also \c NULL safe, for even more convenience. + * + * \param obj AO2 object to bump the refcount on. + * \retval The given \a obj pointer. + */ +#define ao2_bump(obj) \ + ({ \ + typeof(obj) __obj_ ## __LINE__ = (obj); \ + if (__obj_ ## __LINE__) { \ + ao2_ref(__obj_ ## __LINE__, +1); \ + } \ + __obj_ ## __LINE__; \ + }) + int __ao2_ref_debug(void *o, int delta, const char *tag, const char *file, int line, const char *func); int __ao2_ref(void *o, int delta); diff --git a/main/stasis.c b/main/stasis.c index b1af7b7f66..1a03bb3d44 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -39,6 +39,95 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/utils.h" #include "asterisk/uuid.h" +/*! + * \page stasis-impl Stasis Implementation Notes + * + * \par Reference counting + * + * Stasis introduces a number of objects, which are tightly related to one + * another. Because we rely on ref-counting for memory management, understanding + * these relationships is important to understanding this code. + * + * \code{.txt} + * + * stasis_topic <----> stasis_subscription + * ^ ^ + * \ / + * \ / + * dispatch + * | + * | + * v + * stasis_message + * | + * | + * v + * stasis_message_type + * + * \endcode + * + * The most troubling thing in this chart is the cyclic reference between + * stasis_topic and stasis_subscription. This is both unfortunate, and + * necessary. Topics need the subscription in order to dispatch messages; + * subscriptions need the topics to unsubscribe and check subscription status. 
+ * + * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the + * topic's reference to a subscription. When the subscription is destroyed, it + * will remove its reference to the topic. + * + * This means that until a subscription has been explicitly unsubscribed, it will + * not be destroyed. Neither will a topic be destroyed while it has subscribers. + * The destructors of both have assertions regarding this to catch ref-counting + * problems where a subscription or topic has had an extra ao2_cleanup(). + * + * The \ref dispatch object is a transient object, which is posted to a + * subscription's taskprocessor to send a message to the subscriber. They have + * short life cycles, allocated on one thread, destroyed on another. + * + * During shutdown, or the deletion of a domain object, there is a flurry of + * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages + * are processed. Any one of these cleanups could be the one to actually destroy + * a given object, so care must be taken to ensure that an object isn't + * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock() + * that might happen when a RAII_VAR() goes out of scope. + * + * \par Typical life cycles + * + * \li stasis_topic - There are several topics which live for the duration of + * the Asterisk process (ast_channel_topic_all(), etc.) but most of these + * are actually fed by shorter-lived topics whose lifetime is associated + * with some domain object (like ast_channel_topic() for a given + * ast_channel). + * + * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as + * topics, for similar reasons. + * + * \li dispatch - Very short lived; just long enough to post a message to a + * subscriber. + * + * \li stasis_message - Short to intermediate lifetimes, but that is mostly + * irrelevant. 
Messages are strictly data and have no behavior associated + * with them, so it doesn't really matter if/when they are destroyed. By + * design, a component could hold a ref to a message forever without any + * ill consequences (aside from consuming more memory). + * + * \li stasis_message_type - Long life cycles, typically only destroyed on + * module unloading or _clean_ process exit. + * + * \par Subscriber shutdown sequencing + * + * Subscribers are sensitive to shutdown sequencing, specifically in how they + * reference message types. This is fully detailed on the wiki at + * https://wiki.asterisk.org/wiki/x/K4BqAQ. + * + * In short, the lifetime of the \a data (and \a callback, if in a module) must + * be held until the stasis_subscription_final_message() has been received. + * Depending on the structure of the subscriber code, this can be handled by + * using stasis_subscription_final_message() to free resources on the final + * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to + * block until the unsubscribe has completed. + */ + /*! Initial size of the subscribers list. */ #define INITIAL_SUBSCRIBERS_MAX 4 @@ -53,7 +142,7 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /*! \internal */ struct stasis_topic { char *name; - /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */ + /*! Variable length array of the subscribers */ struct stasis_subscription **subscribers; /*! Allocated length of the subscribers array */ size_t num_subscribers_max; @@ -67,6 +156,10 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs static void topic_dtor(void *obj) { struct stasis_topic *topic = obj; + + /* Subscribers hold a reference to topics, so they should all be + * unsubscribed before we get here. 
*/ + ast_assert(topic->num_subscribers_current == 0); ast_free(topic->name); topic->name = NULL; ast_free(topic->subscribers); @@ -116,7 +209,7 @@ struct stasis_subscription { /*! Data pointer to be handed to the callback. */ void *data; - /*! Lock for joining with subscription. */ + /*! Lock for completion flags \c final_message_{rxed,processed}. */ ast_mutex_t join_lock; /*! Condition for joining with subscription. */ ast_cond_t join_cond; @@ -131,8 +224,14 @@ struct stasis_subscription { static void subscription_dtor(void *obj) { struct stasis_subscription *sub = obj; + + /* Subscriptions need to be manually unsubscribed before destruction + * b/c there's a cyclic reference between topics and subscriptions */ ast_assert(!stasis_subscription_is_subscribed(sub)); + /* If there are any messages in flight to this subscription; that would + * be bad. */ ast_assert(stasis_subscription_is_done(sub)); + ao2_cleanup(sub->topic); sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); @@ -221,7 +320,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { if (sub) { size_t i; - struct stasis_topic *topic = sub->topic; + /* The subscription may be the last ref to this topic. Hold + * the topic ref open until after the unlock. */ + RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic), + ao2_cleanup); SCOPED_AO2LOCK(lock_topic, topic); for (i = 0; i < topic->num_subscribers_current; ++i) { @@ -240,11 +342,6 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } -/*! - * \brief Block until the final message has been received on a subscription. - * - * \param subscription Subscription to wait on. - */ void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { @@ -347,7 +444,12 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs topic->num_subscribers_max *= 2; } - /* Don't ref sub here or we'll cause a reference cycle. 
*/ + /* The reference from the topic to the subscription is shared with + * the owner of the subscription, which will explicitly unsubscribe + * to release it. + * + * If we bumped the refcount here, the owner would have to unsubscribe + * and cleanup, which is a bit awkward. */ topic->subscribers[topic->num_subscribers_current++] = sub; return 0; } @@ -413,9 +515,13 @@ static int dispatch_exec(void *data) return 0; } -void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message) +void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message) { size_t i; + /* The topic may be unref'ed by the subscription invocation. + * Make sure we hold onto a reference while dispatching. */ + RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic), + ao2_cleanup); SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); @@ -625,7 +731,7 @@ void stasis_log_bad_type_access(const char *name) ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name); } -/*! \brief Cleanup function */ +/*! \brief Shutdown function */ static void stasis_exit(void) { ast_threadpool_shutdown(pool); diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 2ca4083df4..d4375520da 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -59,8 +59,14 @@ struct stasis_caching_topic { static void stasis_caching_topic_dtor(void *obj) { struct stasis_caching_topic *caching_topic = obj; + + /* Caching topics contain subscriptions, and must be manually + * unsubscribed. */ ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); + /* If there are any messages in flight to this subscription; that would + * be bad. 
*/ ast_assert(stasis_subscription_is_done(caching_topic->sub)); + ao2_cleanup(caching_topic->sub); caching_topic->sub = NULL; ao2_cleanup(caching_topic->cache); diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 0ee57ba242..381fdd9898 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -177,6 +177,9 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) stasis_unsubscribe(one->forward_cached_to_all); one->forward_cached_to_all = NULL; stasis_caching_unsubscribe(one->topic_cached); + one->topic_cached = NULL; + + ao2_cleanup(one); } struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one) diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 4f88a5a6c5..37b6b62ee4 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -912,6 +912,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type, static void stasis_channels_cleanup(void) { + stasis_caching_unsubscribe_and_join(channel_by_name_topic); + channel_by_name_topic = NULL; + ao2_cleanup(channel_cache_by_name); + channel_cache_by_name = NULL; + ao2_cleanup(channel_cache_all); + channel_cache_all = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); @@ -931,13 +938,6 @@ static void stasis_channels_cleanup(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type); - - stasis_caching_unsubscribe_and_join(channel_by_name_topic); - channel_by_name_topic = NULL; - ao2_cleanup(channel_cache_by_name); - channel_cache_by_name = NULL; - ao2_cleanup(channel_cache_all); - channel_cache_all = NULL; } int ast_stasis_channels_init(void) diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 1e460566b5..da9c50874e 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ 
-1271,6 +1271,97 @@ AST_TEST_DEFINE(to_ami) return AST_TEST_PASS; } +static void noop(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + /* no-op */ +} + +AST_TEST_DEFINE(dtor_order) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test that destruction order doesn't bomb stuff"; + info->description = "Test that destruction order doesn't bomb stuff"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("test-topic"); + ast_test_validate(test, NULL != topic); + + sub = stasis_subscribe(topic, noop, NULL); + ast_test_validate(test, NULL != sub); + + /* With any luck, this won't completely blow everything up */ + ao2_cleanup(topic); + stasis_unsubscribe(sub); + + /* These refs were cleaned up manually */ + topic = NULL; + sub = NULL; + + return AST_TEST_PASS; +} + +static const char *noop_get_id(struct stasis_message *message) +{ + return NULL; +} + +AST_TEST_DEFINE(caching_dtor_order) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, + stasis_caching_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test that destruction order doesn't bomb stuff"; + info->description = "Test that destruction order doesn't bomb stuff"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + cache = stasis_cache_create(noop_get_id); + ast_test_validate(test, NULL != cache); + + topic = stasis_topic_create("test-topic"); + ast_test_validate(test, NULL != topic); + + caching_topic = 
stasis_caching_topic_create(topic, cache); + ast_test_validate(test, NULL != caching_topic); + + sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop, + NULL); + ast_test_validate(test, NULL != sub); + + /* With any luck, this won't completely blow everything up */ + ao2_cleanup(cache); + ao2_cleanup(topic); + stasis_caching_unsubscribe(caching_topic); + stasis_unsubscribe(sub); + + /* These refs were cleaned up manually */ + cache = NULL; + topic = NULL; + caching_topic = NULL; + sub = NULL; + + return AST_TEST_PASS; +} + static int unload_module(void) { AST_TEST_UNREGISTER(message_type); @@ -1290,6 +1381,8 @@ static int unload_module(void) AST_TEST_UNREGISTER(to_json); AST_TEST_UNREGISTER(no_to_ami); AST_TEST_UNREGISTER(to_ami); + AST_TEST_UNREGISTER(dtor_order); + AST_TEST_UNREGISTER(caching_dtor_order); return 0; } @@ -1312,6 +1405,8 @@ static int load_module(void) AST_TEST_REGISTER(to_json); AST_TEST_REGISTER(no_to_ami); AST_TEST_REGISTER(to_ami); + AST_TEST_REGISTER(dtor_order); + AST_TEST_REGISTER(caching_dtor_order); return AST_MODULE_LOAD_SUCCESS; }