diff --git a/configs/samples/stasis.conf.sample b/configs/samples/stasis.conf.sample index b62d1c60ae..24e5619f02 100644 --- a/configs/samples/stasis.conf.sample +++ b/configs/samples/stasis.conf.sample @@ -1,12 +1,13 @@ -[threadpool] -;initial_size = 5 ; Initial size of the threadpool. -; ; 0 means the threadpool has no threads initially -; ; until a task needs a thread. -;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before -; ; dying. 0 means threads never time out. -;max_size = 50 ; Maximum number of threads in the Stasis threadpool. -; ; 0 means no limit to the number of threads in the -; ; threadpool. +[taskpool] +;minimum_size = 5 ; Minimum number of taskprocessors in taskpool. +;initial_size = 5 ; Initial size of the taskpool. +; ; 0 means the taskpool has no taskprocessors initially +; ; until a task needs a taskprocessor. +;idle_timeout_sec = 20 ; Number of seconds a taskprocessor should be idle before +; ; dying. 0 means taskprocessors never time out. +;max_size = 50 ; Maximum number of taskprocessors in the Stasis taskpool. +; ; 0 means no limit to the number of taskprocessors in the +; ; taskpool. [declined_message_types] ; This config section contains the names of message types that should be prevented diff --git a/include/asterisk/_private.h b/include/asterisk/_private.h index 684452fee0..056c0fde19 100644 --- a/include/asterisk/_private.h +++ b/include/asterisk/_private.h @@ -48,6 +48,7 @@ int ast_named_locks_init(void); /*!< Provided by named_locks.c */ int ast_file_init(void); /*!< Provided by file.c */ void ast_autoservice_init(void); /*!< Provided by autoservice.c */ int ast_tps_init(void); /*!< Provided by taskprocessor.c */ +int ast_taskpool_init(void); /*!< Provided by taskpool.c */ int ast_timing_init(void); /*!< Provided by timing.c */ void ast_stun_init(void); /*!< Provided by stun.c */ int ast_ssl_init(void); /*!< Provided by ssl.c */ diff --git a/include/asterisk/serializer.h b/include/asterisk/serializer.h index 112a0bea38..e2e07eefe6 100644 --- a/include/asterisk/serializer.h +++ b/include/asterisk/serializer.h @@ -20,6 +20,7 @@ #define _AST_SERIALIZER_H struct ast_threadpool; +struct ast_taskpool; /*! * Maintains a named pool of thread pooled taskprocessors. Also if configured @@ -56,6 +57,26 @@ int ast_serializer_pool_destroy(struct ast_serializer_pool *pool); struct ast_serializer_pool *ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout); +/*! + * \brief Create a serializer pool on taskpool. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * Create a serializer pool with an optional shutdown group. If a timeout greater + * than -1 is specified then a shutdown group is enabled on the pool. + * + * \param name The base name for the pool, and used when building taskprocessor(s) + * \param size The size of the pool + * \param taskpool The backing taskpool to use + * \param timeout The timeout used if using a shutdown group (-1 = disabled) + * + * \return A newly allocated serializer pool object + * \retval NULL on error + */ + struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name, + unsigned int size, struct ast_taskpool *taskpool, int timeout); + /*! * \brief Retrieve the base name of the serializer pool. * diff --git a/include/asterisk/serializer_shutdown_group.h b/include/asterisk/serializer_shutdown_group.h new file mode 100644 index 0000000000..b4c4adc01e --- /dev/null +++ b/include/asterisk/serializer_shutdown_group.h @@ -0,0 +1,65 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#ifndef _ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H +#define _ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H + +struct ast_serializer_shutdown_group; + +/*! + * \brief Create a serializer group shutdown control object. + * \since 13.5.0 + * + * \return ao2 object to control shutdown of a serializer group. + */ +struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void); + +/*! + * \brief Wait for the serializers in the group to shutdown with timeout. + * \since 13.5.0 + * + * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL) + * \param timeout Number of seconds to wait for the serializers in the group to shutdown. + * Zero if the timeout is disabled. + * + * \return Number of serializers that did not get shutdown within the timeout. + */ +int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout); + +/*! + * \brief Increment the number of serializer members in the group. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param shutdown_group Group shutdown controller. + */ + void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group); + + /*! + * \brief Decrement the number of serializer members in the group. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param shutdown_group Group shutdown controller. + */ +void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group); + +#endif /* ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H */ diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h new file mode 100644 index 0000000000..2a4f963052 --- /dev/null +++ b/include/asterisk/taskpool.h @@ -0,0 +1,321 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Corporation + * + * Joshua C. Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * \ref Taskpool + * + * \page Taskpool API providing queued task execution across threads. + +The taskpool API is a specialized API for the queueing of tasks +in a synchronous or asynchronous manner, to be executed across +a pool of threads. For cases where serialization is needed a +serializer API is also provided ensuring that tasks queued to +the serializer are executed in a serialized fashion within the +taskpool. + +On creation of a taskpool various options can be set and used to +control the operation of the pool. This includes how many taskprocessors +are present, whether the pool can grow, whether the pool can shrink, +and how long idle taskprocessors should exist before being terminated. +This provides flexibility based on the specific needs of the user of +the taskpool and the environment. + +The queueing of tasks to the taskpool is done using a selector. The +selector examines the available taskprocessors and decides which one +to queue the task to. This operation can also examine the state of +the pool to see if it needs to grow and if enabled and possible does so. + +The taskpool API is preferred for many cases over the use of the +threadpool due to the far lower overhead involved. Taskpools require +no additional thread or task queue for management of the pool itself and +the act of queueing tasks, the most common operation, is written to be as +simple and minimal as possible. Threadpools are best used for long +running tasks and operations. + +*/ + +#ifndef _ASTERISK_TASKPOOL_H +#define _ASTERISK_TASKPOOL_H + +struct ast_taskpool; +struct ast_taskprocessor; +struct ast_serializer_shutdown_group; + +/*! + * \brief Selectors for choosing which taskprocessor in a pool to use + */ +enum ast_taskpool_selector { + AST_TASKPOOL_SELECTOR_DEFAULT = 0, /* The selector that is generally the best for most use cases */ + AST_TASKPOOL_SELECTOR_LEAST_FULL, /* Select the least full taskprocessor */ + AST_TASKPOOL_SELECTOR_SEQUENTIAL, /* Select taskprocessors in a sequential manner */ +}; + +struct ast_taskpool_options { +#define AST_TASKPOOL_OPTIONS_VERSION 1 + /*! Version of taskpool options in use */ + int version; + /*! + * \brief The selector to use for choosing a taskprocessor + */ + enum ast_taskpool_selector selector; + /*! + * \brief Time limit in seconds for idle dynamic taskprocessors + * + * A time of 0 or less will mean no timeout. + */ + int idle_timeout; + /*! + * \brief Number of taskprocessors to increment the pool by + */ + int auto_increment; + /*! + * \brief Number of taskprocessors that will always exist + * + * Zero is a valid value if the taskpool will never have taskprocessors + * that always exist, allowing the pool to drop to zero if not used. + */ + int minimum_size; + /*! + * \brief Number of taskprocessors the pool will start with + * + * Zero is a valid value if the taskpool should start + * without any taskprocessors allocated. + * + * \note This must be equal to or greater than the minimum_size, + * otherwise the taskpool will adjust this to the minimum_size. + */ + int initial_size; + /*! + * \brief Maximum number of taskprocessors a pool may have + * + * When the taskpool's size increases, it can never increase + * beyond this number of taskprocessors. + * + * Zero is a valid value if the taskpool does not have a + * maximum size for taskprocessors. + * + * \note This must be equal to or greater than the initial_size, + * otherwise the taskpool will adjust this to the initial_size. + */ + int max_size; + /*! + * \brief The threshold for when to grow the pool + * + * This is the number of tasks that must be in queue before the pool will grow. + * + * \note If not specified a default of the 50% of the high water threshold defined + * in taskprocessor.h will be used. + */ + int growth_threshold; + /*! + * \brief Function to call when a taskprocessor starts + * + * This is useful if there is something common that all + * taskprocessors in a taskpool need to do when they start. + */ + void (*thread_start)(void); + /*! + * \brief Function to call when a taskprocessor ends + * + * This is useful if there is common cleanup to execute when + * a taskprocessor completes + */ + void (*thread_end)(void); +}; + +/*! + * \brief Create a new taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * This function creates a taskpool. Tasks may be pushed onto this task pool + * and will be automatically acted upon by taskprocessors within the pool. + * + * Only a single taskpool with a given name may exist. This function will fail + * if a taskpool with the given name already exists. + * + * \param name The unique name for the taskpool + * \param options The behavioral options for this taskpool + * \retval NULL Failed to create the taskpool + * \retval non-NULL The newly-created taskpool + * + * \note The \ref ast_taskpool_shutdown function must be called to shut down the + * taskpool and clean up underlying resources fully. + */ +struct ast_taskpool *ast_taskpool_create(const char *name, + const struct ast_taskpool_options *options); + +/*! + * \brief Get the current number of taskprocessors in the taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param pool The taskpool to query + * \retval The number of taskprocessors in the taskpool + */ +size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool); + +/*! + * \brief Get the current number of queued tasks in the taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param pool The taskpool to query + * \retval The number of queued tasks in the taskpool + */ +long ast_taskpool_queue_size(struct ast_taskpool *pool); + +/*! + * \brief Push a task to the taskpool + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * Tasks pushed into the taskpool will be automatically taken by + * one of the taskprocessors within + * \param pool The taskpool to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) + attribute_warn_unused_result; + +/*! + * \brief Push a task to the taskpool, and wait for completion + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * Tasks pushed into the taskpool will be automatically taken by + * one of the taskprocessors within + * \param pool The taskpool to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) + attribute_warn_unused_result; + +/*! + * \brief Shut down a taskpool and remove the underlying taskprocessors + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param pool The pool to shut down + * + * \note This will decrement the reference to the pool + */ +void ast_taskpool_shutdown(struct ast_taskpool *pool); + +/*! + * \brief Get the taskpool serializer currently associated with this thread. + * + * \note The returned pointer is valid while the serializer + * thread is running. + * + * \note Use ao2_ref() on serializer if you are going to keep it + * for another thread. To unref it you must then use + * ast_taskprocessor_unreference(). + * + * \retval serializer on success. + * \retval NULL on error or no serializer associated with the thread. + */ + struct ast_taskprocessor *ast_taskpool_serializer_get_current(void); + +/*! + * \brief Serialized execution of tasks within a \ref ast_taskpool. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * A \ref ast_taskprocessor with the same contract as a default taskprocessor + * (tasks execute serially) except instead of executing out of a dedicated + * thread, execution occurs in a taskprocessor from a \ref ast_taskpool. + * + * While it guarantees that each task will complete before executing the next, + * there is no guarantee as to which thread from the \c pool individual tasks + * will execute. This normally only matters if your code relies on thread + * specific information, such as thread locals. + * + * Use ast_taskprocessor_unreference() to dispose of the returned \ref + * ast_taskprocessor. + * + * Only a single taskprocessor with a given name may exist. This function will fail + * if a taskprocessor with the given name already exists. + * + * \param name Name of the serializer. (must be unique) + * \param pool \ref ast_taskpool for execution. + * + * \return \ref ast_taskprocessor for enqueuing work. + * \retval NULL on error. + */ +struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool); + +/*! + * \brief Serialized execution of tasks within a \ref ast_taskpool. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * A \ref ast_taskprocessor with the same contract as a default taskprocessor + * (tasks execute serially) except instead of executing out of a dedicated + * thread, execution occurs in a taskprocessor from a \ref ast_taskpool. + * + * While it guarantees that each task will complete before executing the next, + * there is no guarantee as to which thread from the \c pool individual tasks + * will execute. This normally only matters if your code relies on thread + * specific information, such as thread locals. + * + * Use ast_taskprocessor_unreference() to dispose of the returned \ref + * ast_taskprocessor. + * + * Only a single taskprocessor with a given name may exist. This function will fail + * if a taskprocessor with the given name already exists. + * + * \param name Name of the serializer. (must be unique) + * \param pool \ref ast_taskpool for execution. + * \param shutdown_group Group shutdown controller. (NULL if no group association) + * + * \return \ref ast_taskprocessor for enqueuing work. + * \retval NULL on error. + */ +struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, + struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group); + +/*! + * \brief Push a task to a serializer, and wait for completion + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param serializer The serializer to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data); + +#endif /* ASTERISK_TASKPOOL_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 73c5c2ec99..3e3886eb1e 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -340,6 +340,11 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); */ long ast_taskprocessor_size(struct ast_taskprocessor *tps); +/*! + * \brief Return the listener associated with the taskprocessor + */ +struct ast_taskprocessor_listener *ast_taskprocessor_listener(struct ast_taskprocessor *tps); + /*! * \brief Get the current taskprocessor high water alert count. * \since 13.10.0 diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index a72e4523a1..72b2863c5d 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -24,6 +24,8 @@ struct ast_threadpool; struct ast_taskprocessor; struct ast_threadpool_listener; +#include "asterisk/serializer_shutdown_group.h" + struct ast_threadpool_listener_callbacks { /*! * \brief Indicates that the state of threads in the pool has changed @@ -196,28 +198,6 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo */ void ast_threadpool_shutdown(struct ast_threadpool *pool); -struct ast_serializer_shutdown_group; - -/*! - * \brief Create a serializer group shutdown control object. - * \since 13.5.0 - * - * \return ao2 object to control shutdown of a serializer group. - */ -struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void); - -/*! - * \brief Wait for the serializers in the group to shutdown with timeout. - * \since 13.5.0 - * - * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL) - * \param timeout Number of seconds to wait for the serializers in the group to shutdown. - * Zero if the timeout is disabled. - * - * \return Number of serializers that did not get shutdown within the timeout. - */ -int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout); - /*! * \brief Get the threadpool serializer currently associated with this thread. * \since 14.0.0 diff --git a/main/asterisk.c b/main/asterisk.c index 76b927d3f2..5c38031cbe 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -4247,6 +4247,7 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou check_init(ast_utils_init(), "Utilities"); check_init(ast_tps_init(), "Task Processor Core"); + check_init(ast_taskpool_init(), "Taskpool Support"); check_init(ast_fd_init(), "File Descriptor Debugging"); check_init(ast_pbx_init(), "ast_pbx_init"); check_init(aco_init(), "Configuration Option Framework"); diff --git a/main/serializer.c b/main/serializer.c index a10dc496d6..5fcb279332 100644 --- a/main/serializer.c +++ b/main/serializer.c @@ -22,8 +22,10 @@ #include "asterisk/serializer.h" #include "asterisk/taskprocessor.h" #include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/utils.h" #include "asterisk/vector.h" +#include "asterisk/serializer_shutdown_group.h" struct ast_serializer_pool { /*! Shutdown group to monitor serializers. */ @@ -119,6 +121,52 @@ struct ast_serializer_pool *ast_serializer_pool_create(const char *name, return pool; } +struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name, + unsigned int size, struct ast_taskpool *taskpool, int timeout) +{ + struct ast_serializer_pool *pool; + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + size_t idx; + + ast_assert(size > 0); + + pool = ast_malloc(sizeof(*pool) + strlen(name) + 1); + if (!pool) { + return NULL; + } + + strcpy(pool->name, name); /* safe */ + + pool->shutdown_group_timeout = timeout; + pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL; + + AST_VECTOR_RW_INIT(&pool->serializers, size); + + for (idx = 0; idx < size; ++idx) { + struct ast_taskprocessor *tps; + + /* Create name with seq number appended. */ + ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name); + + tps = ast_taskpool_serializer_group(tps_name, taskpool, pool->shutdown_group); + if (!tps) { + ast_serializer_pool_destroy(pool); + ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n", + tps_name); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->serializers, tps)) { + ast_serializer_pool_destroy(pool); + ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n", + tps_name); + return NULL; + } + } + + return pool; +} + const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool) { return pool->name; diff --git a/main/serializer_shutdown_group.c b/main/serializer_shutdown_group.c new file mode 100644 index 0000000000..f232d0f6f1 --- /dev/null +++ b/main/serializer_shutdown_group.c @@ -0,0 +1,108 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/astobj2.h" +#include "asterisk/time.h" +#include "asterisk/utils.h" + +/*! Serializer group shutdown control object. */ +struct ast_serializer_shutdown_group { + /*! Shutdown thread waits on this conditional. */ + ast_cond_t cond; + /*! Count of serializers needing to shutdown. */ + int count; +}; + +static void serializer_shutdown_group_dtor(void *vdoomed) +{ + struct ast_serializer_shutdown_group *doomed = vdoomed; + + ast_cond_destroy(&doomed->cond); +} + +struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void) +{ + struct ast_serializer_shutdown_group *shutdown_group; + + shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor); + if (!shutdown_group) { + return NULL; + } + ast_cond_init(&shutdown_group->cond, NULL); + return shutdown_group; +} + +int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout) +{ + int remaining; + ast_mutex_t *lock; + + if (!shutdown_group) { + return 0; + } + + lock = ao2_object_get_lockaddr(shutdown_group); + ast_assert(lock != NULL); + + ao2_lock(shutdown_group); + if (timeout) { + struct timeval start; + struct timespec end; + + start = ast_tvnow(); + end.tv_sec = start.tv_sec + timeout; + end.tv_nsec = start.tv_usec * 1000; + while (shutdown_group->count) { + if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) { + /* Error or timed out waiting for the count to reach zero. */ + break; + } + } + } else { + while (shutdown_group->count) { + if (ast_cond_wait(&shutdown_group->cond, lock)) { + /* Error */ + break; + } + } + } + remaining = shutdown_group->count; + ao2_unlock(shutdown_group); + return remaining; +} + +void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group) +{ + ao2_lock(shutdown_group); + ++shutdown_group->count; + ao2_unlock(shutdown_group); +} + +void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group) +{ + ao2_lock(shutdown_group); + --shutdown_group->count; + if (!shutdown_group->count) { + ast_cond_signal(&shutdown_group->cond); + } + ao2_unlock(shutdown_group); +} diff --git a/main/stasis.c b/main/stasis.c index c85fb29c5a..f9d5fbdd64 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -33,7 +33,7 @@ #include "asterisk/stasis_internal.h" #include "asterisk/stasis.h" #include "asterisk/taskprocessor.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" #include "asterisk/vector.h" @@ -95,6 +95,36 @@ Maximum number of threads in the threadpool. + + + 24.0.0 + + Settings that configure the taskpool Stasis uses to deliver some messages. + + + 24.0.0 + + Minimum number of taskprocessors in the message bus taskpool. + + + + 24.0.0 + + Initial number of taskprocessors in the message bus taskpool. + + + + 24.0.0 + + Number of seconds before an idle taskprocessor is disposed of. + + + + 24.0.0 + + Maximum number of taskprocessors in the taskpool. + + 13.0.0 @@ -331,8 +361,8 @@ /*! The number of buckets to use for topic pools */ #define TOPIC_POOL_BUCKETS 57 -/*! Thread pool for topics that don't want a dedicated taskprocessor */ -static struct ast_threadpool *threadpool; +/*! Taskpool for topics that don't want a dedicated taskprocessor */ +static struct ast_taskpool *taskpool; STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); @@ -693,8 +723,8 @@ struct stasis_subscription_statistics { int messages_passed; /*! \brief Using a mailbox to queue messages */ int uses_mailbox; - /*! \brief Using stasis threadpool for handling messages */ - int uses_threadpool; + /*! \brief Using stasis taskpool for handling messages */ + int uses_taskpool; /*! \brief The line number where the subscription originates */ int lineno; /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */ @@ -847,7 +877,7 @@ static void subscription_statistics_destroy(void *obj) } static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub, - int needs_mailbox, int use_thread_pool, const char *file, int lineno, + int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func) { struct stasis_subscription_statistics *statistics; @@ -872,7 +902,7 @@ static struct stasis_subscription_statistics *stasis_subscription_statistics_cre statistics->lineno = lineno; statistics->func = func; statistics->uses_mailbox = needs_mailbox; - statistics->uses_threadpool = use_thread_pool; + statistics->uses_taskpool = use_taskpool; strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */ statistics->sub = sub; ao2_link(subscription_stats, statistics); @@ -886,7 +916,7 @@ struct stasis_subscription *internal_stasis_subscribe( stasis_subscription_cb callback, void *data, int needs_mailbox, - int use_thread_pool, + int use_taskpool, const char *file, int lineno, const char *func) @@ -906,7 +936,7 @@ struct stasis_subscription *internal_stasis_subscribe( #ifdef AST_DEVMODE ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1)); - sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func); + sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func); if (ret < 0 || !sub->statistics) { ao2_ref(sub, -1); return NULL; @@ -924,7 +954,7 @@ struct stasis_subscription *internal_stasis_subscribe( /* Create name with seq number appended. */ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s", - use_thread_pool ? 'p' : 'm', + use_taskpool ? 'p' : 'm', stasis_topic_name(topic)); /* @@ -932,8 +962,8 @@ struct stasis_subscription *internal_stasis_subscribe( * acceptable. For a large number of subscribers, a thread * pool should be used. */ - if (use_thread_pool) { - sub->mailbox = ast_threadpool_serializer(tps_name, threadpool); + if (use_taskpool) { + sub->mailbox = ast_taskpool_serializer(tps_name, taskpool); } else { sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT); } @@ -2210,19 +2240,21 @@ struct stasis_declined_config { struct ao2_container *declined; }; -/*! \brief Threadpool configuration options */ -struct stasis_threadpool_conf { - /*! Initial size of the thread pool */ +/*! \brief Taskpool configuration options */ +struct stasis_taskpool_conf { + /*! Minimum size of the taskpool */ + int minimum_size; + /*! Initial size of the taskpool */ int initial_size; - /*! Time, in seconds, before we expire a thread */ + /*! Time, in seconds, before we expire a taskprocessor */ int idle_timeout_sec; - /*! Maximum number of thread to allow */ + /*! Maximum number of taskprocessors to allow */ int max_size; }; struct stasis_config { - /*! Thread pool configuration options */ - struct stasis_threadpool_conf *threadpool_options; + /*! Taskpool configuration options */ + struct stasis_taskpool_conf *taskpool_options; /*! Declined message types */ struct stasis_declined_config *declined_message_types; }; @@ -2230,12 +2262,20 @@ struct stasis_config { static struct aco_type threadpool_option = { .type = ACO_GLOBAL, .name = "threadpool", - .item_offset = offsetof(struct stasis_config, threadpool_options), + .item_offset = offsetof(struct stasis_config, taskpool_options), .category = "threadpool", .category_match = ACO_WHITELIST_EXACT, }; -static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option); +static struct aco_type taskpool_option = { + .type = ACO_GLOBAL, + .name = "taskpool", + .item_offset = offsetof(struct stasis_config, taskpool_options), + .category = "taskpool", + .category_match = ACO_WHITELIST_EXACT, +}; + +static struct aco_type *taskpool_options[] = ACO_TYPES(&threadpool_option, &taskpool_option); /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */ static struct aco_type declined_option = { @@ -2250,7 +2290,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option); struct aco_file stasis_conf = { .filename = "stasis.conf", - .types = ACO_TYPES(&declined_option, &threadpool_option), + .types = ACO_TYPES(&declined_option, &threadpool_option, &taskpool_option), }; /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */ @@ -2275,7 +2315,7 @@ static void stasis_config_destructor(void *obj) struct stasis_config *cfg = obj; ao2_cleanup(cfg->declined_message_types); - ast_free(cfg->threadpool_options); + ast_free(cfg->taskpool_options); } static void *stasis_config_alloc(void) @@ -2286,8 +2326,8 @@ static void *stasis_config_alloc(void) return NULL; } - cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options)); - if (!cfg->threadpool_options) { + cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options)); + if (!cfg->taskpool_options) { ao2_ref(cfg, -1); return NULL; } @@ -2678,7 +2718,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped); ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed); ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No"); - ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No"); + ast_cli(a->fd, "Using stasis taskpool for handling messages: %s\n", statistics->uses_taskpool ? "Yes" : "No"); ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked); ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked); @@ -3078,8 +3118,8 @@ static void stasis_cleanup(void) ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis)); ao2_cleanup(topic_all); topic_all = NULL; - ast_threadpool_shutdown(threadpool); - threadpool = NULL; + ast_taskpool_shutdown(taskpool); + taskpool = NULL; STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type); aco_info_destroy(&cfg_info); @@ -3090,7 +3130,7 @@ int stasis_init(void) { struct stasis_config *cfg; int cache_init; - struct ast_threadpool_options threadpool_opts = { 0, }; + struct ast_taskpool_options taskpool_opts = { 0, }; #ifdef AST_DEVMODE struct ao2_container *subscription_stats; struct ao2_container *topic_stats; @@ -3105,17 +3145,21 @@ int stasis_init(void) aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0); + aco_option_register(&cfg_info, "minimum_size", ACO_EXACT, + taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, minimum_size), 0, + INT_MAX); aco_option_register(&cfg_info, "initial_size", ACO_EXACT, - threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, - FLDSET(struct stasis_threadpool_conf, initial_size), 0, + taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, initial_size), 0, INT_MAX); aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT, - threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, - FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0, + taskpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0, INT_MAX); aco_option_register(&cfg_info, "max_size", ACO_EXACT, - threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, - FLDSET(struct stasis_threadpool_conf, max_size), 0, + taskpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_taskpool_conf, max_size), 0, INT_MAX); if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) { @@ -3125,7 +3169,7 @@ int stasis_init(void) return -1; } - if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) { + if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) { ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n"); ao2_ref(default_cfg, -1); @@ -3151,15 +3195,16 @@ int stasis_init(void) } } - threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION; - threadpool_opts.initial_size = cfg->threadpool_options->initial_size; - threadpool_opts.auto_increment = 1; - threadpool_opts.max_size = cfg->threadpool_options->max_size; - threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; - threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts); + taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION; + taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size; + taskpool_opts.initial_size = cfg->taskpool_options->initial_size; + taskpool_opts.auto_increment = 1; + taskpool_opts.max_size = cfg->taskpool_options->max_size; + taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec; + taskpool = ast_taskpool_create("stasis", &taskpool_opts); ao2_ref(cfg, -1); - if (!threadpool) { - ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); + if (!taskpool) { + ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n"); return -1; } diff --git a/main/taskpool.c b/main/taskpool.c new file mode 100644 index 0000000000..59ac4b0c72 --- /dev/null +++ b/main/taskpool.c @@ -0,0 +1,945 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Corporation + * + * Joshua Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/_private.h" +#include "asterisk/taskpool.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/astobj2.h" +#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/utils.h" +#include "asterisk/time.h" +#include "asterisk/sched.h" + +/*! + * \brief A taskpool taskprocessor + */ +struct taskpool_taskprocessor { + /*! The underlying taskprocessor */ + struct ast_taskprocessor *taskprocessor; + /*! The last time a task was pushed to this taskprocessor */ + struct timeval last_pushed; +}; + +/*! + * \brief A container of taskprocessors + */ +struct taskpool_taskprocessors { + /*! A vector of taskprocessors */ + AST_VECTOR(, struct taskpool_taskprocessor *) taskprocessors; + /*! The next taskprocessor to use for pushing */ + unsigned int taskprocessor_num; +}; + +typedef void (*taskpool_selector)(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, + struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached); + +/*! + * \brief An opaque taskpool structure + * + * A taskpool is a collection of taskprocessors that + * execute tasks, each from their own queue. A selector + * determines which taskprocessor to queue to at push + * time. + */ +struct ast_taskpool { + /*! The static taskprocessors, those which will always exist */ + struct taskpool_taskprocessors static_taskprocessors; + /*! The dynamic taskprocessors, those which will be created as needed */ + struct taskpool_taskprocessors dynamic_taskprocessors; + /*! True if the taskpool is in the process of shutting down */ + int shutting_down; + /*! Taskpool-specific options */ + struct ast_taskpool_options options; + /*! Dynamic pool shrinking scheduled item */ + int shrink_sched_id; + /*! The taskprocessor selector to use */ + taskpool_selector selector; + /*! The name of the taskpool */ + char name[0]; +}; + +/*! \brief The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold) */ +#define TASKPOOL_GROW_THRESHOLD (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10 + +/*! \brief Scheduler used for dynamic pool shrinking */ +static struct ast_sched_context *sched; + +/*! \brief Thread storage for the current taskpool */ +AST_THREADSTORAGE_RAW(current_taskpool_pool); + +/*! + * \internal + * \brief Get the current taskpool associated with this thread. + */ +static struct ast_taskpool *ast_taskpool_get_current(void) +{ + return ast_threadstorage_get_ptr(¤t_taskpool_pool); +} + +/*! + * \internal + * \brief Shutdown task for taskpool taskprocessor + */ + static int taskpool_taskprocessor_stop(void *data) + { + struct ast_taskpool *pool = ast_taskpool_get_current(); + + /* If a thread stop callback is set on the options, call it */ + if (pool->options.thread_end) { + pool->options.thread_end(); + } + + ao2_cleanup(pool); + + return 0; + } + +/*! \internal */ +static void taskpool_taskprocessor_dtor(void *obj) +{ + struct taskpool_taskprocessor *taskprocessor = obj; + + if (taskprocessor->taskprocessor && ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_stop, NULL)) { + /* We can't actually do anything if this fails, so just accept reality */ + } + + ast_taskprocessor_unreference(taskprocessor->taskprocessor); +} + +/*! + * \internal + * \brief Startup task for taskpool taskprocessor + */ +static int taskpool_taskprocessor_start(void *data) +{ + struct ast_taskpool *pool = data; + + /* Set the pool on the thread for this taskprocessor, inheriting the + * reference passed to the task itself. + */ + ast_threadstorage_set_ptr(¤t_taskpool_pool, pool); + + /* If a thread start callback is set on the options, call it */ + if (pool->options.thread_start) { + pool->options.thread_start(); + } + + return 0; +} + +/*! + * \internal + * \brief Allocate a taskpool specific taskprocessor + */ +static struct taskpool_taskprocessor *taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type) +{ + struct taskpool_taskprocessor *taskprocessor; + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + /* We don't actually need locking for each pool taskprocessor, as the only thing + * mutable is the underlying taskprocessor which has its own internal locking. + */ + taskprocessor = ao2_alloc_options(sizeof(*taskprocessor), taskpool_taskprocessor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!taskprocessor) { + return NULL; + } + + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name); + + taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT); + if (!taskprocessor->taskprocessor) { + ao2_ref(taskprocessor, -1); + return NULL; + } + + taskprocessor->last_pushed = ast_tvnow(); + + if (ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_start, ao2_bump(pool))) { + ao2_ref(pool, -1); + /* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to + * NULL here. + */ + ast_taskprocessor_unreference(taskprocessor->taskprocessor); + taskprocessor->taskprocessor = NULL; + return NULL; + } + + return taskprocessor; +} + +/*! + * \internal + * \brief Initialize the taskpool taskprocessors structure + */ +static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size) +{ + if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) { + return -1; + } + + return 0; +} + +/*! + * \internal + * \brief Clean up the taskpool taskprocessors structure + */ +static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors) +{ + /* Access/manipulation of taskprocessors is done with the lock held, and + * with a check of the shutdown flag done. This means that outside of holding + * the lock we can safely muck with it. Pushing to the taskprocessor is done + * outside of the lock, but with a reference to the taskprocessor held. + */ + AST_VECTOR_CALLBACK_VOID(&taskprocessors->taskprocessors, ao2_cleanup); + AST_VECTOR_FREE(&taskprocessors->taskprocessors); +} + +/*! + * \internal + * \brief Determine if a taskpool taskprocessor is idle + */ +#define TASKPROCESSOR_IS_IDLE(tps, timeout) (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout)) + +/*! \internal + * \brief Taskpool dynamic pool shrink function + */ +static int taskpool_dynamic_pool_shrink(const void *data) +{ + struct ast_taskpool *pool = (struct ast_taskpool *)data; + int num_removed; + + ao2_lock(pool); + + /* If the pool is shutting down, do nothing and don't reschedule */ + if (pool->shutting_down) { + ao2_unlock(pool); + ao2_ref(pool, -1); + return 0; + } + + /* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */ + num_removed = AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(&pool->dynamic_taskprocessors.taskprocessors, pool->options.idle_timeout * 1000, + TASKPROCESSOR_IS_IDLE, ao2_cleanup); + if (num_removed) { + /* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */ + if (pool->dynamic_taskprocessors.taskprocessor_num >= AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors)) { + pool->dynamic_taskprocessors.taskprocessor_num = 0; + } + } + + ao2_unlock(pool); + + /* It is possible for the pool to have been shut down between unlocking and returning, this is + * inherently a race condition we can't eliminate so we will catch it on the next iteration. + */ + return pool->options.idle_timeout * 1000; +} + +/*! + * \internal + * \brief Sequential taskprocessor selector + */ + static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, + struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached) +{ + unsigned int taskprocessor_num = taskprocessors->taskprocessor_num; + + if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) { + *growth_threshold_reached = 1; + return; + } + + taskprocessors->taskprocessor_num++; + if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) { + taskprocessors->taskprocessor_num = 0; + } + + *taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num); + + /* Check to see if this has reached the growth threshold */ + *growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0; +} + +/*! + * \interal + * \brief Least full taskprocessor selector + */ +static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, + struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached) +{ + struct taskpool_taskprocessor *least_full = NULL; + unsigned int i; + + if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) { + *growth_threshold_reached = 1; + return; + } + + /* We assume that the growth threshold has not yet been reached, until proven otherwise */ + *growth_threshold_reached = 0; + + for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) { + struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i); + + /* If this taskprocessor has no outstanding tasks, it is the best choice */ + if (!ast_taskprocessor_size(tp->taskprocessor)) { + *taskprocessor = tp; + return; + } + + /* If any of the taskprocessors have reached the growth threshold then we should grow the pool */ + if (ast_taskprocessor_size(tp->taskprocessor) >= pool->options.growth_threshold) { + *growth_threshold_reached = 1; + } + + /* The taskprocessor with the fewest tasks should be used */ + if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) { + least_full = tp; + } + } + + *taskprocessor = least_full; +} + +struct ast_taskpool *ast_taskpool_create(const char *name, + const struct ast_taskpool_options *options) +{ + struct ast_taskpool *pool; + + /* Enforce versioning on the passed-in options */ + if (options->version != AST_TASKPOOL_OPTIONS_VERSION) { + return NULL; + } + + pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL); + if (!pool) { + return NULL; + } + + strcpy(pool->name, name); /* Safe */ + memcpy(&pool->options, options, sizeof(pool->options)); + pool->shrink_sched_id = -1; + + /* Verify the passed-in options are valid, and adjust if needed */ + if (options->initial_size < options->minimum_size) { + pool->options.initial_size = options->minimum_size; + ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n", + name, options->initial_size, options->minimum_size, options->minimum_size); + } + + if (options->max_size && pool->options.initial_size > options->max_size) { + pool->options.max_size = pool->options.initial_size; + ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n", + name, options->max_size, pool->options.initial_size, pool->options.initial_size); + } + + if (!options->auto_increment) { + if (!pool->options.minimum_size) { + pool->options.minimum_size = 1; + ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name); + } + if (!pool->options.max_size) { + pool->options.max_size = pool->options.minimum_size; + ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size); + } + if (pool->options.minimum_size != pool->options.max_size) { + pool->options.minimum_size = pool->options.max_size; + pool->options.initial_size = pool->options.max_size; + ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n", + name, options->minimum_size, pool->options.max_size, pool->options.max_size); + } + } else if (!options->growth_threshold) { + pool->options.growth_threshold = TASKPOOL_GROW_THRESHOLD; + } + + if (options->selector == AST_TASKPOOL_SELECTOR_DEFAULT || options->selector == AST_TASKPOOL_SELECTOR_LEAST_FULL) { + pool->selector = taskpool_least_full_selector; + } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) { + pool->selector = taskpool_sequential_selector; + } else { + ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n", + name, options->selector); + pool->selector = taskpool_least_full_selector; + } + + if (taskpool_taskprocessors_init(&pool->static_taskprocessors, pool->options.minimum_size)) { + ao2_ref(pool, -1); + return NULL; + } + + /* Create the static taskprocessors based on the passed-in options */ + for (int i = 0; i < pool->options.minimum_size; i++) { + struct taskpool_taskprocessor *taskprocessor; + + taskprocessor = taskpool_taskprocessor_alloc(pool, 's'); + if (!taskprocessor) { + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->static_taskprocessors.taskprocessors, taskprocessor)) { + ao2_ref(taskprocessor, -1); + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + } + + if (taskpool_taskprocessors_init(&pool->dynamic_taskprocessors, + pool->options.initial_size - pool->options.minimum_size)) { + ast_taskpool_shutdown(pool); + return NULL; + } + + /* Create the dynamic taskprocessor based on the passed-in options */ + for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) { + struct taskpool_taskprocessor *taskprocessor; + + taskprocessor = taskpool_taskprocessor_alloc(pool, 'd'); + if (!taskprocessor) { + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, taskprocessor)) { + ao2_ref(taskprocessor, -1); + /* The reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + } + + /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do + * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function + * and to reduce complexity. + */ + if (options->idle_timeout && options->auto_increment) { + pool->shrink_sched_id = ast_sched_add(sched, options->idle_timeout * 1000, taskpool_dynamic_pool_shrink, ao2_bump(pool)); + if (pool->shrink_sched_id < 0) { + ao2_ref(pool, -1); + /* The second reference to pool is passed to ast_taskpool_shutdown */ + ast_taskpool_shutdown(pool); + return NULL; + } + } + + return pool; +} + +size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool) +{ + size_t count; + + ao2_lock(pool); + count = AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors); + ao2_unlock(pool); + + return count; +} + +#define TASKPOOL_QUEUE_SIZE_ADD(tps, size) (size += ast_taskprocessor_size(tps->taskprocessor)) + +long ast_taskpool_queue_size(struct ast_taskpool *pool) +{ + long queue_size = 0; + + ao2_lock(pool); + AST_VECTOR_CALLBACK_VOID(&pool->static_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size); + AST_VECTOR_CALLBACK_VOID(&pool->dynamic_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size); + ao2_unlock(pool); + + return queue_size; +} + +/*! \internal + * \brief Taskpool dynamic pool grow function + */ +static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor) +{ + unsigned int num_to_add = pool->options.auto_increment; + int i; + + if (!num_to_add) { + return; + } + + /* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */ + if (pool->options.max_size) { + unsigned int current_size = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors); + + if (current_size + num_to_add > pool->options.max_size) { + num_to_add = pool->options.max_size - current_size; + } + } + + for (i = 0; i < num_to_add; i++) { + struct taskpool_taskprocessor *new_taskprocessor; + + new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd'); + if (!new_taskprocessor) { + return; + } + + if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) { + ao2_ref(new_taskprocessor, -1); + return; + } + + if (i == 0) { + /* On the first iteration we return the taskprocessor we just added */ + *taskprocessor = new_taskprocessor; + /* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */ + pool->dynamic_taskprocessors.taskprocessor_num = 0; + } else if (i == 1) { + /* On the second iteration we update the next taskprocessor to use to be this one */ + pool->dynamic_taskprocessors.taskprocessor_num = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) - 1; + } + } +} + +int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) +{ + RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup); + + /* Select the taskprocessor in the pool to use for pushing this task */ + ao2_lock(pool); + if (!pool->shutting_down) { + unsigned int growth_threshold_reached = 0; + + /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better + * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor + * fails for some reason, it will still fall back to the initially found static one if + * it is present. + */ + pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached); + if (pool->options.auto_increment && growth_threshold_reached) { + /* If we need to grow then try dynamic taskprocessors */ + pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached); + if (growth_threshold_reached) { + /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */ + taskpool_dynamic_pool_grow(pool, &taskprocessor); + } + + /* If a dynamic taskprocessor was used update its last push time */ + if (taskprocessor) { + taskprocessor->last_pushed = ast_tvnow(); + } + } + ao2_bump(taskprocessor); + } + ao2_unlock(pool); + + if (!taskprocessor) { + return -1; + } + + if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) { + return -1; + } + + return 0; +} + +/*! + * \internal Structure used for synchronous task + */ +struct taskpool_sync_task { + ast_mutex_t lock; + ast_cond_t cond; + int complete; + int fail; + int (*task)(void *); + void *task_data; +}; + +/*! + * \internal Initialization function for synchronous task + */ +static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int (*task)(void *), void *data) +{ + ast_mutex_init(&sync_task->lock); + ast_cond_init(&sync_task->cond, NULL); + sync_task->complete = 0; + sync_task->fail = 0; + sync_task->task = task; + sync_task->task_data = data; + return 0; +} + +/*! + * \internal Cleanup function for synchronous task + */ +static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task) +{ + ast_mutex_destroy(&sync_task->lock); + ast_cond_destroy(&sync_task->cond); +} + +/*! + * \internal Function for executing a sychronous task + */ +static int taskpool_sync_task(void *data) +{ + struct taskpool_sync_task *sync_task = data; + int ret; + + sync_task->fail = sync_task->task(sync_task->task_data); + + /* + * Once we unlock sync_task->lock after signaling, we cannot access + * sync_task again. The thread waiting within ast_taskpool_push_wait() + * is free to continue and release its local variable (sync_task). + */ + ast_mutex_lock(&sync_task->lock); + sync_task->complete = 1; + ast_cond_signal(&sync_task->cond); + ret = sync_task->fail; + ast_mutex_unlock(&sync_task->lock); + return ret; +} + +int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) +{ + struct taskpool_sync_task sync_task; + + /* If we are already executing within a taskpool taskprocessor then + * don't bother pushing a new task, just directly execute the task. + */ + if (ast_taskpool_get_current()) { + return task(data); + } + + if (taskpool_sync_task_init(&sync_task, task, data)) { + return -1; + } + + if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) { + taskpool_sync_task_cleanup(&sync_task); + return -1; + } + + ast_mutex_lock(&sync_task.lock); + while (!sync_task.complete) { + ast_cond_wait(&sync_task.cond, &sync_task.lock); + } + ast_mutex_unlock(&sync_task.lock); + + taskpool_sync_task_cleanup(&sync_task); + return sync_task.fail; +} + +void ast_taskpool_shutdown(struct ast_taskpool *pool) +{ + if (!pool) { + return; + } + + /* Mark this pool as shutting down so nothing new is pushed */ + ao2_lock(pool); + pool->shutting_down = 1; + ao2_unlock(pool); + + /* Stop the shrink scheduled item if present */ + AST_SCHED_DEL_UNREF(sched, pool->shrink_sched_id, ao2_ref(pool, -1)); + + /* Clean up all the taskprocessors */ + taskpool_taskprocessors_cleanup(&pool->static_taskprocessors); + taskpool_taskprocessors_cleanup(&pool->dynamic_taskprocessors); + + ao2_ref(pool, -1); +} + +struct serializer { + /*! Taskpool the serializer will use to process the jobs. */ + struct ast_taskpool *pool; + /*! Which group will wait for this serializer to shutdown. */ + struct ast_serializer_shutdown_group *shutdown_group; +}; + +static void serializer_dtor(void *obj) +{ + struct serializer *ser = obj; + + ao2_cleanup(ser->pool); + ser->pool = NULL; + ao2_cleanup(ser->shutdown_group); + ser->shutdown_group = NULL; +} + +static struct serializer *serializer_create(struct ast_taskpool *pool, + struct ast_serializer_shutdown_group *shutdown_group) +{ + struct serializer *ser; + + /* This object has a lock so it can be used to ensure exclusive access + * to the execution of tasks within the serializer. + */ + ser = ao2_alloc(sizeof(*ser), serializer_dtor); + if (!ser) { + return NULL; + } + ser->pool = ao2_bump(pool); + ser->shutdown_group = ao2_bump(shutdown_group); + return ser; +} + +AST_THREADSTORAGE_RAW(current_taskpool_serializer); + +static int execute_tasks(void *data) +{ + struct ast_taskpool *pool = ast_taskpool_get_current(); + struct ast_taskprocessor *tps = data; + struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(tps); + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + size_t remaining, requeue = 0; + + /* In a normal scenario this lock will not be in contention with + * anything else. It is only if a synchronous task is pushed to + * the serializer that it may be blocked on the synchronous + * task thread. This is done to ensure that only one thread is executing + * tasks from the serializer at a given time, and not out of order + * either. + */ + ao2_lock(ser); + + ast_threadstorage_set_ptr(¤t_taskpool_serializer, tps); + for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) { + requeue = ast_taskprocessor_execute(tps); + } + ast_threadstorage_set_ptr(¤t_taskpool_serializer, NULL); + + ao2_unlock(ser); + + /* If there are remaining tasks we requeue, this way the serializer + * does not hold exclusivity of the taskpool taskprocessor + */ + if (requeue) { + /* Ownership passes to the new task */ + if (ast_taskpool_push(pool, execute_tasks, tps)) { + ast_taskprocessor_unreference(tps); + } + } else { + ast_taskprocessor_unreference(tps); + } + + return 0; +} + +static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) +{ + if (was_empty) { + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener); + + if (ast_taskpool_push(ser->pool, execute_tasks, tps)) { + ast_taskprocessor_unreference(tps); + } + } +} + +static int serializer_start(struct ast_taskprocessor_listener *listener) +{ + /* No-op */ + return 0; +} + +static void serializer_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + + if (ser->shutdown_group) { + ast_serializer_shutdown_group_dec(ser->shutdown_group); + } + ao2_cleanup(ser); +} + +static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = { + .task_pushed = serializer_task_pushed, + .start = serializer_start, + .shutdown = serializer_shutdown, +}; + +struct ast_taskprocessor *ast_taskpool_serializer_get_current(void) +{ + return ast_threadstorage_get_ptr(¤t_taskpool_serializer); +} + +struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, + struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group) +{ + struct serializer *ser; + struct ast_taskprocessor_listener *listener; + struct ast_taskprocessor *tps; + + ser = serializer_create(pool, shutdown_group); + if (!ser) { + return NULL; + } + + listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser); + if (!listener) { + ao2_ref(ser, -1); + return NULL; + } + + tps = ast_taskprocessor_create_with_listener(name, listener); + if (!tps) { + /* ser ref transferred to listener but not cleaned without tps */ + ao2_ref(ser, -1); + } else if (shutdown_group) { + ast_serializer_shutdown_group_inc(shutdown_group); + } + + ao2_ref(listener, -1); + return tps; +} + +struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool) +{ + return ast_taskpool_serializer_group(name, pool, NULL); +} + +/*! + * \internal An empty task callback, used to ensure the serializer does not + * go empty. */ +static int taskpool_serializer_empty_task(void *data) +{ + return 0; +} + +int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data) +{ + struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer); + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + struct ast_taskprocessor *prior_serializer; + struct taskpool_sync_task sync_task; + + /* If not in a taskpool taskprocessor we can just queue the task like normal and + * wait. */ + if (!ast_taskpool_get_current()) { + if (taskpool_sync_task_init(&sync_task, task, data)) { + return -1; + } + + if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) { + taskpool_sync_task_cleanup(&sync_task); + return -1; + } + + ast_mutex_lock(&sync_task.lock); + while (!sync_task.complete) { + ast_cond_wait(&sync_task.cond, &sync_task.lock); + } + ast_mutex_unlock(&sync_task.lock); + + taskpool_sync_task_cleanup(&sync_task); + return sync_task.fail; + } + + /* It is possible that we are already executing within a serializer, so stash the existing + * away so we can restore it. + */ + prior_serializer = ast_taskpool_serializer_get_current(); + + ao2_lock(ser); + + /* There are two cases where we can or have to directly execute this task: + * 1. There are no other tasks in the serializer + * 2. We are already in the serializer + * In the second case if we don't execute the task now, we will deadlock waiting + * on it as it will never occur. + */ + if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) { + ast_threadstorage_set_ptr(¤t_taskpool_serializer, serializer); + sync_task.fail = task(data); + ao2_unlock(ser); + ast_threadstorage_set_ptr(¤t_taskpool_serializer, prior_serializer); + return sync_task.fail; + } + + if (taskpool_sync_task_init(&sync_task, task, data)) { + ao2_unlock(ser); + return -1; + } + + /* First we queue the serialized task */ + if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) { + taskpool_sync_task_cleanup(&sync_task); + ao2_unlock(ser); + return -1; + } + + /* Next we queue the empty task to ensure the serializer doesn't reach empty, this + * stops two tasks from being queued for the same serializer at the same time. + */ + if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) { + taskpool_sync_task_cleanup(&sync_task); + ao2_unlock(ser); + return -1; + } + + /* Now we execute the tasks on the serializer until our sync task is complete */ + ast_threadstorage_set_ptr(¤t_taskpool_serializer, serializer); + while (!sync_task.complete) { + /* The sync task is guaranteed to be executed, so doing a while loop on the complete + * flag is safe. + */ + ast_taskprocessor_execute(serializer); + } + taskpool_sync_task_cleanup(&sync_task); + ao2_unlock(ser); + + ast_threadstorage_set_ptr(¤t_taskpool_serializer, prior_serializer); + + return sync_task.fail; +} + +/*! + * \internal + * \brief Clean up resources on Asterisk shutdown + */ +static void taskpool_shutdown(void) +{ + if (sched) { + ast_sched_context_destroy(sched); + sched = NULL; + } +} + +int ast_taskpool_init(void) +{ + sched = ast_sched_context_create(); + if (!sched) { + return -1; + } + + if (ast_sched_start_thread(sched)) { + return -1; + } + + ast_register_cleanup(taskpool_shutdown); + + return 0; +} diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 3c83a6883e..c4015279c5 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -967,6 +967,11 @@ long ast_taskprocessor_size(struct ast_taskprocessor *tps) return (tps) ? tps->tps_queue_size : -1; } +struct ast_taskprocessor_listener *ast_taskprocessor_listener(struct ast_taskprocessor *tps) +{ + return tps ? tps->listener : NULL; +} + /* taskprocessor name accessor */ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) { diff --git a/main/threadpool.c b/main/threadpool.c index a573670ece..0969e627c3 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1211,103 +1211,6 @@ static int worker_set_state(struct worker_thread *worker, enum worker_state stat return 0; } -/*! Serializer group shutdown control object. */ -struct ast_serializer_shutdown_group { - /*! Shutdown thread waits on this conditional. */ - ast_cond_t cond; - /*! Count of serializers needing to shutdown. */ - int count; -}; - -static void serializer_shutdown_group_dtor(void *vdoomed) -{ - struct ast_serializer_shutdown_group *doomed = vdoomed; - - ast_cond_destroy(&doomed->cond); -} - -struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void) -{ - struct ast_serializer_shutdown_group *shutdown_group; - - shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor); - if (!shutdown_group) { - return NULL; - } - ast_cond_init(&shutdown_group->cond, NULL); - return shutdown_group; -} - -int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout) -{ - int remaining; - ast_mutex_t *lock; - - if (!shutdown_group) { - return 0; - } - - lock = ao2_object_get_lockaddr(shutdown_group); - ast_assert(lock != NULL); - - ao2_lock(shutdown_group); - if (timeout) { - struct timeval start; - struct timespec end; - - start = ast_tvnow(); - end.tv_sec = start.tv_sec + timeout; - end.tv_nsec = start.tv_usec * 1000; - while (shutdown_group->count) { - if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) { - /* Error or timed out waiting for the count to reach zero. */ - break; - } - } - } else { - while (shutdown_group->count) { - if (ast_cond_wait(&shutdown_group->cond, lock)) { - /* Error */ - break; - } - } - } - remaining = shutdown_group->count; - ao2_unlock(shutdown_group); - return remaining; -} - -/*! - * \internal - * \brief Increment the number of serializer members in the group. - * \since 13.5.0 - * - * \param shutdown_group Group shutdown controller. - */ -static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group) -{ - ao2_lock(shutdown_group); - ++shutdown_group->count; - ao2_unlock(shutdown_group); -} - -/*! - * \internal - * \brief Decrement the number of serializer members in the group. - * \since 13.5.0 - * - * \param shutdown_group Group shutdown controller. - */ -static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group) -{ - ao2_lock(shutdown_group); - --shutdown_group->count; - if (!shutdown_group->count) { - ast_cond_signal(&shutdown_group->cond); - } - ao2_unlock(shutdown_group); -} - struct serializer { /*! Threadpool the serializer will use to process the jobs. */ struct ast_threadpool *pool; @@ -1379,7 +1282,7 @@ static void serializer_shutdown(struct ast_taskprocessor_listener *listener) struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); if (ser->shutdown_group) { - serializer_shutdown_group_dec(ser->shutdown_group); + ast_serializer_shutdown_group_dec(ser->shutdown_group); } ao2_cleanup(ser); } @@ -1418,7 +1321,7 @@ struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, /* ser ref transferred to listener but not cleaned without tps */ ao2_ref(ser, -1); } else if (shutdown_group) { - serializer_shutdown_group_inc(shutdown_group); + ast_serializer_shutdown_group_inc(shutdown_group); } ao2_ref(listener, -1); diff --git a/tests/test_taskpool.c b/tests/test_taskpool.c new file mode 100644 index 0000000000..ddfb40e0ac --- /dev/null +++ b/tests/test_taskpool.c @@ -0,0 +1,810 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2025, Sangoma Technologies Inc + * + * Joshua Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief taskpool unit tests + * + * \author Joshua Colp + * + */ + +/*** MODULEINFO + TEST_FRAMEWORK + core + ***/ + +#include "asterisk.h" + +#include "asterisk/astobj2.h" +#include "asterisk/lock.h" +#include "asterisk/logger.h" +#include "asterisk/module.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/test.h" +#include "asterisk/taskpool.h" +#include "asterisk/cli.h" + +struct test_data { + ast_mutex_t lock; + ast_cond_t cond; + int executed; + struct ast_taskprocessor *taskprocessor; +}; + +static struct test_data *test_alloc(void) +{ + struct test_data *td = ast_calloc(1, sizeof(*td)); + if (!td) { + return NULL; + } + ast_mutex_init(&td->lock); + ast_cond_init(&td->cond, NULL); + return td; +} + +static void test_destroy(struct test_data *td) +{ + ast_mutex_destroy(&td->lock); + ast_cond_destroy(&td->cond); + ast_free(td); +} + +static int simple_task(void *data) +{ + struct test_data *td = data; + SCOPED_MUTEX(lock, &td->lock); + td->taskprocessor = ast_taskpool_serializer_get_current(); + td->executed = 1; + ast_cond_signal(&td->cond); + return 0; +} + +AST_TEST_DEFINE(taskpool_push) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct timeval start; + struct timespec end; + + switch (cmd) { + case TEST_INIT: + info->name = "push"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool pushing test"; + info->description = + "Pushes a single task into a taskpool asynchronously and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_push(pool, simple_task, td)) { + goto end; + } + + /* It should not take more than 5 seconds for a single simple task to execute */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 5; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_synchronous) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + + switch (cmd) { + case TEST_INIT: + info->name = "push_synchronous"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool synchronous pushing test"; + info->description = + "Pushes a single task into a taskpool synchronously and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_push_wait(pool, simple_task, td)) { + goto end; + } + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_serializer) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct ast_taskprocessor *serializer = NULL; + struct timeval start; + struct timespec end; + + switch (cmd) { + case TEST_INIT: + info->name = "push_serializer"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool serializer pushing test"; + info->description = + "Pushes a single task into a taskpool serializer and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + serializer = ast_taskpool_serializer("serializer", pool); + if (!serializer) { + goto end; + } + + if (ast_taskprocessor_push(serializer, simple_task, td)) { + goto end; + } + + /* It should not take more than 5 seconds for a single simple task to execute */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 5; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (td->taskprocessor != serializer) { + ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskprocessor_unreference(serializer); + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_serializer_synchronous) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct ast_taskprocessor *serializer = NULL; + + switch (cmd) { + case TEST_INIT: + info->name = "push_serializer_synchronous"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool serializer synchronous pushing test"; + info->description = + "Pushes a single task into a taskpool serializer synchronously and ensures it is executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + serializer = ast_taskpool_serializer("serializer", pool); + if (!serializer) { + goto end; + } + + if (ast_taskpool_serializer_push_wait(serializer, simple_task, td)) { + goto end; + } + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (td->taskprocessor != serializer) { + ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskprocessor_unreference(serializer); + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +static int requeue_task(void *data) +{ + return ast_taskpool_serializer_push_wait(ast_taskpool_serializer_get_current(), simple_task, data); +} + +AST_TEST_DEFINE(taskpool_push_serializer_synchronous_requeue) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 1, + .initial_size = 1, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct ast_taskprocessor *serializer = NULL; + + switch (cmd) { + case TEST_INIT: + info->name = "push_serializer_synchronous_requeue"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool serializer synchronous requeueing test"; + info->description = + "Pushes a single task into a taskpool serializer synchronously and ensures it is requeued and executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + serializer = ast_taskpool_serializer("serializer", pool); + if (!serializer) { + goto end; + } + + if (ast_taskpool_serializer_push_wait(serializer, requeue_task, td)) { + goto end; + } + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (td->taskprocessor != serializer) { + ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskprocessor_unreference(serializer); + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_grow) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 1, + .minimum_size = 0, + .initial_size = 0, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct timeval start; + struct timespec end; + + switch (cmd) { + case TEST_INIT: + info->name = "push_grow"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool pushing test with auto-grow enabled"; + info->description = + "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool grows."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 0) { + ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + if (ast_taskpool_push(pool, simple_task, td)) { + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 1) { + ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + /* It should not take more than 5 seconds for a single simple task to execute */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 5; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +AST_TEST_DEFINE(taskpool_push_shrink) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 1, + .auto_increment = 1, + .minimum_size = 0, + .initial_size = 0, + .max_size = 1, + }; + enum ast_test_result_state res = AST_TEST_PASS; + struct timeval start; + struct timespec end; + int iterations = 0; + + switch (cmd) { + case TEST_INIT: + info->name = "push_shrink"; + info->category = "/main/taskpool/"; + info->summary = "Taskpool pushing test with auto-shrink enabled"; + info->description = + "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool shrinks."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + td = test_alloc(); + if (!td) { + return AST_TEST_FAIL; + } + + pool = ast_taskpool_create(info->name, &options); + if (!pool) { + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 0) { + ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + if (ast_taskpool_push(pool, simple_task, td)) { + res = AST_TEST_FAIL; + goto end; + } + + if (ast_taskpool_taskprocessors_count(pool) != 1) { + ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + + /* We give 10 seconds for the pool to shrink back to normal, but if it happens earlier we + * stop our check early. + */ + ast_mutex_lock(&td->lock); + do { + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 1; + end.tv_nsec = start.tv_usec * 1000; + + if (ast_cond_timedwait(&td->cond, &td->lock, &end) == ETIMEDOUT) { + iterations++; + } + } while (ast_taskpool_taskprocessors_count(pool) != 0 && iterations != 10); + + if (!td->executed) { + ast_test_status_update(test, "Expected simple task to be executed but it was not\n"); + res = AST_TEST_FAIL; + } + + if (ast_taskpool_taskprocessors_count(pool) != 0) { + ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool)); + res = AST_TEST_FAIL; + goto end; + } + +end: + ast_taskpool_shutdown(pool); + test_destroy(td); + return res; +} + +struct efficiency_task_data { + struct ast_taskpool *pool; + int num_tasks_executed; + int shutdown; +}; + +static int efficiency_task(void *data) +{ + struct efficiency_task_data *etd = data; + + if (etd->shutdown) { + ao2_ref(etd->pool, -1); + return 0; + } + + ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1); + + if (ast_taskpool_push(etd->pool, efficiency_task, etd)) { + ao2_ref(etd->pool, -1); + return -1; + } + + return 0; +} + +static char *handle_cli_taskpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 5, + .initial_size = 5, + .max_size = 5, + }; + struct efficiency_task_data etd = { + .pool = NULL, + .num_tasks_executed = 0, + .shutdown = 0, + }; + struct timeval start; + struct timespec end; + int i; + + switch (cmd) { + case CLI_INIT: + e->command = "taskpool push efficiency"; + e->usage = + "Usage: taskpool push efficiency\n" + " Pushes 200 tasks to a taskpool and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + td = test_alloc(); + if (!td) { + return CLI_SUCCESS; + } + + pool = ast_taskpool_create("taskpool_push_efficiency", &options); + if (!pool) { + goto end; + } + + etd.pool = pool; + + /* Push in 200 tasks, cause why not */ + for (i = 0; i < 200; i++) { + /* Ensure that the task has a reference to the pool */ + ao2_bump(pool); + if (ast_taskpool_push(pool, efficiency_task, &etd)) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", etd.num_tasks_executed); + +end: + etd.shutdown = 1; + ast_taskpool_shutdown(pool); + test_destroy(td); + return CLI_SUCCESS; +} + +struct serializer_efficiency_task_data { + struct ast_taskprocessor *serializer[2]; + int *num_tasks_executed; + int *shutdown; +}; + +static int serializer_efficiency_task(void *data) +{ + struct serializer_efficiency_task_data *etd = data; + struct ast_taskprocessor *taskprocessor = etd->serializer[0]; + + if (*etd->shutdown) { + return 0; + } + + ast_atomic_fetchadd_int(etd->num_tasks_executed, +1); + + /* We ping pong a task between a pair of taskprocessors to ensure that + * a single taskprocessor does not receive a thread from the threadpool + * exclusively. + */ + if (taskprocessor == ast_taskpool_serializer_get_current()) { + taskprocessor = etd->serializer[1]; + } + + if (ast_taskprocessor_push(taskprocessor, + serializer_efficiency_task, etd)) { + return -1; + } + + return 0; +} + +static char *handle_cli_taskpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_taskpool *pool = NULL; + struct test_data *td = NULL; + struct ast_taskpool_options options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .minimum_size = 5, + .initial_size = 5, + .max_size = 5, + }; + struct serializer_efficiency_task_data etd[200]; + struct timeval start; + struct timespec end; + int i; + int num_tasks_executed = 0; + int shutdown = 0; + + switch (cmd) { + case CLI_INIT: + e->command = "taskpool push serializer efficiency"; + e->usage = + "Usage: taskpool push serializer efficiency\n" + " Pushes 200 tasks to a taskpool in serializers and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + td = test_alloc(); + if (!td) { + return CLI_SUCCESS; + } + + memset(&etd, 0, sizeof(etd)); + + pool = ast_taskpool_create("taskpool_push_serializer_efficiency", &options); + if (!pool) { + goto end; + } + + /* We create 400 (200 pairs) of serializers */ + for (i = 0; i < 200; i++) { + char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[0] = ast_taskpool_serializer(serializer_name, pool); + if (!etd[i].serializer[0]) { + goto end; + } + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[1] = ast_taskpool_serializer(serializer_name, pool); + if (!etd[i].serializer[1]) { + goto end; + } + + etd[i].num_tasks_executed = &num_tasks_executed; + etd[i].shutdown = &shutdown; + } + + /* And once created we push in 200 tasks */ + for (i = 0; i < 200; i++) { + if (ast_taskprocessor_push(etd[i].serializer[0], serializer_efficiency_task, &etd[i])) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&td->lock); + while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&td->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", num_tasks_executed); + shutdown = 1; + +end: + /* We need to unreference each serializer */ + for (i = 0; i < 200; i++) { + ast_taskprocessor_unreference(etd[i].serializer[0]); + ast_taskprocessor_unreference(etd[i].serializer[1]); + } + ast_taskpool_shutdown(pool); + test_destroy(td); + return CLI_SUCCESS; +} + +static struct ast_cli_entry cli[] = { + AST_CLI_DEFINE(handle_cli_taskpool_push_efficiency, "Push tasks to a taskpool and measure efficiency"), + AST_CLI_DEFINE(handle_cli_taskpool_push_serializer_efficiency, "Push tasks to a taskpool in serializers and measure efficiency"), +}; + +static int unload_module(void) +{ + ast_cli_unregister_multiple(cli, ARRAY_LEN(cli)); + AST_TEST_UNREGISTER(taskpool_push); + AST_TEST_UNREGISTER(taskpool_push_synchronous); + AST_TEST_UNREGISTER(taskpool_push_serializer); + AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous); + AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous_requeue); + AST_TEST_UNREGISTER(taskpool_push_grow); + AST_TEST_UNREGISTER(taskpool_push_shrink); + return 0; +} + +static int load_module(void) +{ + ast_cli_register_multiple(cli, ARRAY_LEN(cli)); + AST_TEST_REGISTER(taskpool_push); + AST_TEST_REGISTER(taskpool_push_synchronous); + AST_TEST_REGISTER(taskpool_push_serializer); + AST_TEST_REGISTER(taskpool_push_serializer_synchronous); + AST_TEST_REGISTER(taskpool_push_serializer_synchronous_requeue); + AST_TEST_REGISTER(taskpool_push_grow); + AST_TEST_REGISTER(taskpool_push_shrink); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskpool test module"); diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 3fb4430d98..c2af88e8e6 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -231,7 +231,9 @@ static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, s } if (!tld->empty_notice) { - ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); + if (test) { + ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); + } res = AST_TEST_FAIL; } @@ -347,6 +349,113 @@ end: return res; } +struct efficiency_task_data { + struct ast_threadpool *pool; + int num_tasks_executed; + int shutdown; +}; + +static int efficiency_task(void *data) +{ + struct efficiency_task_data *etd = data; + + if (etd->shutdown) { + return 0; + } + + ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1); + + if (ast_threadpool_push(etd->pool, efficiency_task, etd)) { + return -1; + } + + return 0; +} + +static char *handle_cli_threadpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct test_listener_data *tld = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 5, + .max_size = 5, + }; + struct efficiency_task_data etd = { + .pool = NULL, + .num_tasks_executed = 0, + .shutdown = 0, + }; + struct timeval start; + struct timespec end; + int i; + + switch (cmd) { + case CLI_INIT: + e->command = "threadpool push efficiency"; + e->usage = + "Usage: threadpool push efficiency\n" + " Pushes 200 tasks to a threadpool and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + tld = test_alloc(); + if (!tld) { + return CLI_SUCCESS; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create("threadpool_push_efficiency", listener, &options); + if (!pool) { + goto end; + } + + etd.pool = pool; + + /* Push in 200 tasks, cause why not */ + for (i = 0; i < 200; i++) { + if (ast_threadpool_push(pool, efficiency_task, &etd)) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&tld->lock); + while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&tld->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", etd.num_tasks_executed); + etd.shutdown = 1; + + res = wait_for_empty_notice(NULL, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return CLI_SUCCESS; +} + AST_TEST_DEFINE(threadpool_initial_threads) { struct ast_threadpool *pool = NULL; @@ -1744,6 +1853,147 @@ end: return res; } +struct serializer_efficiency_task_data { + struct ast_taskprocessor *serializer[2]; + int *num_tasks_executed; + int *shutdown; +}; + +static int serializer_efficiency_task(void *data) +{ + struct serializer_efficiency_task_data *etd = data; + struct ast_taskprocessor *taskprocessor = etd->serializer[0]; + + if (*etd->shutdown) { + return 0; + } + + ast_atomic_fetchadd_int(etd->num_tasks_executed, +1); + + /* We ping pong a task between a pair of taskprocessors to ensure that + * a single taskprocessor does not receive a thread from the threadpool + * exclusively. + */ + if (taskprocessor == ast_threadpool_serializer_get_current()) { + taskprocessor = etd->serializer[1]; + } + + if (ast_taskprocessor_push(taskprocessor, + serializer_efficiency_task, etd)) { + return -1; + } + + return 0; +} + +static char *handle_cli_threadpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, + struct ast_cli_args *a) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct test_listener_data *tld = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 5, + .max_size = 5, + }; + struct serializer_efficiency_task_data etd[200]; + struct timeval start; + struct timespec end; + int i; + int num_tasks_executed = 0; + int shutdown = 0; + + switch (cmd) { + case CLI_INIT: + e->command = "threadpool push serializer efficiency"; + e->usage = + "Usage: threadpool push serializer efficiency\n" + " Pushes 200 tasks to a threadpool in serializers and measures\n" + " the number of tasks executed within 30 seconds.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + tld = test_alloc(); + if (!tld) { + return CLI_SUCCESS; + } + + memset(&etd, 0, sizeof(etd)); + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create("threadpool_push_serializer_efficiency", listener, &options); + if (!pool) { + goto end; + } + + /* We create 400 (200 pairs) of serializers */ + for (i = 0; i < 200; i++) { + char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[0] = ast_threadpool_serializer(serializer_name, pool); + if (!etd[i].serializer[0]) { + goto end; + } + + ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i); + etd[i].serializer[1] = ast_threadpool_serializer(serializer_name, pool); + if (!etd[i].serializer[1]) { + goto end; + } + + etd[i].num_tasks_executed = &num_tasks_executed; + etd[i].shutdown = &shutdown; + } + + /* And once created we push in 200 tasks */ + for (i = 0; i < 200; i++) { + if (ast_taskprocessor_push(etd[i].serializer[0], serializer_efficiency_task, &etd[i])) { + goto end; + } + } + + /* Wait for 30 seconds */ + start = ast_tvnow(); + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + ast_mutex_lock(&tld->lock); + while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) { + } + ast_mutex_unlock(&tld->lock); + + /* Give the total tasks executed, and tell each task to not requeue */ + ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", num_tasks_executed); + shutdown = 1; + + res = wait_for_empty_notice(NULL, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + +end: + /* We need to unreference each serializer */ + for (i = 0; i < 200; i++) { + ast_taskprocessor_unreference(etd[i].serializer[0]); + ast_taskprocessor_unreference(etd[i].serializer[1]); + } + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return CLI_SUCCESS; +} + AST_TEST_DEFINE(threadpool_serializer_dupe) { enum ast_test_result_state res = AST_TEST_FAIL; @@ -1798,6 +2048,11 @@ end: return res; } +static struct ast_cli_entry cli[] = { + AST_CLI_DEFINE(handle_cli_threadpool_push_efficiency, "Push tasks to a threadpool and measure efficiency"), + AST_CLI_DEFINE(handle_cli_threadpool_push_serializer_efficiency, "Push tasks to a threadpool in serializers and measure efficiency"), +}; + static int unload_module(void) { ast_test_unregister(threadpool_push); @@ -1816,11 +2071,13 @@ static int unload_module(void) ast_test_unregister(threadpool_more_destruction); ast_test_unregister(threadpool_serializer); ast_test_unregister(threadpool_serializer_dupe); + ast_cli_unregister_multiple(cli, ARRAY_LEN(cli)); return 0; } static int load_module(void) { + ast_cli_register_multiple(cli, ARRAY_LEN(cli)); ast_test_register(threadpool_push); ast_test_register(threadpool_initial_threads); ast_test_register(threadpool_thread_creation);