Stasis performance improvements

This patch addresses several performance problems that were found in
the initial performance testing of Asterisk 12.

The Stasis dispatch object was allocated as an AO2 object, even though
it has a very confined lifecycle. This was replaced with a straight
ast_malloc().

The Stasis message router was spending an inordinate amount of time
searching hash tables. In this case, most of our routers had 6 or
fewer routes in them to begin with. This was replaced with an array
that's searched linearly for the route.

We more heavily rely on AO2 objects in Asterisk 12, and the memset()
in ao2_ref() actually became noticeable on the profile. This was
#ifdef'ed to only run when AO2_DEBUG was enabled.

After being misled by an erroneous comment in taskprocessor.c during
profiling, the wrong comment was removed.

Review: https://reviewboard.asterisk.org/r/2873/


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400138 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
David M. Lee
2013-09-30 15:24:00 +00:00
parent 2438636a31
commit 48d90d947d
7 changed files with 132 additions and 224 deletions

View File

@@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
* updates for types not handled by routes added with
* stasis_message_router_add_cache_update().
*
* Adding multiple routes for the same message type results in undefined
* behavior.
*
* \param router Router to add the route to.
* \param message_type Type of message to route.
* \param callback Callback to forard messages of \a message_type to.
@@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router,
* These are distinct from regular routes, so one could have both a regular
* route and a cache route for the same \a message_type.
*
* Adding multiple routes for the same message type results in undefined
* behavior.
*
* \param router Router to add the route to.
* \param message_type Subtype of cache update to route.
* \param callback Callback to forard messages of \a message_type to.
@@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
/*!
* \brief Remove a route from a message router.
*
* If a route is removed from another thread, there is no notification that
* all messages using this route have been processed. This typically means that
* the associated \c data pointer for this route must be kept until the
* route itself is disposed of.
*
* \param router Router to remove the route from.
* \param message_type Type of message to route.
*
@@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router,
/*!
* \brief Remove a cache route from a message router.
*
* If a route is removed from another thread, there is no notification that
* all messages using this route have been processed. This typically means that
* the associated \c data pointer for this route must be kept until the
* route itself is disposed of.
*
* \param router Router to remove the route from.
* \param message_type Type of message to route.
*

View File

@@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
ast_atomic_fetchadd_int(&ao2.total_objects, -1);
#endif
/* In case someone uses an object after it's been freed */
obj->priv_data.magic = 0;
switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
obj_mutex = INTERNAL_OBJ_MUTEX(user_data);
ast_mutex_destroy(&obj_mutex->mutex.lock);
/*
* For safety, zero-out the astobj2_lock header and also the
* first word of the user-data, which we make sure is always
* allocated.
*/
memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) );
ast_free(obj_mutex);
break;
case AO2_ALLOC_OPT_LOCK_RWLOCK:
obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data);
ast_rwlock_destroy(&obj_rwlock->rwlock.lock);
/*
* For safety, zero-out the astobj2_rwlock header and also the
* first word of the user-data, which we make sure is always
* allocated.
*/
memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) );
ast_free(obj_rwlock);
break;
case AO2_ALLOC_OPT_LOCK_NOLOCK:
/*
* For safety, zero-out the astobj2 header and also the first
* word of the user-data, which we make sure is always
* allocated.
*/
memset(obj, '\0', sizeof(*obj) + sizeof(void *) );
ast_free(obj);
break;
default:
@@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f
struct astobj2_lock *obj_mutex;
struct astobj2_rwlock *obj_rwlock;
if (data_size < sizeof(void *)) {
/*
* We always alloc at least the size of a void *,
* for debugging purposes.
*/
data_size = sizeof(void *);
}
switch (options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
#if defined(__AST_DEBUG_MALLOC)

View File

@@ -467,23 +467,24 @@ struct dispatch {
struct stasis_subscription *sub;
};
static void dispatch_dtor(void *data)
static void dispatch_dtor(struct dispatch *dispatch)
{
struct dispatch *dispatch = data;
ao2_cleanup(dispatch->topic);
ao2_cleanup(dispatch->message);
ao2_cleanup(dispatch->sub);
ast_free(dispatch);
}
static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
{
RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
struct dispatch *dispatch;
ast_assert(topic != NULL);
ast_assert(message != NULL);
ast_assert(sub != NULL);
dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
dispatch = ast_malloc(sizeof(*dispatch));
if (!dispatch) {
return NULL;
}
@@ -497,7 +498,6 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
dispatch->sub = sub;
ao2_ref(sub, +1);
ao2_ref(dispatch, +1);
return dispatch;
}
@@ -508,9 +508,10 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
*/
static int dispatch_exec(void *data)
{
RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
struct dispatch *dispatch = data;
subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
dispatch_dtor(dispatch);
return 0;
}
@@ -534,20 +535,19 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
ast_assert(sub != NULL);
if (sub->mailbox) {
RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
struct dispatch *dispatch;
dispatch = dispatch_create(publisher_topic, message, sub);
if (!dispatch) {
ast_log(LOG_DEBUG, "Dropping dispatch\n");
ast_log(LOG_ERROR, "Dropping dispatch\n");
break;
}
if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
/* Ownership transferred to mailbox.
* Don't increment ref, b/c the task processor
* may have already gotten rid of the object.
if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
/* Push failed; just delete the dispatch.
*/
dispatch = NULL;
ast_log(LOG_ERROR, "Dropping dispatch\n");
dispatch_dtor(dispatch);
}
} else {
/* Dispatch directly */

View File

@@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
/*! Number of hash buckets for the route table. Keep it prime! */
#define ROUTE_TABLE_BUCKETS 7
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
@@ -47,29 +44,79 @@ struct stasis_message_route {
void *data;
};
static void route_dtor(void *obj)
{
struct stasis_message_route *route = obj;
struct route_table {
/*! Current number of entries in the route table */
size_t current_size;
/*! Allocated number of entires in the route table */
size_t max_size;
/*! The route table itself */
struct stasis_message_route routes[];
};
ao2_cleanup(route->message_type);
route->message_type = NULL;
static struct stasis_message_route *table_find_route(struct route_table *table,
struct stasis_message_type *message_type)
{
size_t idx;
/* While a linear search for routes may seem very inefficient, most
* route tables have six routes or less. For such small data, it's
* hard to beat a linear search. If we start having larger route
* tables, then we can look into containers with more efficient
* lookups.
*/
for (idx = 0; idx < table->current_size; ++idx) {
if (table->routes[idx].message_type == message_type) {
return &table->routes[idx];
}
}
return NULL;
}
static int route_hash(const void *obj, const int flags)
static int table_add_route(struct route_table **table_ptr,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
const struct stasis_message_route *route = obj;
const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
struct route_table *table = *table_ptr;
struct stasis_message_route *route;
return ast_str_hash(stasis_message_type_name(message_type));
ast_assert(table_find_route(table, message_type) == NULL);
if (table->current_size + 1 > table->max_size) {
size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
struct route_table *new_table = ast_realloc(table,
sizeof(*new_table) +
sizeof(new_table->routes[0]) * new_max_size);
if (!new_table) {
return -1;
}
*table_ptr = table = new_table;
table->max_size = new_max_size;
}
route = &table->routes[table->current_size++];
route->message_type = ao2_bump(message_type);
route->callback = callback;
route->data = data;
return 0;
}
static int route_cmp(void *obj, void *arg, int flags)
static int table_remove_route(struct route_table *table,
struct stasis_message_type *message_type)
{
const struct stasis_message_route *left = obj;
const struct stasis_message_route *right = arg;
const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
size_t idx;
return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
for (idx = 0; idx < table->current_size; ++idx) {
if (table->routes[idx].message_type == message_type) {
ao2_cleanup(message_type);
table->routes[idx] =
table->routes[--table->current_size];
return 0;
}
}
return -1;
}
/*! \internal */
@@ -77,11 +124,11 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct ao2_container *routes;
/*! Subscribed routes for \ref stasi_cache_update messages */
struct ao2_container *cache_routes;
struct route_table *routes;
/*! Subscribed routes for \ref stasis_cache_update messages */
struct route_table *cache_routes;
/*! Route of last resort */
struct stasis_message_route *default_route;
struct stasis_message_route default_route;
};
static void router_dtor(void *obj)
@@ -92,49 +139,47 @@ static void router_dtor(void *obj)
ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
ao2_cleanup(router->routes);
ast_free(router->routes);
router->routes = NULL;
ao2_cleanup(router->cache_routes);
ast_free(router->cache_routes);
router->cache_routes = NULL;
ao2_cleanup(router->default_route);
router->default_route = NULL;
}
static struct stasis_message_route *find_route(
static int find_route(
struct stasis_message_router *router,
struct stasis_message *message)
struct stasis_message *message,
struct stasis_message_route *route_out)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
struct stasis_message_route *route = NULL;
struct stasis_message_type *type = stasis_message_type(message);
SCOPED_AO2LOCK(lock, router);
ast_assert(route_out != NULL);
if (type == stasis_cache_update_type()) {
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
route = table_find_route(router->cache_routes, update->type);
}
if (route == NULL) {
/* Find a regular route */
route = ao2_find(router->routes, type, OBJ_KEY);
route = table_find_route(router->routes, type);
}
if (route == NULL) {
if (route == NULL && router->default_route.callback) {
/* Maybe the default route, then? */
if ((route = router->default_route)) {
ao2_ref(route, +1);
}
route = &router->default_route;
}
if (route == NULL) {
return NULL;
if (!route) {
return -1;
}
ao2_ref(route, +1);
return route;
*route_out = *route;
return 0;
}
static void router_dispatch(void *data,
@@ -143,15 +188,12 @@ static void router_dispatch(void *data,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
struct stasis_message_route route;
route = find_route(router, message);
if (route) {
route->callback(route->data, sub, topic, message);
if (find_route(router, message, &route) == 0) {
route.callback(route.data, sub, topic, message);
}
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(router);
}
@@ -167,14 +209,12 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
route_cmp);
router->routes = ast_calloc(1, sizeof(*router->routes));
if (!router->routes) {
return NULL;
}
router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
route_hash, route_cmp);
router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
if (!router->cache_routes) {
return NULL;
}
@@ -216,100 +256,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
return stasis_subscription_is_done(router->subscription);
}
static struct stasis_message_route *route_create(
struct stasis_message_type *message_type,
stasis_subscription_cb callback,
void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
route = ao2_alloc(sizeof(*route), route_dtor);
if (!route) {
return NULL;
}
if (message_type) {
ao2_ref(message_type, +1);
}
route->message_type = message_type;
route->callback = callback;
route->data = data;
ao2_ref(route, +1);
return route;
}
static int add_route(struct stasis_message_router *router,
struct stasis_message_route *route)
{
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
if (existing_route) {
ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
ao2_link(router->routes, route);
return 0;
}
static int add_cache_route(struct stasis_message_router *router,
struct stasis_message_route *route)
{
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
existing_route = ao2_find(router->cache_routes, route->message_type,
OBJ_KEY);
if (existing_route) {
ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
ao2_link(router->cache_routes, route);
return 0;
}
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
route = route_create(message_type, callback, data);
if (!route) {
return -1;
}
return add_route(router, route);
SCOPED_AO2LOCK(lock, router);
return table_add_route(&router->routes, message_type, callback, data);
}
int stasis_message_router_add_cache_update(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
route = route_create(message_type, callback, data);
if (!route) {
return -1;
}
return add_cache_route(router, route);
SCOPED_AO2LOCK(lock, router);
return table_add_route(&router->cache_routes, message_type, callback, data);
}
void stasis_message_router_remove(struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
ao2_find(router->routes, message_type,
OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
table_remove_route(router->routes, message_type);
}
void stasis_message_router_remove_cache_update(
@@ -317,9 +284,7 @@ void stasis_message_router_remove_cache_update(
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
ao2_find(router->cache_routes, message_type,
OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
table_remove_route(router->cache_routes, message_type);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
@@ -327,7 +292,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
void *data)
{
SCOPED_AO2LOCK(lock, router);
ao2_cleanup(router->default_route);
router->default_route = route_create(NULL, callback, data);
return router->default_route ? 0 : -1;
router->default_route.callback = callback;
router->default_route.data = data;
/* While this implementation can never fail, it used to be able to */
return 0;
}

View File

@@ -431,10 +431,6 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_free((char *) t->name);
if (t->listener) {
/* This code should not be reached since the listener
* should have been destroyed before the taskprocessor could
* be destroyed
*/
ao2_ref(t->listener, -1);
t->listener = NULL;
}

View File

@@ -9,6 +9,7 @@
#define RES_PJSIP_PRIVATE_H_
struct ao2_container;
struct ast_threadpool_options;
/*!
* \brief Initialize the configuration for res_pjsip

View File

@@ -867,52 +867,6 @@ AST_TEST_DEFINE(cache_dump)
return AST_TEST_PASS;
}
AST_TEST_DEFINE(route_conflicts)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
int ret;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary =
"Multiple routes to the same message_type should fail";
info->description =
"Multiple routes to the same message_type should fail";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
test_message_type = stasis_message_type_create("TestMessage", NULL);
ast_test_validate(test, NULL != test_message_type);
uut = stasis_message_router_create(topic);
ast_test_validate(test, NULL != uut);
ret = stasis_message_router_add(
uut, test_message_type, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ret = stasis_message_router_add(
uut, test_message_type, consumer_exec, consumer2);
ast_test_validate(test, 0 != ret);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -1373,7 +1327,6 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache_filter);
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
@@ -1397,7 +1350,6 @@ static int load_module(void)
AST_TEST_REGISTER(cache_filter);
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);