Files
asterisk/main/taskpool.c
Joshua C. Colp 1dc6fecc21 taskpool: Add taskpool API, switch Stasis to using it.
This change introduces a new API called taskpool. This is a pool
of taskprocessors. It provides the following functionality:

1. Task pushing to a pool of taskprocessors
2. Synchronous tasks
3. Serializers for execution ordering of tasks
4. Growing/shrinking of number of taskprocessors in pool

This functionality already exists through the combination of
threadpool+taskprocessors but through investigating I determined
that this carries substantial overhead for short to medium duration
tasks. The threadpool uses a single queue of work, and for management
of threads it involves additional tasks.

I wrote taskpool to eliminate the extra overhead and management
as much as possible. Instead of a single queue of work each
taskprocessor has its own queue and at push time a selector chooses
the taskprocessor to queue the task to. Each taskprocessor also
has its own thread like normal. This spreads out the tasks immediately
and reduces contention on shared resources.

Using the included efficiency tests the number of tasks that can be
executed per second in a taskpool is 6-12 times more than an equivalent
threadpool+taskprocessor setup.

Stasis has been moved over to using this new API as it is a heavy consumer
of threadpool+taskprocessors and produces a lot of tasks.

UpgradeNote: The threadpool_* options in stasis.conf have now been deprecated
though they continue to be read and used. They have been replaced with taskpool
options that give greater control over the underlying taskpool used for stasis.

DeveloperNote: The taskpool API has been added for common usage of a
pool of taskprocessors. It is suggested to use this API instead of the
threadpool+taskprocessor approach.
2025-09-16 17:21:23 +00:00

