From 1dc6fecc21baf7eff531ef9f67ba5dc94d1c3288 Mon Sep 17 00:00:00 2001 From: "Joshua C. Colp" Date: Wed, 6 Aug 2025 13:19:20 -0300 Subject: [PATCH] taskpool: Add taskpool API, switch Stasis to using it. This change introduces a new API called taskpool. This is a pool of taskprocessors. It provides the following functionality: 1. Task pushing to a pool of taskprocessors 2. Synchronous tasks 3. Serializers for execution ordering of tasks 4. Growing/shrinking of number of taskprocessors in pool This functionality already exists through the combination of threadpool+taskprocessors but through investigating I determined that this carries substantial overhead for short to medium duration tasks. The threadpool uses a single queue of work, and for management of threads it involves additional tasks. I wrote taskpool to eliminate the extra overhead and management as much as possible. Instead of a single queue of work each taskprocessor has its own queue and at push time a selector chooses the taskprocessor to queue the task to. Each taskprocessor also has its own thread like normal. This spreads out the tasks immediately and reduces contention on shared resources. Using the included efficiency tests the number of tasks that can be executed per second in a taskpool is 6-12 times more than an equivalent threadpool+taskprocessor setup. Stasis has been moved over to using this new API as it is a heavy consumer of threadpool+taskprocessors and produces a lot of tasks. UpgradeNote: The threadpool_* options in stasis.conf have now been deprecated though they continue to be read and used. They have been replaced with taskpool options that give greater control over the underlying taskpool used for stasis. DeveloperNote: The taskpool API has been added for common usage of a pool of taskprocessors. It is suggested to use this API instead of the threadpool+taskprocessor approach. --- configs/samples/stasis.conf.sample | 19 +- include/asterisk/_private.h | 1 + include/asterisk/serializer.h | 21 + include/asterisk/serializer_shutdown_group.h | 65 ++ include/asterisk/taskpool.h | 321 +++++++ include/asterisk/taskprocessor.h | 5 + include/asterisk/threadpool.h | 24 +- main/asterisk.c | 1 + main/serializer.c | 48 + main/serializer_shutdown_group.c | 108 +++ main/stasis.c | 133 ++- main/taskpool.c | 945 +++++++++++++++++++ main/taskprocessor.c | 5 + main/threadpool.c | 101 +- tests/test_taskpool.c | 810 ++++++++++++++++ tests/test_threadpool.c | 259 ++++- 16 files changed, 2691 insertions(+), 175 deletions(-) create mode 100644 include/asterisk/serializer_shutdown_group.h create mode 100644 include/asterisk/taskpool.h create mode 100644 main/serializer_shutdown_group.c create mode 100644 main/taskpool.c create mode 100644 tests/test_taskpool.c diff --git a/configs/samples/stasis.conf.sample b/configs/samples/stasis.conf.sample index b62d1c60ae..24e5619f02 100644 --- a/configs/samples/stasis.conf.sample +++ b/configs/samples/stasis.conf.sample @@ -1,12 +1,13 @@ -[threadpool] -;initial_size = 5 ; Initial size of the threadpool. -; ; 0 means the threadpool has no threads initially -; ; until a task needs a thread. -;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before -; ; dying. 0 means threads never time out. -;max_size = 50 ; Maximum number of threads in the Stasis threadpool. -; ; 0 means no limit to the number of threads in the -; ; threadpool. +[taskpool] +;minimum_size = 5 ; Minimum number of taskprocessors in taskpool. +;initial_size = 5 ; Initial size of the taskpool. +; ; 0 means the taskpool has no taskprocessors initially +; ; until a task needs a taskprocessor. +;idle_timeout_sec = 20 ; Number of seconds a taskprocessor should be idle before +; ; dying. 0 means taskprocessors never time out. +;max_size = 50 ; Maximum number of taskprocessors in the Stasis taskpool. +; ; 0 means no limit to the number of taskprocessors in the +; ; taskpool. [declined_message_types] ; This config section contains the names of message types that should be prevented diff --git a/include/asterisk/_private.h b/include/asterisk/_private.h index 684452fee0..056c0fde19 100644 --- a/include/asterisk/_private.h +++ b/include/asterisk/_private.h @@ -48,6 +48,7 @@ int ast_named_locks_init(void); /*!< Provided by named_locks.c */ int ast_file_init(void); /*!< Provided by file.c */ void ast_autoservice_init(void); /*!< Provided by autoservice.c */ int ast_tps_init(void); /*!< Provided by taskprocessor.c */ +int ast_taskpool_init(void); /*!< Provided by taskpool.c */ int ast_timing_init(void); /*!< Provided by timing.c */ void ast_stun_init(void); /*!< Provided by stun.c */ int ast_ssl_init(void); /*!< Provided by ssl.c */ diff --git a/include/asterisk/serializer.h b/include/asterisk/serializer.h index 112a0bea38..e2e07eefe6 100644 --- a/include/asterisk/serializer.h +++ b/include/asterisk/serializer.h @@ -20,6 +20,7 @@ #define _AST_SERIALIZER_H struct ast_threadpool; +struct ast_taskpool; /*! * Maintains a named pool of thread pooled taskprocessors. Also if configured @@ -56,6 +57,26 @@ int ast_serializer_pool_destroy(struct ast_serializer_pool *pool); struct ast_serializer_pool *ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout); +/*! + * \brief Create a serializer pool on taskpool. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * Create a serializer pool with an optional shutdown group. If a timeout greater + * than -1 is specified then a shutdown group is enabled on the pool. + * + * \param name The base name for the pool, and used when building taskprocessor(s) + * \param size The size of the pool + * \param taskpool The backing taskpool to use + * \param timeout The timeout used if using a shutdown group (-1 = disabled) + * + * \return A newly allocated serializer pool object + * \retval NULL on error + */ + struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name, + unsigned int size, struct ast_taskpool *taskpool, int timeout); + /*! * \brief Retrieve the base name of the serializer pool. * diff --git a/include/asterisk/serializer_shutdown_group.h b/include/asterisk/serializer_shutdown_group.h new file mode 100644 index 0000000000..b4c4adc01e --- /dev/null +++ b/include/asterisk/serializer_shutdown_group.h @@ -0,0 +1,65 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#ifndef _ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H +#define _ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H + +struct ast_serializer_shutdown_group; + +/*! + * \brief Create a serializer group shutdown control object. + * \since 13.5.0 + * + * \return ao2 object to control shutdown of a serializer group. + */ +struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void); + +/*! + * \brief Wait for the serializers in the group to shutdown with timeout. + * \since 13.5.0 + * + * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL) + * \param timeout Number of seconds to wait for the serializers in the group to shutdown. + * Zero if the timeout is disabled. + * + * \return Number of serializers that did not get shutdown within the timeout. + */ +int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout); + +/*! + * \brief Increment the number of serializer members in the group. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param shutdown_group Group shutdown controller. + */ + void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group); + + /*! + * \brief Decrement the number of serializer members in the group. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param shutdown_group Group shutdown controller. + */ +void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group); + +#endif /* ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H */ diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h new file mode 100644 index 0000000000..2a4f963052 --- /dev/null +++ b/include/asterisk/taskpool.h @@ -0,0 +1,321 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Corporation + * + * Joshua C. Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * \ref Taskpool + * + * \page Taskpool API providing queued task execution across threads. + +The taskpool API is a specialized API for the queueing of tasks +in a synchronous or asynchronous manner, to be executed across +a pool of threads. For cases where serialization is needed a +serializer API is also provided ensuring that tasks queued to +the serializer are executed in a serialized fashion within the +taskpool. + +On creation of a taskpool various options can be set and used to +control the operation of the pool. This includes how many taskprocessors +are present, whether the pool can grow, whether the pool can shrink, +and how long idle taskprocessors should exist before being terminated. +This provides flexibility based on the specific needs of the user of +the taskpool and the environment. + +The queueing of tasks to the taskpool is done using a selector. The +selector examines the available taskprocessors and decides which one +to queue the task to. This operation can also examine the state of +the pool to see if it needs to grow and if enabled and possible does so. + +The taskpool API is preferred for many cases over the use of the +threadpool due to the far lower overhead involved. Taskpools require +no additional thread or task queue for management of the pool itself and +the act of queueing tasks, the most common operation, is written to be as +simple and minimal as possible. Threadpools are best used for long +running tasks and operations. + +*/ + +#ifndef _ASTERISK_TASKPOOL_H +#define _ASTERISK_TASKPOOL_H + +struct ast_taskpool; +struct ast_taskprocessor; +struct ast_serializer_shutdown_group; + +/*! + * \brief Selectors for choosing which taskprocessor in a pool to use + */ +enum ast_taskpool_selector { + AST_TASKPOOL_SELECTOR_DEFAULT = 0, /* The selector that is generally the best for most use cases */ + AST_TASKPOOL_SELECTOR_LEAST_FULL, /* Select the least full taskprocessor */ + AST_TASKPOOL_SELECTOR_SEQUENTIAL, /* Select taskprocessors in a sequential manner */ +}; + +struct ast_taskpool_options { +#define AST_TASKPOOL_OPTIONS_VERSION 1 + /*! Version of taskpool options in use */ + int version; + /*! + * \brief The selector to use for choosing a taskprocessor + */ + enum ast_taskpool_selector selector; + /*! + * \brief Time limit in seconds for idle dynamic taskprocessors + * + * A time of 0 or less will mean no timeout. + */ + int idle_timeout; + /*! + * \brief Number of taskprocessors to increment the pool by + */ + int auto_increment; + /*! + * \brief Number of taskprocessors that will always exist + * + * Zero is a valid value if the taskpool will never have taskprocessors + * that always exist, allowing the pool to drop to zero if not used. + */ + int minimum_size; + /*! + * \brief Number of taskprocessors the pool will start with + * + * Zero is a valid value if the taskpool should start + * without any taskprocessors allocated. + * + * \note This must be equal to or greater than the minimum_size, + * otherwise the taskpool will adjust this to the minimum_size. + */ + int initial_size; + /*! + * \brief Maximum number of taskprocessors a pool may have + * + * When the taskpool's size increases, it can never increase + * beyond this number of taskprocessors. + * + * Zero is a valid value if the taskpool does not have a + * maximum size for taskprocessors. + * + * \note This must be equal to or greater than the initial_size, + * otherwise the taskpool will adjust this to the initial_size. + */ + int max_size; + /*! + * \brief The threshold for when to grow the pool + * + * This is the number of tasks that must be in queue before the pool will grow. + * + * \note If not specified a default of the 50% of the high water threshold defined + * in taskprocessor.h will be used. + */ + int growth_threshold; + /*! + * \brief Function to call when a taskprocessor starts + * + * This is useful if there is something common that all + * taskprocessors in a taskpool need to do when they start. + */ + void (*thread_start)(void); + /*! + * \brief Function to call when a taskprocessor ends + * + * This is useful if there is common cleanup to execute when + * a taskprocessor completes + */ + void (*thread_end)(void); +}; + +/*! + * \brief Create a new taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * This function creates a taskpool. Tasks may be pushed onto this task pool + * and will be automatically acted upon by taskprocessors within the pool. + * + * Only a single taskpool with a given name may exist. This function will fail + * if a taskpool with the given name already exists. + * + * \param name The unique name for the taskpool + * \param options The behavioral options for this taskpool + * \retval NULL Failed to create the taskpool + * \retval non-NULL The newly-created taskpool + * + * \note The \ref ast_taskpool_shutdown function must be called to shut down the + * taskpool and clean up underlying resources fully. + */ +struct ast_taskpool *ast_taskpool_create(const char *name, + const struct ast_taskpool_options *options); + +/*! + * \brief Get the current number of taskprocessors in the taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param pool The taskpool to query + * \retval The number of taskprocessors in the taskpool + */ +size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool); + +/*! + * \brief Get the current number of queued tasks in the taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param pool The taskpool to query + * \retval The number of queued tasks in the taskpool + */ +long ast_taskpool_queue_size(struct ast_taskpool *pool); + +/*! + * \brief Push a task to the taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * Tasks pushed into the taskpool will be automatically taken by + * one of the taskprocessors within + * \param pool The taskpool to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) + attribute_warn_unused_result; + +/*! + * \brief Push a task to the taskpool, and wait for completion + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * Tasks pushed into the taskpool will be automatically taken by + * one of the taskprocessors within + * \param pool The taskpool to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) + attribute_warn_unused_result; + +/*! + * \brief Shut down a taskpool and remove the underlying taskprocessors + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param pool The pool to shut down + * + * \note This will decrement the reference to the pool + */ +void ast_taskpool_shutdown(struct ast_taskpool *pool); + +/*! + * \brief Get the taskpool serializer currently associated with this thread. + * + * \note The returned pointer is valid while the serializer + * thread is running. + * + * \note Use ao2_ref() on serializer if you are going to keep it + * for another thread. To unref it you must then use + * ast_taskprocessor_unreference(). + * + * \retval serializer on success. + * \retval NULL on error or no serializer associated with the thread. + */ + struct ast_taskprocessor *ast_taskpool_serializer_get_current(void); + +/*! + * \brief Serialized execution of tasks within a \ref ast_taskpool. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * A \ref ast_taskprocessor with the same contract as a default taskprocessor + * (tasks execute serially) except instead of executing out of a dedicated + * thread, execution occurs in a taskprocessor from a \ref ast_taskpool. + * + * While it guarantees that each task will complete before executing the next, + * there is no guarantee as to which thread from the \c pool individual tasks + * will execute. This normally only matters if your code relies on thread + * specific information, such as thread locals. + * + * Use ast_taskprocessor_unreference() to dispose of the returned \ref + * ast_taskprocessor. + * + * Only a single taskprocessor with a given name may exist. This function will fail + * if a taskprocessor with the given name already exists. + * + * \param name Name of the serializer. (must be unique) + * \param pool \ref ast_taskpool for execution. + * + * \return \ref ast_taskprocessor for enqueuing work. + * \retval NULL on error. + */ +struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool); + +/*! + * \brief Serialized execution of tasks within a \ref ast_taskpool. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * A \ref ast_taskprocessor with the same contract as a default taskprocessor + * (tasks execute serially) except instead of executing out of a dedicated + * thread, execution occurs in a taskprocessor from a \ref ast_taskpool. + * + * While it guarantees that each task will complete before executing the next, + * there is no guarantee as to which thread from the \c pool individual tasks + * will execute. This normally only matters if your code relies on thread + * specific information, such as thread locals. + * + * Use ast_taskprocessor_unreference() to dispose of the returned \ref + * ast_taskprocessor. + * + * Only a single taskprocessor with a given name may exist. This function will fail + * if a taskprocessor with the given name already exists. + * + * \param name Name of the serializer. (must be unique) + * \param pool \ref ast_taskpool for execution. + * \param shutdown_group Group shutdown controller. (NULL if no group association) + * + * \return \ref ast_taskprocessor for enqueuing work. + * \retval NULL on error. + */ +struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, + struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group); + +/*! + * \brief Push a task to a serializer, and wait for completion + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param serializer The serializer to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data); + +#endif /* ASTERISK_TASKPOOL_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 73c5c2ec99..3e3886eb1e 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -340,6 +340,11 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); */ long ast_taskprocessor_size(struct ast_taskprocessor *tps); +/*! + * \brief Return the listener associated with the taskprocessor + */ +struct ast_taskprocessor_listener *ast_taskprocessor_listener(struct ast_taskprocessor *tps); + /*! * \brief Get the current taskprocessor high water alert count. * \since 13.10.0 diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index a72e4523a1..72b2863c5d 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -24,6 +24,8 @@ struct ast_threadpool; struct ast_taskprocessor; struct ast_threadpool_listener; +#include "asterisk/serializer_shutdown_group.h" + struct ast_threadpool_listener_callbacks { /*! * \brief Indicates that the state of threads in the pool has changed @@ -196,28 +198,6 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo */ void ast_threadpool_shutdown(struct ast_threadpool *pool); -struct ast_serializer_shutdown_group; - -/*! - * \brief Create a serializer group shutdown control object. - * \since 13.5.0 - * - * \return ao2 object to control shutdown of a serializer group. - */ -struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void); - -/*! - * \brief Wait for the serializers in the group to shutdown with timeout. - * \since 13.5.0 - * - * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL) - * \param timeout Number of seconds to wait for the serializers in the group to shutdown. - * Zero if the timeout is disabled. - * - * \return Number of serializers that did not get shutdown within the timeout. - */ -int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout); - /*! * \brief Get the threadpool serializer currently associated with this thread. * \since 14.0.0 diff --git a/main/asterisk.c b/main/asterisk.c index 76b927d3f2..5c38031cbe 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -4247,6 +4247,7 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou check_init(ast_utils_init(), "Utilities"); check_init(ast_tps_init(), "Task Processor Core"); + check_init(ast_taskpool_init(), "Taskpool Support"); check_init(ast_fd_init(), "File Descriptor Debugging"); check_init(ast_pbx_init(), "ast_pbx_init"); check_init(aco_init(), "Configuration Option Framework"); diff --git a/main/serializer.c b/main/serializer.c index a10dc496d6..5fcb279332 100644 --- a/main/serializer.c +++ b/main/serializer.c @@ -22,8 +22,10 @@ #include "asterisk/serializer.h" #include "asterisk/taskprocessor.h" #include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/utils.h" #include "asterisk/vector.h" +#include "asterisk/serializer_shutdown_group.h" struct ast_serializer_pool { /*! Shutdown group to monitor serializers. */ @@ -119,6 +121,52 @@ struct ast_serializer_pool *ast_serializer_pool_create(const char *name, return pool; } +struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name, + unsigned int size, struct ast_taskpool *taskpool, int timeout) +{ + struct ast_serializer_pool *pool; + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + size_t idx; + + ast_assert(size > 0); + + pool = ast_malloc(sizeof(*pool) + strlen(name) + 1); + if (!pool) { + return NULL; + } + + strcpy(pool->name, name); /* safe */ + + pool->shutdown_group_timeout = timeout; + pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL; + + AST_VECTOR_RW_INIT(&pool->serializers, size); + + for (idx = 0; idx < size; ++idx) { + struct ast_taskprocessor *tps; + + /* Create name with seq number appended. */ + ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name); + + tps = ast_taskpool_serializer_group(tps_name, taskpool, pool->shutdown_group); + if (!tps) { + ast_serializer_pool_destroy(pool); + ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n", + tps_name); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->serializers, tps)) { + ast_serializer_pool_destroy(pool); + ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n", + tps_name); + return NULL; + } + } + + return pool; +} + const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool) { return pool->name; diff --git a/main/serializer_shutdown_group.c b/main/serializer_shutdown_group.c new file mode 100644 index 0000000000..f232d0f6f1 --- /dev/null +++ b/main/serializer_shutdown_group.c @@ -0,0 +1,108 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/astobj2.h" +#include "asterisk/time.h" +#include "asterisk/utils.h" + +/*! Serializer group shutdown control object. */ +struct ast_serializer_shutdown_group { + /*! Shutdown thread waits on this conditional. */ + ast_cond_t cond; + /*! Count of serializers needing to shutdown. */ + int count; +}; + +static void serializer_shutdown_group_dtor(void *vdoomed) +{ + struct ast_serializer_shutdown_group *doomed = vdoomed; + + ast_cond_destroy(&doomed->cond); +} + +struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void) +{ + struct ast_serializer_shutdown_group *shutdown_group; + + shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor); + if (!shutdown_group) { + return NULL; + } + ast_cond_init(&shutdown_group->cond, NULL); + return shutdown_group; +} + +int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout) +{ + int remaining; + ast_mutex_t *lock; + + if (!shutdown_group) { + return 0; + } + + lock = ao2_object_get_lockaddr(shutdown_group); + ast_assert(lock != NULL); + + ao2_lock(shutdown_group); + if (timeout) { + struct timeval start; + struct timespec end; + + start = ast_tvnow(); + end.tv_sec = start.tv_sec + timeout; + end.tv_nsec = start.tv_usec * 1000; + while (shutdown_group->count) { + if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) { + /* Error or timed out waiting for the count to reach zero. */ + break; + } + } + } else { + while (shutdown_group->count) { + if (ast_cond_wait(&shutdown_group->cond, lock)) { + /* Error */ + break; + } + } + } + remaining = shutdown_group->count; + ao2_unlock(shutdown_group); + return remaining; +} + +void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group) +{ + ao2_lock(shutdown_group); + ++shutdown_group->count; + ao2_unlock(shutdown_group); +} + +void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group) +{ + ao2_lock(shutdown_group); + --shutdown_group->count; + if (!shutdown_group->count) { + ast_cond_signal(&shutdown_group->cond); + } + ao2_unlock(shutdown_group); +} diff --git a/main/stasis.c b/main/stasis.c index c85fb29c5a..f9d5fbdd64 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -33,7 +33,7 @@ #include "asterisk/stasis_internal.h" #include "asterisk/stasis.h" #include "asterisk/taskprocessor.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" #include "asterisk/vector.h" @@ -95,6 +95,36 @@ Maximum number of threads in the threadpool. + + + 24.0.0 + + Settings that configure the taskpool Stasis uses to deliver some messages. + + + 24.0.0 + + Minimum number of taskprocessors in the message bus taskpool. + + + + 24.0.0 + + Initial number of taskprocessors in the message bus taskpool. + + + + 24.0.0 + + Number of seconds before an idle taskprocessor is disposed of. + + + + 24.0.0 + + Maximum number of taskprocessors in the taskpool. + + 13.0.0 @@ -331,8 +361,8 @@ /*! The number of buckets to use for topic pools */ #define TOPIC_POOL_BUCKETS 57 -/*! Thread pool for topics that don't want a dedicated taskprocessor */ -static struct ast_threadpool *threadpool; +/*! Taskpool for topics that don't want a dedicated taskprocessor */ +static struct ast_taskpool *taskpool; STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); @@ -693,8 +723,8 @@ struct stasis_subscription_statistics { int messages_passed; /*! \brief Using a mailbox to queue messages */ int uses_mailbox; - /*! \brief Using stasis threadpool for handling messages */ - int uses_threadpool; + /*! \brief Using stasis taskpool for handling messages */ + int uses_taskpool; /*! \brief The line number where the subscription originates */ int lineno; /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */ @@ -847,7 +877,7 @@ static void subscription_statistics_destroy(void *obj) } static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub, - int needs_mailbox, int use_thread_pool, const char *file, int lineno, + int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func) { struct stasis_subscription_statistics *statistics; @@ -872,7 +902,7 @@ static struct stasis_subscription_statistics *stasis_subscription_statistics_cre statistics->lineno = lineno; statistics->func = func; statistics->uses_mailbox = needs_mailbox; - statistics->uses_threadpool = use_thread_pool; + statistics->uses_taskpool = use_taskpool; strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */ statistics->sub = sub; ao2_link(subscription_stats, statistics); @@ -886,7 +916,7 @@ struct stasis_subscription *internal_stasis_subscribe( stasis_subscription_cb callback, void *data, int needs_mailbox, - int use_thread_pool, + int use_taskpool, const char *file, int lineno, const char *func) @@ -906,7 +936,7 @@ struct stasis_subscription *internal_stasis_subscribe( #ifdef AST_DEVMODE ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1)); - sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func); + sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func); if (ret < 0 || !sub->statistics) { ao2_ref(sub, -1); return NULL; @@ -924,7 +954,7 @@ struct stasis_subscription *internal_stasis_subscribe( /* Create name with seq number appended. */ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s", - use_thread_pool ? 'p' : 'm', + use_taskpool ? 'p' : 'm', stasis_topic_name(topic)); /* @@ -932,8 +962,8 @@ struct stasis_subscription *internal_stasis_subscribe( * acceptable. For a large number of subscribers, a thread * pool should be used. */ - if (use_thread_pool) { - sub->mailbox = ast_threadpool_serializer(tps_name, threadpool); + if (use_taskpool) { + sub->mailbox = ast_taskpool_serializer(tps_name, taskpool); } else { sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT); } @@ -2210,19 +2240,21 @@ struct stasis_declined_config { struct ao2_container *declined; }; -/*! \brief Threadpool configuration options */ -struct stasis_threadpool_conf { - /*! Initial size of the thread pool */ +/*! \brief Taskpool configuration options */ +struct stasis_taskpool_conf { + /*! Minimum size of the taskpool */ + int minimum_size; + /*! Initial size of the taskpool */ int initial_size; - /*! Time, in seconds, before we expire a thread */ + /*! Time, in seconds, before we expire a taskprocessor */ int idle_timeout_sec; - /*! Maximum number of thread to allow */ + /*! Maximum number of taskprocessors to allow */ int max_size; }; struct stasis_config { - /*! Thread pool configuration options */ - struct stasis_threadpool_conf *threadpool_options; + /*! Taskpool configuration options */ + struct stasis_taskpool_conf *taskpool_options; /*! Declined message types */ struct stasis_declined_config *declined_message_types; }; @@ -2230,12 +2262,20 @@ struct stasis_config { static struct aco_type threadpool_option = { .type = ACO_GLOBAL, .name = "threadpool", - .item_offset = offsetof(struct stasis_config, threadpool_options), + .item_offset = offsetof(struct stasis_config, taskpool_options), .category = "threadpool", .category_match = ACO_WHITELIST_EXACT, }; -static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option); +static struct aco_type taskpool_option = { + .type = ACO_GLOBAL, + .name = "taskpool", + .item_offset = offsetof(struct stasis_config, taskpool_options), + .category = "taskpool", + .category_match = ACO_WHITELIST_EXACT, +}; + +static struct aco_type *taskpool_options[] = ACO_TYPES(&threadpool_option, &taskpool_option); /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */ static struct aco_type declined_option = { @@ -2250,7 +2290,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option); struct aco_file stasis_conf = { .filename = "stasis.conf", - .types = ACO_TYPES(&declined_option, &threadpool_option), + .types = ACO_TYPES(&declined_option, &threadpool_option, &taskpool_option), }; /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */ @@ -2275,7 +2315,7 @@ static void stasis_config_destructor(void *obj) struct stasis_config *cfg = obj; ao2_cleanup(cfg->declined_message_types); - ast_free(cfg->threadpool_options); + ast_free(cfg->taskpool_options); } static void *stasis_config_alloc(void) @@ -2286,8 +2326,8 @@ static void *stasis_config_alloc(void) return NULL; } - cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options)); - if (!cfg->threadpool_options) { + cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options)); + if (!cfg->taskpool_options) { ao2_ref(cfg, -1); return NULL; } @@ -2678,7 +2718,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped); ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed); ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No"); - ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No"); + ast_cli(a->fd, "Using stasis taskpool for handling messages: %s\n", statistics->uses_taskpool ? "Yes" : "No"); ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked); ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked); @@ -3078,8 +3118,8 @@ static void stasis_cleanup(void) ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis)); ao2_cleanup(topic_all); topic_all = NULL; - ast_threadpool_shutdown(threadpool); - threadpool = NULL; + ast_taskpool_shutdown(taskpool); + taskpool = NULL; STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type); aco_info_destroy(&cfg_info); @@ -3090,7 +3130,7 @@ int stasis_init(void) { struct stasis_config *cfg; int cache_init; - struct ast_threadpool_options threadpool_opts = { 0, }; + struct ast_taskpool_options taskpool_opts = { 0, }; #ifdef AST_DEVMODE struct ao2_container *subscription_stats; struct ao2_container *topic_stats; @@ -3105,17 +3145,21 @@ int stasis_init(void) aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0); + aco_option_register(&cfg_info, "minimum_size", ACO_EXACT, + taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, minimum_size), 0, + INT_MAX); aco_option_register(&cfg_info, "initial_size", ACO_EXACT, - threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, - FLDSET(struct stasis_threadpool_conf, initial_size), 0, + taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, initial_size), 0, INT_MAX); aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT, - threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, - FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0, + taskpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0, INT_MAX); aco_option_register(&cfg_info, "max_size", ACO_EXACT, - threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, - FLDSET(struct stasis_threadpool_conf, max_size), 0, + taskpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, max_size), 0, INT_MAX); if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) { @@ -3125,7 +3169,7 @@ int stasis_init(void) return -1; } - if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) { + if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) { ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n"); ao2_ref(default_cfg, -1); @@ -3151,15 +3195,16 @@ int stasis_init(void) } } - threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION; - threadpool_opts.initial_size = cfg->threadpool_options->initial_size; - threadpool_opts.auto_increment = 1; - threadpool_opts.max_size = cfg->threadpool_options->max_size; - threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; - threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts); + taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION; + taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size; + taskpool_opts.initial_size = cfg->taskpool_options->initial_size; + taskpool_opts.auto_increment = 1; + taskpool_opts.max_size = cfg->taskpool_options->max_size; + taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec; + taskpool = ast_taskpool_create("stasis", &taskpool_opts); ao2_ref(cfg, -1); - if (!threadpool) { - ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); + if (!taskpool) { + ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n"); return -1; } diff --git a/main/taskpool.c b/main/taskpool.c new file mode 100644 index 0000000000..59ac4b0c72 --- /dev/null +++ b/main/taskpool.c @@ -0,0 +1,945 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Corporation + * + * Joshua Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/_private.h" +#include "asterisk/taskpool.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/astobj2.h" +#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/utils.h" +#include "asterisk/time.h" +#include "asterisk/sched.h" + +/*! + * \brief A taskpool taskprocessor + */ +struct taskpool_taskprocessor { + /*! The underlying taskprocessor */ + struct ast_taskprocessor *taskprocessor; + /*! The last time a task was pushed to this taskprocessor */ + struct timeval last_pushed; +}; + +/*! + * \brief A container of taskprocessors + */ +struct taskpool_taskprocessors { + /*! A vector of taskprocessors */ + AST_VECTOR(, struct taskpool_taskprocessor *) taskprocessors; + /*! The next taskprocessor to use for pushing */ + unsigned int taskprocessor_num; +}; + +typedef void (*taskpool_selector)(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, + struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached); + +/*! + * \brief An opaque taskpool structure + * + * A taskpool is a collection of taskprocessors that + * execute tasks, each from their own queue. A selector + * determines which taskprocessor to queue to at push + * time. + */ +struct ast_taskpool { + /*! The static taskprocessors, those which will always exist */ + struct taskpool_taskprocessors static_taskprocessors; + /*! The dynamic taskprocessors, those which will be created as needed */ + struct taskpool_taskprocessors dynamic_taskprocessors; + /*! True if the taskpool is in the process of shutting down */ + int shutting_down; + /*! Taskpool-specific options */ + struct ast_taskpool_options options; + /*! Dynamic pool shrinking scheduled item */ + int shrink_sched_id; + /*! The taskprocessor selector to use */ + taskpool_selector selector; + /*! The name of the taskpool */ + char name[0]; +}; + +/*! \brief The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold) */ +#define TASKPOOL_GROW_THRESHOLD (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10 + +/*! \brief Scheduler used for dynamic pool shrinking */ +static struct ast_sched_context *sched; + +/*! \brief Thread storage for the current taskpool */ +AST_THREADSTORAGE_RAW(current_taskpool_pool); + +/*! + * \internal + * \brief Get the current taskpool associated with this thread. + */ +static struct ast_taskpool *ast_taskpool_get_current(void) +{ + return ast_threadstorage_get_ptr(¤t_taskpool_pool); +} + +/*! + * \internal + * \brief Shutdown task for taskpool taskprocessor + */ + static int taskpool_taskprocessor_stop(void *data) + { + struct ast_taskpool *pool = ast_taskpool_get_current(); + + /* If a thread stop callback is set on the options, call it */ + if (pool->options.thread_end) { + pool->options.thread_end(); + } + + ao2_cleanup(pool); + + return 0; + } + +/*! \internal */ +static void taskpool_taskprocessor_dtor(void *obj) +{ + struct taskpool_taskprocessor *taskprocessor = obj; + + if (taskprocessor->taskprocessor && ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_stop, NULL)) { + /* We can't actually do anything if this fails, so just accept reality */ + } + + ast_taskprocessor_unreference(taskprocessor->taskprocessor); +} + +/*! + * \internal + * \brief Startup task for taskpool taskprocessor + */ +static int taskpool_taskprocessor_start(void *data) +{ + struct ast_taskpool *pool = data; + + /* Set the pool on the thread for this taskprocessor, inheriting the + * reference passed to the task itself. + */ + ast_threadstorage_set_ptr(¤t_taskpool_pool, pool); + + /* If a thread start callback is set on the options, call it */ + if (pool->options.thread_start) { + pool->options.thread_start(); + } + + return 0; +} + +/*! + * \internal + * \brief Allocate a taskpool specific taskprocessor + */ +static struct taskpool_taskprocessor *taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type) +{ + struct taskpool_taskprocessor *taskprocessor; + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + /* We don't actually need locking for each pool taskprocessor, as the only thing + * mutable is the underlying taskprocessor which has its own internal locking. + */ + taskprocessor = ao2_alloc_options(sizeof(*taskprocessor), taskpool_taskprocessor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!taskprocessor) { + return NULL; + } + + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name); + + taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT); + if (!taskprocessor->taskprocessor) { + ao2_ref(taskprocessor, -1); + return NULL; + } + + taskprocessor->last_pushed = ast_tvnow(); + + if (ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_start, ao2_bump(pool))) { + ao2_ref(pool, -1); + /* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to + * NULL here. + */ + ast_taskprocessor_unreference(taskprocessor->taskprocessor); + taskprocessor->taskprocessor = NULL; + return NULL; + } + + return taskprocessor; +} + +/*! + * \internal + * \brief Initialize the taskpool taskprocessors structure + */ +static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size) +{ + if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) { + return -1; + } + + return 0; +} + +/*! + * \internal + * \brief Clean up the taskpool taskprocessors structure + */ +static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors) +{ + /* Access/manipulation of taskprocessors is done with the lock held, and + * with a check of the shutdown flag done. This means that outside of holding + * the lock we can safely muck with it. Pushing to the taskprocessor is done + * outside of the lock, but with a reference to the taskprocessor held. + */ + AST_VECTOR_CALLBACK_VOID(&taskprocessors->taskprocessors, ao2_cleanup); + AST_VECTOR_FREE(&taskprocessors->taskprocessors); +} + +/*! + * \internal + * \brief Determine if a taskpool taskprocessor is idle + */ +#define TASKPROCESSOR_IS_IDLE(tps, timeout) (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout)) + +/*! \internal + * \brief Taskpool dynamic pool shrink function + */ +static int taskpool_dynamic_pool_shrink(const void *data) +{ + struct ast_taskpool *pool = (struct ast_taskpool *)data; + int num_removed; + + ao2_lock(pool); + + /* If the pool is shutting down, do nothing and don't reschedule */ + if (pool->shutting_down) { + ao2_unlock(pool); + ao2_ref(pool, -1); + return 0; + } + + /* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */ + num_removed = AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(&pool->dynamic_taskprocessors.taskprocessors, pool->options.idle_timeout * 1000, + TASKPROCESSOR_IS_IDLE, ao2_cleanup); + if (num_removed) { + /* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */ + if (pool->dynamic_taskprocessors.taskprocessor_num >= AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors)) { + pool->dynamic_taskprocessors.taskprocessor_num = 0; + } + } + + ao2_unlock(pool); + + /* It is possible for the pool to have been shut down between unlocking and returning, this is + * inherently a race condition we can't eliminate so we will catch it on the next iteration. + */ + return pool->options.idle_timeout * 1000; +} + +/*! + * \internal + * \brief Sequential taskprocessor selector + */ + static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, + struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached) +{ + unsigned int taskprocessor_num = taskprocessors->taskprocessor_num; + + if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) { + *growth_threshold_reached = 1; + return; + } + + taskprocessors->taskprocessor_num++; + if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) { + taskprocessors->taskprocessor_num = 0; + } + + *taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num); + + /* Check to see if this has reached the growth threshold */ + *growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0; +} + +/*! + * \interal + * \brief Least full taskprocessor selector + */ +static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, + struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached) +{ + struct taskpool_taskprocessor *least_full = NULL; + unsigned int i; + + if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) { + *growth_threshold_reached = 1; + return; + } + + /* We assume that the growth threshold has not yet been reached, until proven otherwise */ + *growth_threshold_reached = 0; + + for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) { + struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i); + + /* If this taskprocessor has no outstanding tasks, it is the best choice */ + if (!ast_taskprocessor_size(tp->taskprocessor)) { + *taskprocessor = tp; + return; + } + + /* If any of the taskprocessors have reached the growth threshold then we should grow the pool */ + if (ast_taskprocessor_size(tp->taskprocessor) >= pool->options.growth_threshold) { + *growth_threshold_reached = 1; + } + + /* The taskprocessor with the fewest tasks should be used */ + if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) { + least_full = tp; + } + } + + *taskprocessor = least_full; +} + +struct ast_taskpool *ast_taskpool_create(const char *name, + const struct ast_taskpool_options *options) +{ + struct ast_taskpool *pool; + + /* Enforce versioning on the passed-in options */ + if (options->version != AST_TASKPOOL_OPTIONS_VERSION) { + return NULL; + } + + pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL); + if (!pool) { + return NULL; + } + + strcpy(pool->name, name); /* Safe */ + memcpy(&pool->options, options, sizeof(pool->options)); + pool->shrink_sched_id = -1; + + /* Verify the passed-in options are valid, and adjust if needed */ + if (options->initial_size < options->minimum_size) { + pool->options.initial_size = options->minimum_size; + ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n", + name, options->initial_size, options->minimum_size, options->minimum_size); + } + + if (options->max_size && pool->options.initial_size > options->max_size) { + pool->options.max_size = pool->options.initial_size; + ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n", + name, options->max_size, pool->options.initial_size, pool->options.initial_size); + } + + if (!options->auto_increment) { + if (!pool->options.minimum_size) { + pool->options.minimum_size = 1; + ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name); + } + if (!pool->options.max_size) { + pool->options.max_size = pool->options.minimum_size; + ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size); + } + if (pool->options.minimum_size != pool->options.max_size) { + pool->options.minimum_size = pool->options.max_size; + pool->options.initial_size = pool->options.max_size; + ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n", + name, options->minimum_size, pool->options.max_size, pool->options.max_size); + } + } else if (!options->growth_threshold) { + pool->options.growth_threshold = TASKPOOL_GROW_THRESHOLD; + } + + if (options->selector == AST_TASKPOOL_SELECTOR_DEFAULT || options->selector == AST_TASKPOOL_SELECTOR_LEAST_FULL) { + pool->selector = taskpool_least_full_selector; + } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) { + pool->selector = taskpool_sequential_selector; + } else { + ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n", + name, options->selector); + pool->selector = taskpool_least_full_selector; + } + + if (taskpool_taskprocessors_init(&pool->static_taskprocessors, pool->options.minimum_size)) { + ao2_ref(pool, -1); + return NULL; + } + + /* Create the static taskprocessors based on the passed-in options */ + for (int i = 0; i < pool->options.minimum_size; i++) { + struct taskpool_taskprocessor *taskprocessor; + + taskprocessor = taskpool_taskprocessor_alloc(pool, 's'); + if (!taskprocessor) { + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->static_taskprocessors.taskprocessors, taskprocessor)) { + ao2_ref(taskprocessor, -1); + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + } + + if (taskpool_taskprocessors_init(&pool->dynamic_taskprocessors, + pool->options.initial_size - pool->options.minimum_size)) { + ast_taskpool_shutdown(pool); + return NULL; + } + + /* Create the dynamic taskprocessor based on the passed-in options */ + for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) { + struct taskpool_taskprocessor *taskprocessor; + + taskprocessor = taskpool_taskprocessor_alloc(pool, 'd'); + if (!taskprocessor) { + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, taskprocessor)) { + ao2_ref(taskprocessor, -1); + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + } + + /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do + * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function + * and to reduce complexity. + */ + if (options->idle_timeout && options->auto_increment) { + pool->shrink_sched_id = ast_sched_add(sched, options->idle_timeout * 1000, taskpool_dynamic_pool_shrink, ao2_bump(pool)); + if (pool->shrink_sched_id < 0) { + ao2_ref(pool, -1); + /* The second reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + } + + return pool; +} + +size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool) +{ + size_t count; + + ao2_lock(pool); + count = AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors); + ao2_unlock(pool); + + return count; +} + +#define TASKPOOL_QUEUE_SIZE_ADD(tps, size) (size += ast_taskprocessor_size(tps->taskprocessor)) + +long ast_taskpool_queue_size(struct ast_taskpool *pool) +{ + long queue_size = 0; + + ao2_lock(pool); + AST_VECTOR_CALLBACK_VOID(&pool->static_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size); + AST_VECTOR_CALLBACK_VOID(&pool->dynamic_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size); + ao2_unlock(pool); + + return queue_size; +} + +/*! \internal + * \brief Taskpool dynamic pool grow function + */ +static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor) +{ + unsigned int num_to_add = pool->options.auto_increment; + int i; + + if (!num_to_add) { + return; + } + + /* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */ + if (pool->options.max_size) { + unsigned int current_size = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors); + + if (current_size + num_to_add > pool->options.max_size) { + num_to_add = pool->options.max_size - current_size; + } + } + + for (i = 0; i < num_to_add; i++) { + struct taskpool_taskprocessor *new_taskprocessor; + + new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd'); + if (!new_taskprocessor) { + return; + } + + if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) { + ao2_ref(new_taskprocessor, -1); + return; + } + + if (i == 0) { + /* On the first iteration we return the taskprocessor we just added */ + *taskprocessor = new_taskprocessor; + /* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */ + pool->dynamic_taskprocessors.taskprocessor_num = 0; + } else if (i == 1) { + /* On the second iteration we update the next taskprocessor to use to be this one */ + pool->dynamic_taskprocessors.taskprocessor_num = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) - 1; + } + } +} + +int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) +{ + RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup); + + /* Select the taskprocessor in the pool to use for pushing this task */ + ao2_lock(pool); + if (!pool->shutting_down) { + unsigned int growth_threshold_reached = 0; + + /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better + * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor + * fails for some reason, it will still fall back to the initially found static one if + * it is present. + */ + pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached); + if (pool->options.auto_increment && growth_threshold_reached) { + /* If we need to grow then try dynamic taskprocessors */ + pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached); + if (growth_threshold_reached) { + /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */ + taskpool_dynamic_pool_grow(pool, &taskprocessor); + } + + /* If a dynamic taskprocessor was used update its last push time */ + if (taskprocessor) { + taskprocessor->last_pushed = ast_tvnow(); + } + } + ao2_bump(taskprocessor); + } + ao2_unlock(pool); + + if (!taskprocessor) { + return -1; + } + + if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) { + return -1; + } + + return 0; +} + +/*! + * \internal Structure used for synchronous task + */ +struct taskpool_sync_task { + ast_mutex_t lock; + ast_cond_t cond; + int complete; + int fail; + int (*task)(void *); + void *task_data; +}; + +/*! + * \internal Initialization function for synchronous task + */ +static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int (*task)(void *), void *data) +{ + ast_mutex_init(&sync_task->lock); + ast_cond_init(&sync_task->cond, NULL); + sync_task->complete = 0; + sync_task->fail = 0; + sync_task->task = task; + sync_task->task_data = data; + return 0; +} + +/*! + * \internal Cleanup function for synchronous task + */ +static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task) +{ + ast_mutex_destroy(&sync_task->lock); + ast_cond_destroy(&sync_task->cond); +} + +/*! + * \internal Function for executing a sychronous task + */ +static int taskpool_sync_task(void *data) +{ + struct taskpool_sync_task *sync_task = data; + int ret; + + sync_task->fail = sync_task->task(sync_task->task_data); + + /* + * Once we unlock sync_task->lock after signaling, we cannot access + * sync_task again. The thread waiting within ast_taskpool_push_wait() + * is free to continue and release its local variable (sync_task). + */ + ast_mutex_lock(&sync_task->lock); + sync_task->complete = 1; + ast_cond_signal(&sync_task->cond); + ret = sync_task->fail; + ast_mutex_unlock(&sync_task->lock); + return ret; +} + +int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) +{ + struct taskpool_sync_task sync_task; + + /* If we are already executing within a taskpool taskprocessor then + * don't bother pushing a new task, just directly execute the task. + */ + if (ast_taskpool_get_current()) { + return task(data); + } + + if (taskpool_sync_task_init(&sync_task, task, data)) { + return -1; + } + + if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) { + taskpool_sync_task_cleanup(&sync_task); + return -1; + } + + ast_mutex_lock(&sync_task.lock); + while (!sync_task.complete) { + ast_cond_wait(&sync_task.cond, &sync_task.lock); + } + ast_mutex_unlock(&sync_task.lock); + + taskpool_sync_task_cleanup(&sync_task); + return sync_task.fail; +} + +void ast_taskpool_shutdown(struct ast_taskpool *pool) +{ + if (!pool) { + return; + } + + /* Mark this pool as shutting down so nothing new is pushed */ + ao2_lock(pool); + pool->shutting_down = 1; + ao2_unlock(pool); + + /* Stop the shrink scheduled item if present */ + AST_SCHED_DEL_UNREF(sched, pool->shrink_sched_id, ao2_ref(pool, -1)); + + /* Clean up all the taskprocessors */ + taskpool_taskprocessors_cleanup(&pool->static_taskprocessors); + taskpool_taskprocessors_cleanup(&pool->dynamic_taskprocessors); + + ao2_ref(pool, -1); +} + +struct serializer { + /*! Taskpool the serializer will use to process the jobs. */ + struct ast_taskpool *pool; + /*! Which group will wait for this serializer to shutdown. */ + struct ast_serializer_shutdown_group *shutdown_group; +}; + +static void serializer_dtor(void *obj) +{ + struct serializer *ser = obj; + + ao2_cleanup(ser->pool); + ser->pool = NULL; + ao2_cleanup(ser->shutdown_group); + ser->shutdown_group = NULL; +} + +static struct serializer *serializer_create(struct ast_taskpool *pool, + struct ast_serializer_shutdown_group *shutdown_group) +{ + struct serializer *ser; + + /* This object has a lock so it can be used to ensure exclusive access + * to the execution of tasks within the serializer. + */ + ser = ao2_alloc(sizeof(*ser), serializer_dtor); + if (!ser) { + return NULL; + } + ser->pool = ao2_bump(pool); + ser->shutdown_group = ao2_bump(shutdown_group); + return ser; +} + +AST_THREADSTORAGE_RAW(current_taskpool_serializer); + +static int execute_tasks(void *data) +{ + struct ast_taskpool *pool = ast_taskpool_get_current(); + struct ast_taskprocessor *tps = data; + struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(tps); + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + size_t remaining, requeue = 0; + + /* In a normal scenario this lock will not be in contention with + * anything else. It is only if a synchronous task is pushed to + * the serializer that it may be blocked on the synchronous + * task thread. This is done to ensure that only one thread is executing + * tasks from the serializer at a given time, and not out of order + * either. + */ + ao2_lock(ser); + + ast_threadstorage_set_ptr(¤t_taskpool_serializer, tps); + for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) { + requeue = ast_taskprocessor_execute(tps); + } + ast_threadstorage_set_ptr(¤t_taskpool_serializer, NULL); + + ao2_unlock(ser); + + /* If there are remaining tasks we requeue, this way the serializer + * does not hold exclusivity of the taskpool taskprocessor + */ + if (requeue) { + /* Ownership passes to the new task */ + if (ast_taskpool_push(pool, execute_tasks, tps)) { + ast_taskprocessor_unreference(tps); + } + } else { + ast_taskprocessor_unreference(tps); + } + + return 0; +} + +static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) +{ + if (was_empty) { + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener); + + if (ast_taskpool_push(ser->pool, execute_tasks, tps)) { + ast_taskprocessor_unreference(tps); + } + } +} + +static int serializer_start(struct ast_taskprocessor_listener *listener) +{ + /* No-op */ + return 0; +} + +static void serializer_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + + if (ser->shutdown_group) { + ast_serializer_shutdown_group_dec(ser->shutdown_group); + } + ao2_cleanup(ser); +} + +static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = { + .task_pushed = serializer_task_pushed, + .start = serializer_start, + .shutdown = serializer_shutdown, +}; + +struct ast_taskprocessor *ast_taskpool_serializer_get_current(void) +{ + return ast_threadstorage_get_ptr(¤t_taskpool_serializer); +} + +struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, + struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group) +{ + struct serializer *ser; + struct ast_taskprocessor_listener *listener; + struct ast_taskprocessor *tps; + + ser = serializer_create(pool, shutdown_group); + if (!ser) { + return NULL; + } + + listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser); + if (!listener) { + ao2_ref(ser, -1); + return NULL; + } + + tps = ast_taskprocessor_create_with_listener(name, listener); + if (!tps) { + /* ser ref transferred to listener but not cleaned without tps */ + ao2_ref(ser, -1); + } else if (shutdown_group) { + ast_serializer_shutdown_group_inc(shutdown_group); + } + + ao2_ref(listener, -1); + return tps; +} + +struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool) +{ + return ast_taskpool_serializer_group(name, pool, NULL); +} + +/*! + * \internal An empty task callback, used to ensure the serializer does not + * go empty. */ +static int taskpool_serializer_empty_task(void *data) +{ + return 0; +} + +int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data) +{ + struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer); + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + struct ast_taskprocessor *prior_serializer; + struct taskpool_sync_task sync_task; + + /* If not in a taskpool taskprocessor we can just queue the task like normal and + * wait. */ + if (!ast_taskpool_get_current()) { + if (taskpool_sync_task_init(&sync_task, task, data)) { + return -1; + } + + if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) { + taskpool_sync_task_cleanup(&sync_task); + return -1; + } + + ast_mutex_lock(&sync_task.lock); + while (!sync_task.complete) { + ast_cond_wait(&sync_task.cond, &sync_task.lock); + } + ast_mutex_unlock(&sync_task.lock); + + taskpool_sync_task_cleanup(&sync_task); + return sync_task.fail; + } + + /* It is possible that we are already executing within a serializer, so stash the existing + * away so we can restore it. + */ + prior_serializer = ast_taskpool_serializer_get_current(); + + ao2_lock(ser); + + /* There are two cases where we can or have to directly execute this task: + * 1. There are no other tasks in the serializer + * 2. We are already in the serializer + * In the second case if we don't execute the task now, we will deadlock waiting + * on it as it will never occur. + */ + if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) { + ast_threadstorage_set_ptr(¤t_taskpool_serializer, serializer); + sync_task.fail = task(data); + ao2_unlock(ser); + ast_threadstorage_set_ptr(¤t_taskpool_serializer, prior_serializer); + return sync_task.fail; + } + + if (taskpool_sync_task_init(&sync_task, task, data)) { + ao2_unlock(ser); + return -1; + } + + /* First we queue the serialized task */ + if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) { + taskpool_sync_task_cleanup(&sync_task); + ao2_unlock(ser); + return -1; + } + + /* Next we queue the empty task to ensure the serializer doesn't reach empty, this + * stops two tasks from being queued for the same serializer at the same time. + */ + if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) { + taskpool_sync_task_cleanup(&sync_task); + ao2_unlock(ser); + return -1; + } + + /* Now we execute the tasks on the serializer until our sync task is complete */ + ast_threadstorage_set_ptr(¤t_taskpool_serializer, serializer); + while (!sync_task.complete) { + /* The sync task is guaranteed to be executed, so doing a while loop on the complete + * flag is safe. + */ + ast_taskprocessor_execute(serializer); + } + taskpool_sync_task_cleanup(&sync_task); + ao2_unlock(ser); + + ast_threadstorage_set_ptr(¤t_taskpool_serializer, prior_serializer); + + return sync_task.fail; +} + +/*! + * \internal + * \brief Clean up resources on Asterisk shutdown + */ +static void taskpool_shutdown(void) +{ + if (sched) { + ast_sched_context_destroy(sched); + sched = NULL; + } +} + +int ast_taskpool_init(void) +{ + sched = ast_sched_context_create(); + if (!sched) { + return -1; + } + + if (ast_sched_start_thread(sched)) { + return -1; + } + + ast_register_cleanup(taskpool_shutdown); + + return 0; +} diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 3c83a6883e..c4015279c5 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -967,6 +967,11 @@ long ast_taskprocessor_size(struct ast_taskprocessor *tps) return (tps) ? tps->tps_queue_size : -1; } +struct ast_taskprocessor_listener *ast_taskprocessor_listener(struct ast_taskprocessor *tps) +{ + return tps ? tps->listener : NULL; +} + /* taskprocessor name accessor */ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) { diff --git a/main/threadpool.c b/main/threadpool.c index a573670ece..0969e627c3 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1211,103 +1211,6 @@ static int worker_set_state(struct worker_thread *worker, enum worker_state stat return 0; } -/*! Serializer group shutdown control object. */ -struct ast_serializer_shutdown_group { - /*! Shutdown thread waits on this conditional. */ - ast_cond_t cond; - /*! Count of serializers needing to shutdown. */ - int count; -}; - -static void serializer_shutdown_group_dtor(void *vdoomed) -{ - struct ast_serializer_shutdown_group *doomed = vdoomed; - - ast_cond_destroy(&doomed->cond); -} - -struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void) -{ - struct ast_serializer_shutdown_group *shutdown_group; - - shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor); - if (!shutdown_group) { - return NULL; - } - ast_cond_init(&shutdown_group->cond, NULL); - return shutdown_group; -} - -int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout) -{ - int remaining; - ast_mutex_t *lock; - - if (!shutdown_group) { - return 0; - } - - lock = ao2_object_get_lockaddr(shutdown_group); - ast_assert(lock != NULL); - - ao2_lock(shutdown_group); - if (timeout) { - struct timeval start; - struct timespec end; - - start = ast_tvnow(); - end.tv_sec = start.tv_sec + timeout; - end.tv_nsec = start.tv_usec * 1000; - while (shutdown_group->count) { - if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) { - /* Error or timed out waiting for the count to reach zero. */ - break; - } - } - } else { - while (shutdown_group->count) { - if (ast_cond_wait(&shutdown_group->cond, lock)) { - /* Error */ - break; - } - } - } - remaining = shutdown_group->count; - ao2_unlock(shutdown_group); - return remaining; -} - -/*! - * \internal - * \brief Increment the number of serializer members in the group. - * \since 13.5.0 - * - * \param shutdown_group Group shutdown controller. - */ -static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group) -{ - ao2_lock(shutdown_group); - ++shutdown_group->count; - ao2_unlock(shutdown_group); -} - -/*! - * \internal - * \brief Decrement the number of serializer members in the group. - * \since 13.5.0 - * - * \param shutdown_group Group shutdown controller. - */ -static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group) -{ - ao2_lock(shutdown_group); - --shutdown_group->count; - if (!shutdown_group->count) { - ast_cond_signal(&shutdown_group->cond); - } - ao2_unlock(shutdown_group); -} - struct serializer { /*! Threadpool the serializer will use to process the jobs. */ struct ast_threadpool *pool; @@ -1379,7 +1282,7 @@ static void serializer_shutdown(struct ast_taskprocessor_listener *listener) struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); if (ser->shutdown_group) { - serializer_shutdown_group_dec(ser->shutdown_group); + ast_serializer_shutdown_group_dec(ser->shutdown_group); } ao2_cleanup(ser); } @@ -1418,7 +1321,7 @@ struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, /* ser ref transferred to listener but not cleaned without tps */ ao2_ref(ser, -1); } else if (shutdown_group) { - serializer_shutdown_group_inc(shutdown_group); + ast_serializer_shutdown_group_inc(shutdown_group); } ao2_ref(listener, -1); diff --git a/tests/test_taskpool.c b/tests/test_taskpool.c new file mode 100644 index 0000000000..ddfb40e0ac --- /dev/null +++ b/tests/test_taskpool.c @@ -0,0 +1,810 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Inc + * + * Joshua Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief taskpool unit tests + * + * \author Joshua Colp + * + */ + +/*** MODULEINFO + TEST_FRAMEWORK + core + ***/ + +#include "asterisk.h" + +#include "asterisk/astobj2.h" +#include "asterisk/lock.h" +#include "asterisk/logger.h" +#include "asterisk/module.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/test.h" +#include "asterisk/taskpool.h" +#include "asterisk/cli.h" + +struct test_data { + ast_mutex_t lock; + ast_cond_t cond; + int executed; + struct ast_taskprocessor *taskprocessor; +}; + +static struct test_data *test_alloc(void) +{ + struct test_data *td = ast_calloc(1, sizeof(*td)); + if (!td) { + return NULL; + } + ast_mutex_init(&td->lock); + ast_cond_init(&td->cond, NULL); + return td; +} + +static void test_destroy(struct test_data *td) +{ + ast_mutex_destroy(&td->lock); + ast_cond_destroy(&td->cond); + ast_free(td); +} + +static int simple_task(void *data) +{ + struct test_data *td = data; + SCOPED_MUTEX(lock, &td->lock); + td->taskprocessor = ast_taskpool_serializer_get_current(); + td->executed = 1; + ast_cond_signal(&td->cond); + return 0; +} + +AST_TEST_DEFINE(taskpool_push) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct timeval start; + struct timespec end; + + switch (cmd) { + case TEST_INIT: + info->name = "push"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool pushing test"; + info->description = + "Pushes a single task into a taskpool asynchronously and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_push(pool, simple_task, td)) { + goto end; + } + + /* It should not take more than 5 seconds for a single simple task to execute */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 5; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_synchronous) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + + switch (cmd) { + case TEST_INIT: + info->name = "push_synchronous"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool synchronous pushing test"; + info->description = + "Pushes a single task into a taskpool synchronously and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_push_wait(pool, simple_task, td)) { + goto end; + } + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_serializer) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct ast_taskprocessor *serializer = NULL; + struct timeval start; + struct timespec end; + + switch (cmd) { + case TEST_INIT: + info->name = "push_serializer"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool serializer pushing test"; + info->description = + "Pushes a single task into a taskpool serializer and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + serializer = ast_taskpool_serializer("serializer", pool); + if (!serializer) { + goto end; + } + + if (ast_taskprocessor_push(serializer, simple_task, td)) { + goto end; + } + + /* It should not take more than 5 seconds for a single simple task to execute */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 5; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (td->taskprocessor != serializer) { + ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskprocessor_unreference(serializer); + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_serializer_synchronous) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct ast_taskprocessor *serializer = NULL; + + switch (cmd) { + case TEST_INIT: + info->name = "push_serializer_synchronous"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool serializer synchronous pushing test"; + info->description = + "Pushes a single task into a taskpool serializer synchronously and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + serializer = ast_taskpool_serializer("serializer", pool); + if (!serializer) { + goto end; + } + + if (ast_taskpool_serializer_push_wait(serializer, simple_task, td)) { + goto end; + } + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (td->taskprocessor != serializer) { + ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskprocessor_unreference(serializer); + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +static int requeue_task(void *data) +{ + return ast_taskpool_serializer_push_wait(ast_taskpool_serializer_get_current(), simple_task, data); +} + +AST_TEST_DEFINE(taskpool_push_serializer_synchronous_requeue) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct ast_taskprocessor *serializer = NULL; + + switch (cmd) { + case TEST_INIT: + info->name = "push_serializer_synchronous_requeue"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool serializer synchronous requeueing test"; + info->description = + "Pushes a single task into a taskpool serializer synchronously and ensures it is requeued and executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + serializer = ast_taskpool_serializer("serializer", pool); + if (!serializer) { + goto end; + } + + if (ast_taskpool_serializer_push_wait(serializer, requeue_task, td)) { + goto end; + } + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (td->taskprocessor != serializer) { + ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskprocessor_unreference(serializer); + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_grow) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 1, + .minimum_size = 0, + .initial_size = 0, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct timeval start; + struct timespec end; + + switch (cmd) { + case TEST_INIT: + info->name = "push_grow"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool pushing test with auto-grow enabled"; + info->description = + "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool grows."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 0) { + ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + if (ast_taskpool_push(pool, simple_task, td)) { + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 1) { + ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + /* It should not take more than 5 seconds for a single simple task to execute */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 5; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_shrink) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 1, + .auto_increment = 1, + .minimum_size = 0, + .initial_size = 0, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct timeval start; + struct timespec end; + int iterations = 0; + + switch (cmd) { + case TEST_INIT: + info->name = "push_shrink"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool pushing test with auto-shrink enabled"; + info->description = + "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool shrinks."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 0) { + ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + if (ast_taskpool_push(pool, simple_task, td)) { + res = AST_TEST_FAIL; + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 1) { + ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + /* We give 10 seconds for the pool to shrink back to normal, but if it happens earlier we + * stop our check early. + */ + ast_mutex_lock(&td->lock); + do { + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 1; + end.tv_nsec = start.tv_usec * 1000; + + if (ast_cond_timedwait(&td->cond, &td->lock, &end) == ETIMEDOUT) { + iterations++; + } + } while (ast_taskpool_taskprocessors_count(pool) != 0 && iterations != 10); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (ast_taskpool_taskprocessors_count(pool) != 0) { + ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +struct efficiency_task_data { + struct ast_taskpool *pool; + int num_tasks_executed; + int shutdown; +}; + +static int efficiency_task(void *data) +{ + struct efficiency_task_data *etd = data; + + if (etd->shutdown) { + ao2_ref(etd->pool, -1); + return 0; + } + + ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1); + + if (ast_taskpool_push(etd->pool, efficiency_task, etd)) { + ao2_ref(etd->pool, -1); + return -1; + } + + return 0; +} + +static char *handle_cli_taskpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 5, + .initial_size = 5, + .max_size = 5, + }; + struct efficiency_task_data etd = { + .pool = NULL, + .num_tasks_executed = 0, + .shutdown = 0, + }; + struct timeval start; + struct timespec end; + int i; + + switch (cmd) { + case CLI_INIT: + e->command = "taskpool push efficiency"; + e->usage = + "Usage: taskpool push efficiency\n" + " Pushes 200 tasks to a taskpool and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + td = test_alloc(); + if (!td) { + return CLI_SUCCESS; + } + + pool = ast_taskpool_create("taskpool_push_efficiency", &options); + if (!pool) { + goto end; + } + + etd.pool = pool; + + /* Push in 200 tasks, cause why not */ + for (i = 0; i < 200; i++) { + /* Ensure that the task has a reference to the pool */ + ao2_bump(pool); + if (ast_taskpool_push(pool, efficiency_task, &etd)) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", etd.num_tasks_executed); + +end: + etd.shutdown = 1; + ast_taskpool_shutdown(pool); + test_destroy(td); + return CLI_SUCCESS; +} + +struct serializer_efficiency_task_data { + struct ast_taskprocessor *serializer[2]; + int *num_tasks_executed; + int *shutdown; +}; + +static int serializer_efficiency_task(void *data) +{ + struct serializer_efficiency_task_data *etd = data; + struct ast_taskprocessor *taskprocessor = etd->serializer[0]; + + if (*etd->shutdown) { + return 0; + } + + ast_atomic_fetchadd_int(etd->num_tasks_executed, +1); + + /* We ping pong a task between a pair of taskprocessors to ensure that + * a single taskprocessor does not receive a thread from the threadpool + * exclusively. + */ + if (taskprocessor == ast_taskpool_serializer_get_current()) { + taskprocessor = etd->serializer[1]; + } + + if (ast_taskprocessor_push(taskprocessor, + serializer_efficiency_task, etd)) { + return -1; + } + + return 0; +} + +static char *handle_cli_taskpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 5, + .initial_size = 5, + .max_size = 5, + }; + struct serializer_efficiency_task_data etd[200]; + struct timeval start; + struct timespec end; + int i; + int num_tasks_executed = 0; + int shutdown = 0; + + switch (cmd) { + case CLI_INIT: + e->command = "taskpool push serializer efficiency"; + e->usage = + "Usage: taskpool push serializer efficiency\n" + " Pushes 200 tasks to a taskpool in serializers and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + td = test_alloc(); + if (!td) { + return CLI_SUCCESS; + } + + memset(&etd, 0, sizeof(etd)); + + pool = ast_taskpool_create("taskpool_push_serializer_efficiency", &options); + if (!pool) { + goto end; + } + + /* We create 400 (200 pairs) of serializers */ + for (i = 0; i < 200; i++) { + char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[0] = ast_taskpool_serializer(serializer_name, pool); + if (!etd[i].serializer[0]) { + goto end; + } + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[1] = ast_taskpool_serializer(serializer_name, pool); + if (!etd[i].serializer[1]) { + goto end; + } + + etd[i].num_tasks_executed = &num_tasks_executed; + etd[i].shutdown = &shutdown; + } + + /* And once created we push in 200 tasks */ + for (i = 0; i < 200; i++) { + if (ast_taskprocessor_push(etd[i].serializer[0], serializer_efficiency_task, &etd[i])) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", num_tasks_executed); + shutdown = 1; + +end: + /* We need to unreference each serializer */ + for (i = 0; i < 200; i++) { + ast_taskprocessor_unreference(etd[i].serializer[0]); + ast_taskprocessor_unreference(etd[i].serializer[1]); + } + ast_taskpool_shutdown(pool); + test_destroy(td); + return CLI_SUCCESS; +} + +static struct ast_cli_entry cli[] = { + AST_CLI_DEFINE(handle_cli_taskpool_push_efficiency, "Push tasks to a taskpool and measure efficiency"), + AST_CLI_DEFINE(handle_cli_taskpool_push_serializer_efficiency, "Push tasks to a taskpool in serializers and measure efficiency"), +}; + +static int unload_module(void) +{ + ast_cli_unregister_multiple(cli, ARRAY_LEN(cli)); + AST_TEST_UNREGISTER(taskpool_push); + AST_TEST_UNREGISTER(taskpool_push_synchronous); + AST_TEST_UNREGISTER(taskpool_push_serializer); + AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous); + AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous_requeue); + AST_TEST_UNREGISTER(taskpool_push_grow); + AST_TEST_UNREGISTER(taskpool_push_shrink); + return 0; +} + +static int load_module(void) +{ + ast_cli_register_multiple(cli, ARRAY_LEN(cli)); + AST_TEST_REGISTER(taskpool_push); + AST_TEST_REGISTER(taskpool_push_synchronous); + AST_TEST_REGISTER(taskpool_push_serializer); + AST_TEST_REGISTER(taskpool_push_serializer_synchronous); + AST_TEST_REGISTER(taskpool_push_serializer_synchronous_requeue); + AST_TEST_REGISTER(taskpool_push_grow); + AST_TEST_REGISTER(taskpool_push_shrink); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskpool test module"); diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 3fb4430d98..c2af88e8e6 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -231,7 +231,9 @@ static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, s } if (!tld->empty_notice) { - ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); + if (test) { + ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); + } res = AST_TEST_FAIL; } @@ -347,6 +349,113 @@ end: return res; } +struct efficiency_task_data { + struct ast_threadpool *pool; + int num_tasks_executed; + int shutdown; +}; + +static int efficiency_task(void *data) +{ + struct efficiency_task_data *etd = data; + + if (etd->shutdown) { + return 0; + } + + ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1); + + if (ast_threadpool_push(etd->pool, efficiency_task, etd)) { + return -1; + } + + return 0; +} + +static char *handle_cli_threadpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct test_listener_data *tld = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 5, + .max_size = 5, + }; + struct efficiency_task_data etd = { + .pool = NULL, + .num_tasks_executed = 0, + .shutdown = 0, + }; + struct timeval start; + struct timespec end; + int i; + + switch (cmd) { + case CLI_INIT: + e->command = "threadpool push efficiency"; + e->usage = + "Usage: threadpool push efficiency\n" + " Pushes 200 tasks to a threadpool and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + tld = test_alloc(); + if (!tld) { + return CLI_SUCCESS; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create("threadpool_push_efficiency", listener, &options); + if (!pool) { + goto end; + } + + etd.pool = pool; + + /* Push in 200 tasks, cause why not */ + for (i = 0; i < 200; i++) { + if (ast_threadpool_push(pool, efficiency_task, &etd)) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&tld->lock); + while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&tld->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", etd.num_tasks_executed); + etd.shutdown = 1; + + res = wait_for_empty_notice(NULL, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return CLI_SUCCESS; +} + AST_TEST_DEFINE(threadpool_initial_threads) { struct ast_threadpool *pool = NULL; @@ -1744,6 +1853,147 @@ end: return res; } +struct serializer_efficiency_task_data { + struct ast_taskprocessor *serializer[2]; + int *num_tasks_executed; + int *shutdown; +}; + +static int serializer_efficiency_task(void *data) +{ + struct serializer_efficiency_task_data *etd = data; + struct ast_taskprocessor *taskprocessor = etd->serializer[0]; + + if (*etd->shutdown) { + return 0; + } + + ast_atomic_fetchadd_int(etd->num_tasks_executed, +1); + + /* We ping pong a task between a pair of taskprocessors to ensure that + * a single taskprocessor does not receive a thread from the threadpool + * exclusively. + */ + if (taskprocessor == ast_threadpool_serializer_get_current()) { + taskprocessor = etd->serializer[1]; + } + + if (ast_taskprocessor_push(taskprocessor, + serializer_efficiency_task, etd)) { + return -1; + } + + return 0; +} + +static char *handle_cli_threadpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, + struct ast_cli_args *a) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct test_listener_data *tld = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 5, + .max_size = 5, + }; + struct serializer_efficiency_task_data etd[200]; + struct timeval start; + struct timespec end; + int i; + int num_tasks_executed = 0; + int shutdown = 0; + + switch (cmd) { + case CLI_INIT: + e->command = "threadpool push serializer efficiency"; + e->usage = + "Usage: threadpool push serializer efficiency\n" + " Pushes 200 tasks to a threadpool in serializers and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + tld = test_alloc(); + if (!tld) { + return CLI_SUCCESS; + } + + memset(&etd, 0, sizeof(etd)); + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create("threadpool_push_serializer_efficiency", listener, &options); + if (!pool) { + goto end; + } + + /* We create 400 (200 pairs) of serializers */ + for (i = 0; i < 200; i++) { + char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[0] = ast_threadpool_serializer(serializer_name, pool); + if (!etd[i].serializer[0]) { + goto end; + } + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[1] = ast_threadpool_serializer(serializer_name, pool); + if (!etd[i].serializer[1]) { + goto end; + } + + etd[i].num_tasks_executed = &num_tasks_executed; + etd[i].shutdown = &shutdown; + } + + /* And once created we push in 200 tasks */ + for (i = 0; i < 200; i++) { + if (ast_taskprocessor_push(etd[i].serializer[0], serializer_efficiency_task, &etd[i])) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&tld->lock); + while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&tld->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", num_tasks_executed); + shutdown = 1; + + res = wait_for_empty_notice(NULL, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + +end: + /* We need to unreference each serializer */ + for (i = 0; i < 200; i++) { + ast_taskprocessor_unreference(etd[i].serializer[0]); + ast_taskprocessor_unreference(etd[i].serializer[1]); + } + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return CLI_SUCCESS; +} + AST_TEST_DEFINE(threadpool_serializer_dupe) { enum ast_test_result_state res = AST_TEST_FAIL; @@ -1798,6 +2048,11 @@ end: return res; } +static struct ast_cli_entry cli[] = { + AST_CLI_DEFINE(handle_cli_threadpool_push_efficiency, "Push tasks to a threadpool and measure efficiency"), + AST_CLI_DEFINE(handle_cli_threadpool_push_serializer_efficiency, "Push tasks to a threadpool in serializers and measure efficiency"), +}; + static int unload_module(void) { ast_test_unregister(threadpool_push); @@ -1816,11 +2071,13 @@ static int unload_module(void) ast_test_unregister(threadpool_more_destruction); ast_test_unregister(threadpool_serializer); ast_test_unregister(threadpool_serializer_dupe); + ast_cli_unregister_multiple(cli, ARRAY_LEN(cli)); return 0; } static int load_module(void) { + ast_cli_register_multiple(cli, ARRAY_LEN(cli)); ast_test_register(threadpool_push); ast_test_register(threadpool_initial_threads); ast_test_register(threadpool_thread_creation);