vector: Update API to be more flexible.

Made the vector macro API be more like linked lists.
1) Added a name parameter to ast_vector() to name the vector struct.
2) Made the API take a pointer to the vector struct instead of the struct
itself.
3) Added an element cleanup macro/function parameter when removing an
element from the vector for ast_vector_remove_cmp_unordered() and
ast_vector_remove_elem_unordered().
4) Added ast_vector_get_addr() in case the vector element is not a simple
pointer.

* Converted an inline vector usage in stasis_message_router to use the
vector API.  It needed the API improvements so it could be converted.

* Fixed topic reference leak in router_dtor() when the
stasis_message_router is destroyed.

* Fixed deadlock potential in stasis_forward_all() and
stasis_forward_cancel().  Locking two topics at the same time requires
deadlock avoidance.

* Made internal_stasis_subscribe() tolerant of a NULL topic.

* Made stasis_message_router_add(),
stasis_message_router_add_cache_update(), stasis_message_router_remove(),
and stasis_message_router_remove_cache_update() tolerant of a NULL
message_type.

* Promoted a LOG_DEBUG message to LOG_ERROR as intended in
dispatch_message().

Review: https://reviewboard.asterisk.org/r/2903/


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@402429 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Richard Mudgett
2013-11-02 04:05:24 +00:00
parent 55da2b319d
commit 0bc642bc38
4 changed files with 304 additions and 177 deletions

View File