946 lines
29 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2025, Sangoma Technologies Corporation
*
* Joshua Colp <jcolp@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/taskpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer_shutdown_group.h"
#include "asterisk/utils.h"
#include "asterisk/time.h"
#include "asterisk/sched.h"
/*!
* \brief A taskpool taskprocessor
*/
struct taskpool_taskprocessor {
/*! The underlying taskprocessor */
struct ast_taskprocessor *taskprocessor;
/*! The last time a task was pushed to this taskprocessor */
struct timeval last_pushed;
};
/*!
* \brief A container of taskprocessors
*/
struct taskpool_taskprocessors {
/*! A vector of taskprocessors */
AST_VECTOR(, struct taskpool_taskprocessor *) taskprocessors;
/*! The next taskprocessor to use for pushing */
unsigned int taskprocessor_num;
};
typedef void (*taskpool_selector)(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached);
/*!
* \brief An opaque taskpool structure
*
* A taskpool is a collection of taskprocessors that
* execute tasks, each from their own queue. A selector
* determines which taskprocessor to queue to at push
* time.
*/
struct ast_taskpool {
/*! The static taskprocessors, those which will always exist */
struct taskpool_taskprocessors static_taskprocessors;
/*! The dynamic taskprocessors, those which will be created as needed */
struct taskpool_taskprocessors dynamic_taskprocessors;
/*! True if the taskpool is in the process of shutting down */
int shutting_down;
/*! Taskpool-specific options */
struct ast_taskpool_options options;
/*! Dynamic pool shrinking scheduled item */
int shrink_sched_id;
/*! The taskprocessor selector to use */
taskpool_selector selector;
/*! The name of the taskpool */
char name[0];
};
/*! \brief The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold) */
#define TASKPOOL_GROW_THRESHOLD (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10
/*! \brief Scheduler used for dynamic pool shrinking */
static struct ast_sched_context *sched;
/*! \brief Thread storage for the current taskpool */
AST_THREADSTORAGE_RAW(current_taskpool_pool);
/*!
* \internal
* \brief Get the current taskpool associated with this thread.
*/
static struct ast_taskpool *ast_taskpool_get_current(void)
{
return ast_threadstorage_get_ptr(&current_taskpool_pool);
}
/*!
* \internal
* \brief Shutdown task for taskpool taskprocessor
*/
static int taskpool_taskprocessor_stop(void *data)
{
struct ast_taskpool *pool = ast_taskpool_get_current();
/* If a thread stop callback is set on the options, call it */
if (pool->options.thread_end) {
pool->options.thread_end();
}
ao2_cleanup(pool);
return 0;
}
/*! \internal */
static void taskpool_taskprocessor_dtor(void *obj)
{
struct taskpool_taskprocessor *taskprocessor = obj;
if (taskprocessor->taskprocessor && ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_stop, NULL)) {
/* We can't actually do anything if this fails, so just accept reality */
}
ast_taskprocessor_unreference(taskprocessor->taskprocessor);
}
/*!
* \internal
* \brief Startup task for taskpool taskprocessor
*/
static int taskpool_taskprocessor_start(void *data)
{
struct ast_taskpool *pool = data;
/* Set the pool on the thread for this taskprocessor, inheriting the
* reference passed to the task itself.
*/
ast_threadstorage_set_ptr(&current_taskpool_pool, pool);
/* If a thread start callback is set on the options, call it */
if (pool->options.thread_start) {
pool->options.thread_start();
}
return 0;
}
/*!
* \internal
* \brief Allocate a taskpool specific taskprocessor
*/
static struct taskpool_taskprocessor *taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
{
struct taskpool_taskprocessor *taskprocessor;
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
/* We don't actually need locking for each pool taskprocessor, as the only thing
* mutable is the underlying taskprocessor which has its own internal locking.
*/
taskprocessor = ao2_alloc_options(sizeof(*taskprocessor), taskpool_taskprocessor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!taskprocessor) {
return NULL;
}
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name);
taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
if (!taskprocessor->taskprocessor) {
ao2_ref(taskprocessor, -1);
return NULL;
}
taskprocessor->last_pushed = ast_tvnow();
if (ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_start, ao2_bump(pool))) {
ao2_ref(pool, -1);
/* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to
* NULL here.
*/
ast_taskprocessor_unreference(taskprocessor->taskprocessor);
taskprocessor->taskprocessor = NULL;
return NULL;
}
return taskprocessor;
}
/*!
* \internal
* \brief Initialize the taskpool taskprocessors structure
*/
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
{
if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) {
return -1;
}
return 0;
}
/*!
* \internal
* \brief Clean up the taskpool taskprocessors structure
*/
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
{
/* Access/manipulation of taskprocessors is done with the lock held, and
* with a check of the shutdown flag done. This means that outside of holding
* the lock we can safely muck with it. Pushing to the taskprocessor is done
* outside of the lock, but with a reference to the taskprocessor held.
*/
AST_VECTOR_CALLBACK_VOID(&taskprocessors->taskprocessors, ao2_cleanup);
AST_VECTOR_FREE(&taskprocessors->taskprocessors);
}
/*!
* \internal
* \brief Determine if a taskpool taskprocessor is idle
*/
#define TASKPROCESSOR_IS_IDLE(tps, timeout) (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))
/*! \internal
* \brief Taskpool dynamic pool shrink function
*/
static int taskpool_dynamic_pool_shrink(const void *data)
{
struct ast_taskpool *pool = (struct ast_taskpool *)data;
int num_removed;
ao2_lock(pool);
/* If the pool is shutting down, do nothing and don't reschedule */
if (pool->shutting_down) {
ao2_unlock(pool);
ao2_ref(pool, -1);
return 0;
}
/* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */
num_removed = AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(&pool->dynamic_taskprocessors.taskprocessors, pool->options.idle_timeout * 1000,
TASKPROCESSOR_IS_IDLE, ao2_cleanup);
if (num_removed) {
/* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */
if (pool->dynamic_taskprocessors.taskprocessor_num >= AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors)) {
pool->dynamic_taskprocessors.taskprocessor_num = 0;
}
}
ao2_unlock(pool);
/* It is possible for the pool to have been shut down between unlocking and returning, this is
* inherently a race condition we can't eliminate so we will catch it on the next iteration.
*/
return pool->options.idle_timeout * 1000;
}
/*!
* \internal
* \brief Sequential taskprocessor selector
*/
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
{
unsigned int taskprocessor_num = taskprocessors->taskprocessor_num;
if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
*growth_threshold_reached = 1;
return;
}
taskprocessors->taskprocessor_num++;
if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
taskprocessors->taskprocessor_num = 0;
}
*taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num);
/* Check to see if this has reached the growth threshold */
*growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0;
}
/*!
* \interal
* \brief Least full taskprocessor selector
*/
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
{
struct taskpool_taskprocessor *least_full = NULL;
unsigned int i;
if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
*growth_threshold_reached = 1;
return;
}
/* We assume that the growth threshold has not yet been reached, until proven otherwise */
*growth_threshold_reached = 0;
for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) {
struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i);
/* If this taskprocessor has no outstanding tasks, it is the best choice */
if (!ast_taskprocessor_size(tp->taskprocessor)) {
*taskprocessor = tp;
return;
}
/* If any of the taskprocessors have reached the growth threshold then we should grow the pool */
if (ast_taskprocessor_size(tp->taskprocessor) >= pool->options.growth_threshold) {
*growth_threshold_reached = 1;
}
/* The taskprocessor with the fewest tasks should be used */
if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) {
least_full = tp;
}
}
*taskprocessor = least_full;
}
struct ast_taskpool *ast_taskpool_create(const char *name,
const struct ast_taskpool_options *options)
{
struct ast_taskpool *pool;
/* Enforce versioning on the passed-in options */
if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
return NULL;
}
pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
if (!pool) {
return NULL;
}
strcpy(pool->name, name); /* Safe */
memcpy(&pool->options, options, sizeof(pool->options));
pool->shrink_sched_id = -1;
/* Verify the passed-in options are valid, and adjust if needed */
if (options->initial_size < options->minimum_size) {
pool->options.initial_size = options->minimum_size;
ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
name, options->initial_size, options->minimum_size, options->minimum_size);
}
if (options->max_size && pool->options.initial_size > options->max_size) {
pool->options.max_size = pool->options.initial_size;
ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
name, options->max_size, pool->options.initial_size, pool->options.initial_size);
}
if (!options->auto_increment) {
if (!pool->options.minimum_size) {
pool->options.minimum_size = 1;
ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
}
if (!pool->options.max_size) {
pool->options.max_size = pool->options.minimum_size;
ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
}
if (pool->options.minimum_size != pool->options.max_size) {
pool->options.minimum_size = pool->options.max_size;
pool->options.initial_size = pool->options.max_size;
ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
name, options->minimum_size, pool->options.max_size, pool->options.max_size);
}
} else if (!options->growth_threshold) {
pool->options.growth_threshold = TASKPOOL_GROW_THRESHOLD;
}
if (options->selector == AST_TASKPOOL_SELECTOR_DEFAULT || options->selector == AST_TASKPOOL_SELECTOR_LEAST_FULL) {
pool->selector = taskpool_least_full_selector;
} else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
pool->selector = taskpool_sequential_selector;
} else {
ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
name, options->selector);
pool->selector = taskpool_least_full_selector;
}
if (taskpool_taskprocessors_init(&pool->static_taskprocessors, pool->options.minimum_size)) {
ao2_ref(pool, -1);
return NULL;
}
/* Create the static taskprocessors based on the passed-in options */
for (int i = 0; i < pool->options.minimum_size; i++) {
struct taskpool_taskprocessor *taskprocessor;
taskprocessor = taskpool_taskprocessor_alloc(pool, 's');
if (!taskprocessor) {
/* The reference to pool is passed to ast_taskpool_shutdown */
ast_taskpool_shutdown(pool);
return NULL;
}
if (AST_VECTOR_APPEND(&pool->static_taskprocessors.taskprocessors, taskprocessor)) {
ao2_ref(taskprocessor, -1);
/* The reference to pool is passed to ast_taskpool_shutdown */
ast_taskpool_shutdown(pool);
return NULL;
}
}
if (taskpool_taskprocessors_init(&pool->dynamic_taskprocessors,
pool->options.initial_size - pool->options.minimum_size)) {
ast_taskpool_shutdown(pool);
return NULL;
}
/* Create the dynamic taskprocessor based on the passed-in options */
for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
struct taskpool_taskprocessor *taskprocessor;
taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
if (!taskprocessor) {
/* The reference to pool is passed to ast_taskpool_shutdown */
ast_taskpool_shutdown(pool);
return NULL;
}
if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, taskprocessor)) {
ao2_ref(taskprocessor, -1);
/* The reference to pool is passed to ast_taskpool_shutdown */
ast_taskpool_shutdown(pool);
return NULL;
}
}
/* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
* this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
* and to reduce complexity.
*/
if (options->idle_timeout && options->auto_increment) {
pool->shrink_sched_id = ast_sched_add(sched, options->idle_timeout * 1000, taskpool_dynamic_pool_shrink, ao2_bump(pool));
if (pool->shrink_sched_id < 0) {
ao2_ref(pool, -1);
/* The second reference to pool is passed to ast_taskpool_shutdown */
ast_taskpool_shutdown(pool);
return NULL;
}
}
return pool;
}
size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool)
{
size_t count;
ao2_lock(pool);
count = AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors);
ao2_unlock(pool);
return count;
}
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size) (size += ast_taskprocessor_size(tps->taskprocessor))
long ast_taskpool_queue_size(struct ast_taskpool *pool)
{
long queue_size = 0;
ao2_lock(pool);
AST_VECTOR_CALLBACK_VOID(&pool->static_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size);
AST_VECTOR_CALLBACK_VOID(&pool->dynamic_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size);
ao2_unlock(pool);
return queue_size;
}
/*! \internal
* \brief Taskpool dynamic pool grow function
*/
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
{
unsigned int num_to_add = pool->options.auto_increment;
int i;
if (!num_to_add) {
return;
}
/* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */
if (pool->options.max_size) {
unsigned int current_size = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors);
if (current_size + num_to_add > pool->options.max_size) {
num_to_add = pool->options.max_size - current_size;
}
}
for (i = 0; i < num_to_add; i++) {
struct taskpool_taskprocessor *new_taskprocessor;
new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
if (!new_taskprocessor) {
return;
}
if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) {
ao2_ref(new_taskprocessor, -1);
return;
}
if (i == 0) {
/* On the first iteration we return the taskprocessor we just added */
*taskprocessor = new_taskprocessor;
/* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */
pool->dynamic_taskprocessors.taskprocessor_num = 0;
} else if (i == 1) {
/* On the second iteration we update the next taskprocessor to use to be this one */
pool->dynamic_taskprocessors.taskprocessor_num = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) - 1;
}
}
}
int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
{
RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
/* Select the taskprocessor in the pool to use for pushing this task */
ao2_lock(pool);
if (!pool->shutting_down) {
unsigned int growth_threshold_reached = 0;
/* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
* taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
* fails for some reason, it will still fall back to the initially found static one if
* it is present.
*/
pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
if (pool->options.auto_increment && growth_threshold_reached) {
/* If we need to grow then try dynamic taskprocessors */
pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
if (growth_threshold_reached) {
/* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
taskpool_dynamic_pool_grow(pool, &taskprocessor);
}
/* If a dynamic taskprocessor was used update its last push time */
if (taskprocessor) {
taskprocessor->last_pushed = ast_tvnow();
}
}
ao2_bump(taskprocessor);
}
ao2_unlock(pool);
if (!taskprocessor) {
return -1;
}
if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
return -1;
}
return 0;
}
/*!
* \internal Structure used for synchronous task
*/
struct taskpool_sync_task {
ast_mutex_t lock;
ast_cond_t cond;
int complete;
int fail;
int (*task)(void *);
void *task_data;
};
/*!
* \internal Initialization function for synchronous task
*/
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int (*task)(void *), void *data)
{
ast_mutex_init(&sync_task->lock);
ast_cond_init(&sync_task->cond, NULL);
sync_task->complete = 0;
sync_task->fail = 0;
sync_task->task = task;
sync_task->task_data = data;
return 0;
}
/*!
* \internal Cleanup function for synchronous task
*/
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
{
ast_mutex_destroy(&sync_task->lock);
ast_cond_destroy(&sync_task->cond);
}
/*!
* \internal Function for executing a sychronous task
*/
static int taskpool_sync_task(void *data)
{
struct taskpool_sync_task *sync_task = data;
int ret;
sync_task->fail = sync_task->task(sync_task->task_data);
/*
* Once we unlock sync_task->lock after signaling, we cannot access
* sync_task again. The thread waiting within ast_taskpool_push_wait()
* is free to continue and release its local variable (sync_task).
*/
ast_mutex_lock(&sync_task->lock);
sync_task->complete = 1;
ast_cond_signal(&sync_task->cond);
ret = sync_task->fail;
ast_mutex_unlock(&sync_task->lock);
return ret;
}
int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
{
struct taskpool_sync_task sync_task;
/* If we are already executing within a taskpool taskprocessor then
* don't bother pushing a new task, just directly execute the task.
*/
if (ast_taskpool_get_current()) {
return task(data);
}
if (taskpool_sync_task_init(&sync_task, task, data)) {
return -1;
}
if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) {
taskpool_sync_task_cleanup(&sync_task);
return -1;
}
ast_mutex_lock(&sync_task.lock);
while (!sync_task.complete) {
ast_cond_wait(&sync_task.cond, &sync_task.lock);
}
ast_mutex_unlock(&sync_task.lock);
taskpool_sync_task_cleanup(&sync_task);
return sync_task.fail;
}
void ast_taskpool_shutdown(struct ast_taskpool *pool)
{
if (!pool) {
return;
}
/* Mark this pool as shutting down so nothing new is pushed */
ao2_lock(pool);
pool->shutting_down = 1;
ao2_unlock(pool);
/* Stop the shrink scheduled item if present */
AST_SCHED_DEL_UNREF(sched, pool->shrink_sched_id, ao2_ref(pool, -1));
/* Clean up all the taskprocessors */
taskpool_taskprocessors_cleanup(&pool->static_taskprocessors);
taskpool_taskprocessors_cleanup(&pool->dynamic_taskprocessors);
ao2_ref(pool, -1);
}
struct serializer {
/*! Taskpool the serializer will use to process the jobs. */
struct ast_taskpool *pool;
/*! Which group will wait for this serializer to shutdown. */
struct ast_serializer_shutdown_group *shutdown_group;
};
static void serializer_dtor(void *obj)
{
struct serializer *ser = obj;
ao2_cleanup(ser->pool);
ser->pool = NULL;
ao2_cleanup(ser->shutdown_group);
ser->shutdown_group = NULL;
}
static struct serializer *serializer_create(struct ast_taskpool *pool,
struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
/* This object has a lock so it can be used to ensure exclusive access
* to the execution of tasks within the serializer.
*/
ser = ao2_alloc(sizeof(*ser), serializer_dtor);
if (!ser) {
return NULL;
}
ser->pool = ao2_bump(pool);
ser->shutdown_group = ao2_bump(shutdown_group);
return ser;
}
AST_THREADSTORAGE_RAW(current_taskpool_serializer);
static int execute_tasks(void *data)
{
struct ast_taskpool *pool = ast_taskpool_get_current();
struct ast_taskprocessor *tps = data;
struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(tps);
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
size_t remaining, requeue = 0;
/* In a normal scenario this lock will not be in contention with
* anything else. It is only if a synchronous task is pushed to
* the serializer that it may be blocked on the synchronous
* task thread. This is done to ensure that only one thread is executing
* tasks from the serializer at a given time, and not out of order
* either.
*/
ao2_lock(ser);
ast_threadstorage_set_ptr(&current_taskpool_serializer, tps);
for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
requeue = ast_taskprocessor_execute(tps);
}
ast_threadstorage_set_ptr(&current_taskpool_serializer, NULL);
ao2_unlock(ser);
/* If there are remaining tasks we requeue, this way the serializer
* does not hold exclusivity of the taskpool taskprocessor
*/
if (requeue) {
/* Ownership passes to the new task */
if (ast_taskpool_push(pool, execute_tasks, tps)) {
ast_taskprocessor_unreference(tps);
}
} else {
ast_taskprocessor_unreference(tps);
}
return 0;
}
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
if (was_empty) {
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
if (ast_taskpool_push(ser->pool, execute_tasks, tps)) {
ast_taskprocessor_unreference(tps);
}
}
}
static int serializer_start(struct ast_taskprocessor_listener *listener)
{
/* No-op */
return 0;
}
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
{
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
if (ser->shutdown_group) {
ast_serializer_shutdown_group_dec(ser->shutdown_group);
}
ao2_cleanup(ser);
}
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
};
struct ast_taskprocessor *ast_taskpool_serializer_get_current(void)
{
return ast_threadstorage_get_ptr(&current_taskpool_serializer);
}
struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name,
struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
struct ast_taskprocessor_listener *listener;
struct ast_taskprocessor *tps;
ser = serializer_create(pool, shutdown_group);
if (!ser) {
return NULL;
}
listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
if (!listener) {
ao2_ref(ser, -1);
return NULL;
}
tps = ast_taskprocessor_create_with_listener(name, listener);
if (!tps) {
/* ser ref transferred to listener but not cleaned without tps */
ao2_ref(ser, -1);
} else if (shutdown_group) {
ast_serializer_shutdown_group_inc(shutdown_group);
}
ao2_ref(listener, -1);
return tps;
}
struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
{
return ast_taskpool_serializer_group(name, pool, NULL);
}
/*!
* \internal An empty task callback, used to ensure the serializer does not
* go empty. */
static int taskpool_serializer_empty_task(void *data)
{
return 0;
}
int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data)
{
struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
struct ast_taskprocessor *prior_serializer;
struct taskpool_sync_task sync_task;
/* If not in a taskpool taskprocessor we can just queue the task like normal and
* wait. */
if (!ast_taskpool_get_current()) {
if (taskpool_sync_task_init(&sync_task, task, data)) {
return -1;
}
if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) {
taskpool_sync_task_cleanup(&sync_task);
return -1;
}
ast_mutex_lock(&sync_task.lock);
while (!sync_task.complete) {
ast_cond_wait(&sync_task.cond, &sync_task.lock);
}
ast_mutex_unlock(&sync_task.lock);
taskpool_sync_task_cleanup(&sync_task);
return sync_task.fail;
}
/* It is possible that we are already executing within a serializer, so stash the existing
* away so we can restore it.
*/
prior_serializer = ast_taskpool_serializer_get_current();
ao2_lock(ser);
/* There are two cases where we can or have to directly execute this task:
* 1. There are no other tasks in the serializer
* 2. We are already in the serializer
* In the second case if we don't execute the task now, we will deadlock waiting
* on it as it will never occur.
*/
if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
sync_task.fail = task(data);
ao2_unlock(ser);
ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
return sync_task.fail;
}
if (taskpool_sync_task_init(&sync_task, task, data)) {
ao2_unlock(ser);
return -1;
}
/* First we queue the serialized task */
if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) {
taskpool_sync_task_cleanup(&sync_task);
ao2_unlock(ser);
return -1;
}
/* Next we queue the empty task to ensure the serializer doesn't reach empty, this
* stops two tasks from being queued for the same serializer at the same time.
*/
if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) {
taskpool_sync_task_cleanup(&sync_task);
ao2_unlock(ser);
return -1;
}
/* Now we execute the tasks on the serializer until our sync task is complete */
ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
while (!sync_task.complete) {
/* The sync task is guaranteed to be executed, so doing a while loop on the complete
* flag is safe.
*/
ast_taskprocessor_execute(serializer);
}
taskpool_sync_task_cleanup(&sync_task);
ao2_unlock(ser);
ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
return sync_task.fail;
}
/*!
* \internal
* \brief Clean up resources on Asterisk shutdown
*/
static void taskpool_shutdown(void)
{
if (sched) {
ast_sched_context_destroy(sched);
sched = NULL;
}
}
int ast_taskpool_init(void)
{
sched = ast_sched_context_create();
if (!sched) {
return -1;
}
if (ast_sched_start_thread(sched)) {
return -1;
}
ast_register_cleanup(taskpool_shutdown);
return 0;
}