Remove dispatch object allocation from Stasis publishing

While looking for areas for performance improvement, I realized that an
unused feature in Stasis was negatively impacting performance.

When a message is sent to a subscriber, a dispatch object is allocated
for the dispatch, containing the topic the message was published to, the
subscriber the message is being sent to, and the message itself.

The topic is actually unused by any subscriber in Asterisk today. And
the subscriber is associated with the taskprocessor the message is being
dispatched to.

First, this patch removes the unused topic parameter from Stasis
subscription callbacks.

Second, this patch introduces the concept of taskprocessor local data,
data that may be set on a taskprocessor and provided along with the data
pointer when a task is pushed using the ast_taskprocessor_push_local()
call. This allows the task to have both data specific to that
taskprocessor, in addition to data specific to that invocation.

With those two changes, the dispatch object can be removed completely,
and the message is simply refcounted and sent directly to the
taskprocessor.

Review: https://reviewboard.asterisk.org/r/2884/


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400181 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
David M. Lee
2013-09-30 18:48:57 +00:00
parent 9d21631aee
commit 516dbe86a0
46 changed files with 376 additions and 282 deletions

View File

@@ -309,7 +309,7 @@ static struct consumer *consumer_create(void) {
return consumer;
}
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@@ -342,7 +342,7 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
}
}
static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;

View File

@@ -183,7 +183,7 @@ static struct consumer *consumer_create(int ignore_subscriptions) {
return consumer;
}
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@@ -711,7 +711,6 @@ AST_TEST_DEFINE(cache)
/* Check for new snapshot messages */
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
actual_update = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
@@ -720,7 +719,6 @@ AST_TEST_DEFINE(cache)
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
actual_update = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
@@ -736,7 +734,6 @@ AST_TEST_DEFINE(cache)
ast_test_validate(test, 3 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[2]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
@@ -752,7 +749,6 @@ AST_TEST_DEFINE(cache)
ast_test_validate(test, 4 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[3]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
ast_test_validate(test, NULL == actual_update->new_snapshot);
ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
@@ -1226,7 +1222,7 @@ AST_TEST_DEFINE(to_ami)
}
static void noop(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
/* no-op */
}

View File

@@ -48,6 +48,31 @@ struct task_data {
int task_complete;
};
static void task_data_dtor(void *obj)
{
struct task_data *task_data = obj;
ast_mutex_destroy(&task_data->lock);
ast_cond_destroy(&task_data->cond);
}
/*! \brief Create a task_data object */
static struct task_data *task_data_create(void)
{
struct task_data *task_data =
ao2_alloc(sizeof(*task_data), task_data_dtor);
if (!task_data) {
return NULL;
}
ast_cond_init(&task_data->cond, NULL);
ast_mutex_init(&task_data->lock);
task_data->task_complete = 0;
return task_data;
}
/*!
* \brief Queued task for baseline test.
*
@@ -64,6 +89,30 @@ static int task(void *data)
return 0;
}
/*!
* \brief Wait for a task to execute.
*/
static int task_wait(struct task_data *task_data)
{
struct timeval start = ast_tvnow();
struct timespec end;
SCOPED_MUTEX(lock, &task_data->lock);
end.tv_sec = start.tv_sec + 30;
end.tv_nsec = start.tv_usec * 1000;
while (!task_data->task_complete) {
int res;
res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
&end);
if (res == ETIMEDOUT) {
return -1;
}
}
return 0;
}
/*!
* \brief Baseline test for default taskprocessor
*
@@ -73,12 +122,9 @@ static int task(void *data)
*/
AST_TEST_DEFINE(default_taskprocessor)
{
struct ast_taskprocessor *tps;
struct task_data task_data;
struct timeval start;
struct timespec ts;
enum ast_test_result_state res = AST_TEST_PASS;
int timedwait_res;
RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
int res;
switch (cmd) {
case TEST_INIT:
@@ -99,36 +145,21 @@ AST_TEST_DEFINE(default_taskprocessor)
return AST_TEST_FAIL;
}
start = ast_tvnow();
ts.tv_sec = start.tv_sec + 30;
ts.tv_nsec = start.tv_usec * 1000;
ast_cond_init(&task_data.cond, NULL);
ast_mutex_init(&task_data.lock);
task_data.task_complete = 0;
ast_taskprocessor_push(tps, task, &task_data);
ast_mutex_lock(&task_data.lock);
while (!task_data.task_complete) {
timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
if (timedwait_res == ETIMEDOUT) {
break;
}
task_data = task_data_create();
if (!task_data) {
ast_test_status_update(test, "Unable to create task_data\n");
return AST_TEST_FAIL;
}
ast_mutex_unlock(&task_data.lock);
if (!task_data.task_complete) {
ast_taskprocessor_push(tps, task, task_data);
res = task_wait(task_data);
if (res != 0) {
ast_test_status_update(test, "Queued task did not execute!\n");
res = AST_TEST_FAIL;
goto test_end;
return AST_TEST_FAIL;
}
test_end:
tps = ast_taskprocessor_unreference(tps);
ast_mutex_destroy(&task_data.lock);
ast_cond_destroy(&task_data.cond);
return res;
return AST_TEST_PASS;
}
#define NUM_TASKS 20000
@@ -631,12 +662,78 @@ AST_TEST_DEFINE(taskprocessor_shutdown)
return AST_TEST_PASS;
}
static int local_task_exe(struct ast_taskprocessor_local *local)
{
int *local_data = local->local_data;
struct task_data *task_data = local->data;
*local_data = 1;
task(task_data);
return 0;
}
AST_TEST_DEFINE(taskprocessor_push_local)
{
RAII_VAR(struct ast_taskprocessor *, tps, NULL,
ast_taskprocessor_unreference);
struct task_data *task_data;
int local_data;
int res;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = "/main/taskprocessor/";
info->summary = "Test of pushing local data";
info->description =
"Ensures that local data is passed along.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
if (!tps) {
ast_test_status_update(test, "Unable to create test taskprocessor\n");
return AST_TEST_FAIL;
}
task_data = task_data_create();
if (!task_data) {
ast_test_status_update(test, "Unable to create task_data\n");
return AST_TEST_FAIL;
}
local_data = 0;
ast_taskprocessor_set_local(tps, &local_data);
ast_taskprocessor_push_local(tps, local_task_exe, task_data);
res = task_wait(task_data);
if (res != 0) {
ast_test_status_update(test, "Queued task did not execute!\n");
return AST_TEST_FAIL;
}
if (local_data != 1) {
ast_test_status_update(test,
"Queued task did not set local_data!\n");
return AST_TEST_FAIL;
}
return AST_TEST_PASS;
}
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
return 0;
}
@@ -646,6 +743,7 @@ static int load_module(void)
ast_test_register(default_taskprocessor_load);
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
return AST_MODULE_LOAD_SUCCESS;
}