@@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/vector.h"
/*! \internal */
struct stasis_message_route {
@@ -44,19 +45,13 @@ struct stasis_message_route {
void *data;
};
struct route_table {
/*! Current number of entries in the route table */
size_t current_size;
/*! Allocated number of entires in the route table */
size_t max_size;
/*! The route table itself */
struct stasis_message_route routes[];
};
ast_vector(route_table, struct stasis_message_route);
static struct stasis_message_route *table_find_route(struct route_table *table,
static struct stasis_message_route *route_table_find(struct route_table *table,
struct stasis_message_type *message_type)
{
size_t idx;
struct stasis_message_route *route;
/* While a linear search for routes may seem very inefficient, most
* route tables have six routes or less. For such small data, it's
@@ -64,59 +59,74 @@ static struct stasis_message_route *table_find_route(struct route_table *table,
* tables, then we can look into containers with more efficient
* lookups.
*/
for (idx = 0; idx < table->current_size; ++idx) {
if (table->routes[idx].message_type == message_type) {
return &table->routes[idx];
for (idx = 0; idx < ast_vector_size(table); ++idx) {
route = ast_vector_get_addr(table, idx);
if (route->message_type == message_type) {
return route;
}
}
return NULL;
}
static int table_add_route(struct route_table **table_ptr,
/*!
* \brief route_table comparator for ast_vector_remove_cmp_unordered()
*
* \param elem Element to compare against
* \param value Value to compare with the vector element.
*
* \return 0 if element does not match.
* \return Non-zero if element matches.
*/
#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
/*!
* \brief route_table vector element cleanup.
*
* \param elem Element to cleanup
*
* \return Nothing
*/
#define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
static int route_table_remove(struct route_table *table,
struct stasis_message_type *message_type)
{
return ast_vector_remove_cmp_unordered(table, message_type, ROUTE_TABLE_ELEM_CMP,
ROUTE_TABLE_ELEM_CLEANUP);
}
static int route_table_add(struct route_table *table,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
struct route_table *table = *table_ptr;
struct stasis_message_route *route;
struct stasis_message_route route;
int res;
ast_assert(table_find_route(table, message_type) == NULL);
ast_assert(callback != NULL);
ast_assert(route_table_find(table, message_type) == NULL);
if (table->current_size + 1 > table->max_size) {
size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
struct route_table *new_table = ast_realloc(table,
sizeof(*new_table) +
sizeof(new_table->routes[0]) * new_max_size);
if (!new_table) {
return -1;
}
*table_ptr = table = new_table;
table->max_size = new_max_size;
route.message_type = ao2_bump(message_type);
route.callback = callback;
route.data = data;
res = ast_vector_append(table, route);
if (res) {
ROUTE_TABLE_ELEM_CLEANUP(route);
}
route = &table->routes[table->current_size++];
route->message_type = ao2_bump(message_type);
route->callback = callback;
route->data = data;
return 0;
return res;
}
static int table_remove_route(struct route_table *table,
struct stasis_message_type *message_type)
static void route_table_dtor(struct route_table *table)
{
size_t idx;
struct stasis_message_route *route;
for (idx = 0; idx < table->current_size; ++idx) {
if (table->routes[idx].message_type == message_type) {
ao2_cleanup(message_type);
table->routes[idx] =
table->routes[--table->current_size];
return 0;
}
for (idx = 0; idx < ast_vector_size(table); ++idx) {
route = ast_vector_get_addr(table, idx);
ROUTE_TABLE_ELEM_CLEANUP(*route);
}
return -1;
ast_vector_free(table);
}
/*! \internal */
@@ -124,9 +134,9 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct route_table *routes;
struct route_table routes;
/*! Subscribed routes for \ref stasis_cache_update messages */
struct route_table *cache_routes;
struct route_table cache_routes;
/*! Route of last resort */
struct stasis_message_route default_route;
};
@@ -137,13 +147,11 @@ static void router_dtor(void *obj)
ast_assert(!stasis_subscription_is_subscribed(router->subscription));
ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
ast_free(router->routes);
router->routes = NULL;
ast_free(router->cache_routes);
router->cache_routes = NULL;
route_table_dtor(&router->routes);
route_table_dtor(&router->cache_routes);
}
static int find_route(
@@ -161,12 +169,12 @@ static int find_route(
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
route = table_find_route(router->cache_routes, update->type);
route = route_table_find(&router->cache_routes, update->type);
}
if (route == NULL) {
/* Find a regular route */
route = table_find_route(router->routes, type);
route = route_table_find(&router->routes, type);
}
if (route == NULL && router->default_route.callback) {
@@ -201,6 +209,7 @@ static void router_dispatch(void *data,
struct stasis_message_router *stasis_message_router_create(
struct stasis_topic *topic)
{
int res;
RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
router = ao2_alloc(sizeof(*router), router_dtor);
@@ -208,13 +217,10 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
router->routes = ast_calloc(1, sizeof(*router->routes));
if (!router->routes) {
return NULL;
}
router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
if (!router->cache_routes) {
res = 0;
res |= ast_vector_init(&router->routes, 0);
res |= ast_vector_init(&router->cache_routes, 0);
if (res) {
return NULL;
}
@@ -259,40 +265,78 @@ int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
SCOPED_AO2LOCK(lock, router);
return table_add_route(&router->routes, message_type, callback, data);
int res;
ast_assert(router != NULL);
if (!message_type) {
/* Cannot route to NULL type. */
return -1;
}
ao2_lock(router);
res = route_table_add(&router->routes, message_type, callback, data);
ao2_unlock(router);
return res;
}
int stasis_message_router_add_cache_update(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
SCOPED_AO2LOCK(lock, router);
return table_add_route(&router->cache_routes, message_type, callback, data);
int res;
ast_assert(router != NULL);
if (!message_type) {
/* Cannot cache a route to NULL type. */
return -1;
}
ao2_lock(router);
res = route_table_add(&router->cache_routes, message_type, callback, data);
ao2_unlock(router);
return res;
}
void stasis_message_router_remove(struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
table_remove_route(router->routes, message_type);
ast_assert(router != NULL);
if (!message_type) {
/* Cannot remove a NULL type. */
return;
}
ao2_lock(router);
route_table_remove(&router->routes, message_type);
ao2_unlock(router);
}
void stasis_message_router_remove_cache_update(
struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
table_remove_route(router->cache_routes, message_type);
ast_assert(router != NULL);
if (!message_type) {
/* Cannot remove a NULL type. */
return;
}
ao2_lock(router);
route_table_remove(&router->cache_routes, message_type);
ao2_unlock(router);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data)
stasis_subscription_cb callback,
void *data)
{
SCOPED_AO2LOCK(lock, router);
ast_assert(router != NULL);
ast_assert(callback != NULL);
ao2_lock(router);
router->default_route.callback = callback;
router->default_route.data = data;
ao2_unlock(router);
/* While this implementation can never fail, it used to be able to */
return 0;
}