diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index b14868b4a5..3209adb167 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router); * updates for types not handled by routes added with * stasis_message_router_add_cache_update(). * + * Adding multiple routes for the same message type results in undefined + * behavior. + * * \param router Router to add the route to. * \param message_type Type of message to route. * \param callback Callback to forard messages of \a message_type to. @@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router, * These are distinct from regular routes, so one could have both a regular * route and a cache route for the same \a message_type. * + * Adding multiple routes for the same message type results in undefined + * behavior. + * * \param router Router to add the route to. * \param message_type Subtype of cache update to route. * \param callback Callback to forard messages of \a message_type to. @@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router, /*! * \brief Remove a route from a message router. * + * If a route is removed from another thread, there is no notification that + * all messages using this route have been processed. This typically means that + * the associated \c data pointer for this route must be kept until the + * route itself is disposed of. + * * \param router Router to remove the route from. * \param message_type Type of message to route. * @@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router, /*! * \brief Remove a cache route from a message router. * + * If a route is removed from another thread, there is no notification that + * all messages using this route have been processed. This typically means that + * the associated \c data pointer for this route must be kept until the + * route itself is disposed of. + * * \param router Router to remove the route from. * \param message_type Type of message to route. * diff --git a/main/astobj2.c b/main/astobj2.c index 88a6a6a234..88801bd2fe 100644 --- a/main/astobj2.c +++ b/main/astobj2.c @@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li ast_atomic_fetchadd_int(&ao2.total_objects, -1); #endif + /* In case someone uses an object after it's been freed */ + obj->priv_data.magic = 0; + switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) { case AO2_ALLOC_OPT_LOCK_MUTEX: obj_mutex = INTERNAL_OBJ_MUTEX(user_data); ast_mutex_destroy(&obj_mutex->mutex.lock); - /* - * For safety, zero-out the astobj2_lock header and also the - * first word of the user-data, which we make sure is always - * allocated. - */ - memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) ); ast_free(obj_mutex); break; case AO2_ALLOC_OPT_LOCK_RWLOCK: obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data); ast_rwlock_destroy(&obj_rwlock->rwlock.lock); - /* - * For safety, zero-out the astobj2_rwlock header and also the - * first word of the user-data, which we make sure is always - * allocated. - */ - memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) ); ast_free(obj_rwlock); break; case AO2_ALLOC_OPT_LOCK_NOLOCK: - /* - * For safety, zero-out the astobj2 header and also the first - * word of the user-data, which we make sure is always - * allocated. - */ - memset(obj, '\0', sizeof(*obj) + sizeof(void *) ); ast_free(obj); break; default: @@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f struct astobj2_lock *obj_mutex; struct astobj2_rwlock *obj_rwlock; - if (data_size < sizeof(void *)) { - /* - * We always alloc at least the size of a void *, - * for debugging purposes. - */ - data_size = sizeof(void *); - } - switch (options & AO2_ALLOC_OPT_LOCK_MASK) { case AO2_ALLOC_OPT_LOCK_MUTEX: #if defined(__AST_DEBUG_MALLOC) diff --git a/main/stasis.c b/main/stasis.c index 1a03bb3d44..7c8c349209 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -467,23 +467,24 @@ struct dispatch { struct stasis_subscription *sub; }; -static void dispatch_dtor(void *data) +static void dispatch_dtor(struct dispatch *dispatch) { - struct dispatch *dispatch = data; ao2_cleanup(dispatch->topic); ao2_cleanup(dispatch->message); ao2_cleanup(dispatch->sub); + + ast_free(dispatch); } static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + struct dispatch *dispatch; ast_assert(topic != NULL); ast_assert(message != NULL); ast_assert(sub != NULL); - dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor); + dispatch = ast_malloc(sizeof(*dispatch)); if (!dispatch) { return NULL; } @@ -497,7 +498,6 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi dispatch->sub = sub; ao2_ref(sub, +1); - ao2_ref(dispatch, +1); return dispatch; } @@ -508,9 +508,10 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi */ static int dispatch_exec(void *data) { - RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); + struct dispatch *dispatch = data; subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message); + dispatch_dtor(dispatch); return 0; } @@ -534,20 +535,19 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu ast_assert(sub != NULL); if (sub->mailbox) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + struct dispatch *dispatch; dispatch = dispatch_create(publisher_topic, message, sub); if (!dispatch) { - ast_log(LOG_DEBUG, "Dropping dispatch\n"); + ast_log(LOG_ERROR, "Dropping dispatch\n"); break; } - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { - /* Ownership transferred to mailbox. - * Don't increment ref, b/c the task processor - * may have already gotten rid of the object. + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) { + /* Push failed; just delete the dispatch. */ - dispatch = NULL; + ast_log(LOG_ERROR, "Dropping dispatch\n"); + dispatch_dtor(dispatch); } } else { /* Dispatch directly */ diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 26d2f2c0c7..864cf42c47 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" -/*! Number of hash buckets for the route table. Keep it prime! */ -#define ROUTE_TABLE_BUCKETS 7 - /*! \internal */ struct stasis_message_route { /*! Message type handle by this route. */ @@ -47,29 +44,79 @@ struct stasis_message_route { void *data; }; -static void route_dtor(void *obj) -{ - struct stasis_message_route *route = obj; +struct route_table { + /*! Current number of entries in the route table */ + size_t current_size; + /*! Allocated number of entires in the route table */ + size_t max_size; + /*! The route table itself */ + struct stasis_message_route routes[]; +}; - ao2_cleanup(route->message_type); - route->message_type = NULL; +static struct stasis_message_route *table_find_route(struct route_table *table, + struct stasis_message_type *message_type) +{ + size_t idx; + + /* While a linear search for routes may seem very inefficient, most + * route tables have six routes or less. For such small data, it's + * hard to beat a linear search. If we start having larger route + * tables, then we can look into containers with more efficient + * lookups. + */ + for (idx = 0; idx < table->current_size; ++idx) { + if (table->routes[idx].message_type == message_type) { + return &table->routes[idx]; + } + } + + return NULL; } -static int route_hash(const void *obj, const int flags) +static int table_add_route(struct route_table **table_ptr, + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data) { - const struct stasis_message_route *route = obj; - const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type; + struct route_table *table = *table_ptr; + struct stasis_message_route *route; - return ast_str_hash(stasis_message_type_name(message_type)); + ast_assert(table_find_route(table, message_type) == NULL); + + if (table->current_size + 1 > table->max_size) { + size_t new_max_size = table->max_size ? table->max_size * 2 : 1; + struct route_table *new_table = ast_realloc(table, + sizeof(*new_table) + + sizeof(new_table->routes[0]) * new_max_size); + if (!new_table) { + return -1; + } + *table_ptr = table = new_table; + table->max_size = new_max_size; + } + + route = &table->routes[table->current_size++]; + + route->message_type = ao2_bump(message_type); + route->callback = callback; + route->data = data; + + return 0; } -static int route_cmp(void *obj, void *arg, int flags) +static int table_remove_route(struct route_table *table, + struct stasis_message_type *message_type) { - const struct stasis_message_route *left = obj; - const struct stasis_message_route *right = arg; - const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type; + size_t idx; - return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0; + for (idx = 0; idx < table->current_size; ++idx) { + if (table->routes[idx].message_type == message_type) { + ao2_cleanup(message_type); + table->routes[idx] = + table->routes[--table->current_size]; + return 0; + } + } + return -1; } /*! \internal */ @@ -77,11 +124,11 @@ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; /*! Subscribed routes */ - struct ao2_container *routes; - /*! Subscribed routes for \ref stasi_cache_update messages */ - struct ao2_container *cache_routes; + struct route_table *routes; + /*! Subscribed routes for \ref stasis_cache_update messages */ + struct route_table *cache_routes; /*! Route of last resort */ - struct stasis_message_route *default_route; + struct stasis_message_route default_route; }; static void router_dtor(void *obj) @@ -92,49 +139,47 @@ static void router_dtor(void *obj) ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; - ao2_cleanup(router->routes); + ast_free(router->routes); router->routes = NULL; - ao2_cleanup(router->cache_routes); + ast_free(router->cache_routes); router->cache_routes = NULL; - - ao2_cleanup(router->default_route); - router->default_route = NULL; } -static struct stasis_message_route *find_route( +static int find_route( struct stasis_message_router *router, - struct stasis_message *message) + struct stasis_message *message, + struct stasis_message_route *route_out) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_route *route = NULL; struct stasis_message_type *type = stasis_message_type(message); SCOPED_AO2LOCK(lock, router); + ast_assert(route_out != NULL); + if (type == stasis_cache_update_type()) { /* Find a cache route */ struct stasis_cache_update *update = stasis_message_data(message); - route = ao2_find(router->cache_routes, update->type, OBJ_KEY); + route = table_find_route(router->cache_routes, update->type); } if (route == NULL) { /* Find a regular route */ - route = ao2_find(router->routes, type, OBJ_KEY); + route = table_find_route(router->routes, type); } - if (route == NULL) { + if (route == NULL && router->default_route.callback) { /* Maybe the default route, then? */ - if ((route = router->default_route)) { - ao2_ref(route, +1); - } + route = &router->default_route; } - if (route == NULL) { - return NULL; + if (!route) { + return -1; } - ao2_ref(route, +1); - return route; + *route_out = *route; + return 0; } static void router_dispatch(void *data, @@ -143,15 +188,12 @@ static void router_dispatch(void *data, struct stasis_message *message) { struct stasis_message_router *router = data; - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_route route; - route = find_route(router, message); - - if (route) { - route->callback(route->data, sub, topic, message); + if (find_route(router, message, &route) == 0) { + route.callback(route.data, sub, topic, message); } - if (stasis_subscription_final_message(sub, message)) { ao2_cleanup(router); } @@ -167,14 +209,12 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash, - route_cmp); + router->routes = ast_calloc(1, sizeof(*router->routes)); if (!router->routes) { return NULL; } - router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, - route_hash, route_cmp); + router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes)); if (!router->cache_routes) { return NULL; } @@ -216,100 +256,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router) return stasis_subscription_is_done(router->subscription); } - -static struct stasis_message_route *route_create( - struct stasis_message_type *message_type, - stasis_subscription_cb callback, - void *data) -{ - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = ao2_alloc(sizeof(*route), route_dtor); - if (!route) { - return NULL; - } - - if (message_type) { - ao2_ref(message_type, +1); - } - route->message_type = message_type; - route->callback = callback; - route->data = data; - - ao2_ref(route, +1); - return route; -} - -static int add_route(struct stasis_message_router *router, - struct stasis_message_route *route) -{ - RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, router); - - existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY); - - if (existing_route) { - ast_log(LOG_ERROR, "Cannot add route; route exists\n"); - return -1; - } - - ao2_link(router->routes, route); - return 0; -} - -static int add_cache_route(struct stasis_message_router *router, - struct stasis_message_route *route) -{ - RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, router); - - existing_route = ao2_find(router->cache_routes, route->message_type, - OBJ_KEY); - - if (existing_route) { - ast_log(LOG_ERROR, "Cannot add route; route exists\n"); - return -1; - } - - ao2_link(router->cache_routes, route); - return 0; -} - int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = route_create(message_type, callback, data); - if (!route) { - return -1; - } - - return add_route(router, route); + SCOPED_AO2LOCK(lock, router); + return table_add_route(&router->routes, message_type, callback, data); } int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = route_create(message_type, callback, data); - if (!route) { - return -1; - } - - return add_cache_route(router, route); + SCOPED_AO2LOCK(lock, router); + return table_add_route(&router->cache_routes, message_type, callback, data); } void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - - ao2_find(router->routes, message_type, - OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + table_remove_route(router->routes, message_type); } void stasis_message_router_remove_cache_update( @@ -317,9 +284,7 @@ void stasis_message_router_remove_cache_update( struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - - ao2_find(router->cache_routes, message_type, - OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + table_remove_route(router->cache_routes, message_type); } int stasis_message_router_set_default(struct stasis_message_router *router, @@ -327,7 +292,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router, void *data) { SCOPED_AO2LOCK(lock, router); - ao2_cleanup(router->default_route); - router->default_route = route_create(NULL, callback, data); - return router->default_route ? 0 : -1; + router->default_route.callback = callback; + router->default_route.data = data; + /* While this implementation can never fail, it used to be able to */ + return 0; } diff --git a/main/taskprocessor.c b/main/taskprocessor.c index a8d1c80f97..92bd64c328 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -431,10 +431,6 @@ static void tps_taskprocessor_destroy(void *tps) } ast_free((char *) t->name); if (t->listener) { - /* This code should not be reached since the listener - * should have been destroyed before the taskprocessor could - * be destroyed - */ ao2_ref(t->listener, -1); t->listener = NULL; } diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 2b7a82b3fe..c3e6c21924 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -9,6 +9,7 @@ #define RES_PJSIP_PRIVATE_H_ struct ao2_container; +struct ast_threadpool_options; /*! * \brief Initialize the configuration for res_pjsip diff --git a/tests/test_stasis.c b/tests/test_stasis.c index da9c50874e..498df94402 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -867,52 +867,6 @@ AST_TEST_DEFINE(cache_dump) return AST_TEST_PASS; } -AST_TEST_DEFINE(route_conflicts) -{ - RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); - RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); - RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); - RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); - int ret; - - switch (cmd) { - case TEST_INIT: - info->name = __func__; - info->category = test_category; - info->summary = - "Multiple routes to the same message_type should fail"; - info->description = - "Multiple routes to the same message_type should fail"; - return AST_TEST_NOT_RUN; - case TEST_EXECUTE: - break; - } - - topic = stasis_topic_create("TestTopic"); - ast_test_validate(test, NULL != topic); - - consumer1 = consumer_create(1); - ast_test_validate(test, NULL != consumer1); - consumer2 = consumer_create(1); - ast_test_validate(test, NULL != consumer2); - - test_message_type = stasis_message_type_create("TestMessage", NULL); - ast_test_validate(test, NULL != test_message_type); - - uut = stasis_message_router_create(topic); - ast_test_validate(test, NULL != uut); - - ret = stasis_message_router_add( - uut, test_message_type, consumer_exec, consumer1); - ast_test_validate(test, 0 == ret); - ret = stasis_message_router_add( - uut, test_message_type, consumer_exec, consumer2); - ast_test_validate(test, 0 != ret); - - return AST_TEST_PASS; -} - AST_TEST_DEFINE(router) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -1373,7 +1327,6 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_filter); AST_TEST_UNREGISTER(cache); AST_TEST_UNREGISTER(cache_dump); - AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); @@ -1397,7 +1350,6 @@ static int load_module(void) AST_TEST_REGISTER(cache_filter); AST_TEST_REGISTER(cache); AST_TEST_REGISTER(cache_dump); - AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving);