From 8d39faaf124d12da4336398e2f0393f9fc677233 Mon Sep 17 00:00:00 2001 From: "Joshua C. Colp" Date: Tue, 23 Sep 2025 18:54:22 -0300 Subject: [PATCH] pjsip: Move from threadpool to taskpool This change moves the PJSIP module from the threadpool API to the taskpool API. PJSIP-specific implementations for task usage have been removed and replaced with calls to the optimized taskpool implementations instead. The need for a pool of serializers has also been removed as taskpool inherently provides this. The default settings have also been changed to be more realistic for common usage. UpgradeNote: The threadpool_* options in pjsip.conf have now been deprecated though they continue to be read and used. They have been replaced with taskpool options that give greater control over the underlying taskpool used for PJSIP. An alembic upgrade script has been added to add these options to realtime as well. --- channels/chan_pjsip.c | 6 +- configs/samples/pjsip.conf.sample | 18 +-- ...357dc178_add_taskpool_options_to_system.py | 29 +++++ include/asterisk/res_pjsip.h | 20 +-- include/asterisk/res_pjsip_session.h | 2 +- include/asterisk/taskpool.h | 20 +++ main/taskpool.c | 77 +++++++++++ res/res_pjsip.c | 121 ++++-------------- res/res_pjsip/config_system.c | 53 +++++--- res/res_pjsip/include/res_pjsip_private.h | 8 +- res/res_pjsip/pjsip_config.xml | 63 +++++++-- res/res_pjsip/pjsip_distributor.c | 4 +- res/res_pjsip/pjsip_options.c | 2 +- res/res_pjsip/pjsip_resolver.c | 4 +- res/res_pjsip_mwi.c | 4 +- res/res_pjsip_outbound_publish.c | 3 +- res/res_pjsip_outbound_registration.c | 7 +- res/res_pjsip_refer.c | 4 +- res/res_pjsip_rfc3326.c | 6 +- res/res_pjsip_rfc3329.c | 1 - res/res_pjsip_session.c | 106 +-------------- 21 files changed, 284 insertions(+), 274 deletions(-) create mode 100644 contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index 5b03f88951..ae3f58a9b0 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -720,7 +720,7 @@ static int answer(void *data) ast_channel_name(session->channel), err); /* * Return this value so we can distinguish between this - * failure and the threadpool synchronous push failing. + * failure and the taskpool synchronous push failing. */ SCOPE_EXIT_RTN_VALUE(-2, "pjproject failure\n"); } @@ -753,7 +753,7 @@ static int chan_pjsip_answer(struct ast_channel *ast) res = ast_sip_push_task_wait_serializer(session->serializer, answer, &ans_data); if (res) { if (res == -1) { - ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n", + ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the taskpool.\n", ast_channel_name(session->channel)); } ao2_ref(session, -1); @@ -2599,7 +2599,7 @@ static int chan_pjsip_hangup(struct ast_channel *ast) } if (ast_sip_push_task(channel->session->serializer, hangup, h_data)) { - ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n"); + ast_log(LOG_WARNING, "Unable to push hangup task to the taskpool. Expect bad things\n"); goto failure; } diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index c399ba8a24..f3619909b6 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -1287,14 +1287,16 @@ ;timer_b=32000 ; Set transaction timer B value milliseconds (default: "32000") ;compact_headers=no ; Use the short forms of common SIP header names ; (default: "no") -;threadpool_initial_size=0 ; Initial number of threads in the res_pjsip - ; threadpool (default: "0") -;threadpool_auto_increment=5 ; The amount by which the number of threads is - ; incremented when necessary (default: "5") -;threadpool_idle_timeout=60 ; Number of seconds before an idle thread - ; should be disposed of (default: "60") -;threadpool_max_size=0 ; Maximum number of threads in the res_pjsip threadpool - ; A value of 0 indicates no maximum (default: "0") +;taskpool_minimum_size=4 ; Minimum number of taskprocessors in the res_pjsip + ; taskpool (default: "4") +;taskpool_initial_size=4 ; Initial number of taskprocessors in the res_pjsip + ; taskpool (default: "4") +;taskpool_auto_increment=1 ; The amount by which the number of taskprocessors is + ; incremented when necessary (default: "1") +;taskpool_idle_timeout=60 ; Number of seconds before an idle taskprocessor + ; should be disposed of (default: "60") +;taskpool_max_size=50 ; Maximum number of taskprocessors in the res_pjsip taskpool + ; A value of 0 indicates no maximum (default: "50") ;disable_tcp_switch=yes ; Disable automatic switching from UDP to TCP transports ; if outgoing request is too large. ; See RFC 3261 section 18.1.1. diff --git a/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py new file mode 100644 index 0000000000..b07c966bd2 --- /dev/null +++ b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py @@ -0,0 +1,29 @@ +"""add taskpool options to system + +Revision ID: dc7c357dc178 +Revises: abdc9ede147d +Create Date: 2025-09-24 09:45:17.609185 + +""" + +# revision identifiers, used by Alembic. +revision = 'dc7c357dc178' +down_revision = 'abdc9ede147d' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('ps_systems', sa.Column('taskpool_minimum_size', sa.Integer)) + op.add_column('ps_systems', sa.Column('taskpool_initial_size', sa.Integer)) + op.add_column('ps_systems', sa.Column('taskpool_auto_increment', sa.Integer)) + op.add_column('ps_systems', sa.Column('taskpool_idle_timeout', sa.Integer)) + op.add_column('ps_systems', sa.Column('taskpool_max_size', sa.Integer)) + +def downgrade(): + op.drop_column('ps_systems', 'taskpool_minimum_size') + op.drop_column('ps_systems', 'taskpool_initial_size') + op.drop_column('ps_systems', 'taskpool_auto_increment') + op.drop_column('ps_systems', 'taskpool_idle_timeout') + op.drop_column('ps_systems', 'taskpool_max_size') diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 2fafd5790a..c49e9dfac7 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1921,7 +1921,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * There are three major types of threads that SIP will have to deal with: * \li Asterisk threads * \li PJSIP threads - * \li SIP threadpool threads (a.k.a. "servants") + * \li SIP taskpool threads (a.k.a. "servants") * * \par Asterisk Threads * @@ -1963,7 +1963,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * is NULL, then the work will be handed off to whatever servant can currently handle * the task. If this pointer is non-NULL, then the task will not be executed until * previous tasks pushed with the same serializer have completed. For more information - * on serializers and the benefits they provide, see \ref ast_threadpool_serializer + * on serializers and the benefits they provide, see \ref ast_taskpool_serializer * * \par Scheduler * @@ -1992,7 +1992,7 @@ typedef int (*ast_sip_task)(void *user_data); * \brief Create a new serializer for SIP tasks * \since 13.8.0 * - * See \ref ast_threadpool_serializer for more information on serializers. + * See \ref ast_taskpool_serializer for more information on serializers. * SIP creates serializers so that tasks operating on similar data will run * in sequence. * @@ -2009,7 +2009,7 @@ struct ast_serializer_shutdown_group; * \brief Create a new serializer for SIP tasks * \since 13.8.0 * - * See \ref ast_threadpool_serializer for more information on serializers. + * See \ref ast_taskpool_serializer for more information on serializers. * SIP creates serializers so that tasks operating on similar data will run * in sequence. * @@ -2251,7 +2251,7 @@ enum ast_sip_scheduler_task_flags { struct ast_sip_sched_task; /*! - * \brief Schedule a task to run in the res_pjsip thread pool + * \brief Schedule a task to run in the res_pjsip taskpool * \since 13.9.0 * * \param serializer The serializer to use. If NULL, don't use a serializer (see note below) @@ -2266,7 +2266,7 @@ struct ast_sip_sched_task; * \par Serialization * * Specifying a serializer guarantees serialized execution but NOT specifying a serializer - * may still result in tasks being effectively serialized if the thread pool is busy. + * may still result in tasks being effectively serialized if the taskpool is busy. * The point of the serializer BTW is not to prevent parallel executions of the SAME task. * That happens automatically (see below). It's to prevent the task from running at the same * time as other work using the same serializer, whether or not it's being run by the scheduler. @@ -3662,15 +3662,15 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr); const char *ast_sip_get_host_ip_string(int af); /*! - * \brief Return the size of the SIP threadpool's task queue + * \brief Return the size of the SIP taskpool's task queue * \since 13.7.0 */ -long ast_sip_threadpool_queue_size(void); +long ast_sip_taskpool_queue_size(void); /*! - * \brief Retrieve the SIP threadpool object + * \brief Retrieve the SIP taskpool object */ -struct ast_threadpool *ast_sip_threadpool(void); +struct ast_taskpool *ast_sip_taskpool(void); /*! * \brief Retrieve transport state diff --git a/include/asterisk/res_pjsip_session.h b/include/asterisk/res_pjsip_session.h index 75ba60e523..7279d0d448 100644 --- a/include/asterisk/res_pjsip_session.h +++ b/include/asterisk/res_pjsip_session.h @@ -195,7 +195,7 @@ struct ast_sip_session { struct ao2_container *datastores; /*! Serializer for tasks relating to this SIP session */ struct ast_taskprocessor *serializer; - /*! Non-null if the session serializer is suspended or being suspended. */ + /*! \deprecated Non-null if the session serializer is suspended or being suspended. */ struct ast_sip_session_suspender *suspended; /*! Requests that could not be sent due to current inv_session state */ AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests; diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h index 2a4f963052..bf1c1901eb 100644 --- a/include/asterisk/taskpool.h +++ b/include/asterisk/taskpool.h @@ -318,4 +318,24 @@ struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, */ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data); +/*! + * \brief Suspend a serializer, causing tasks to be queued until unsuspended + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param serializer The serializer to suspend + */ +void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer); + +/*! + * \brief Unsuspend a serializer, causing tasks to be executed + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param serializer The serializer to unsuspend + */ +void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer); + #endif /* ASTERISK_TASKPOOL_H */ diff --git a/main/taskpool.c b/main/taskpool.c index 59ac4b0c72..987ad3776f 100644 --- a/main/taskpool.c +++ b/main/taskpool.c @@ -676,6 +676,8 @@ struct serializer { struct ast_taskpool *pool; /*! Which group will wait for this serializer to shutdown. */ struct ast_serializer_shutdown_group *shutdown_group; + /*! Whether the serializer is suspended or not. */ + unsigned int suspended:1; }; static void serializer_dtor(void *obj) @@ -727,6 +729,15 @@ static int execute_tasks(void *data) ast_threadstorage_set_ptr(¤t_taskpool_serializer, tps); for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) { requeue = ast_taskprocessor_execute(tps); + + /* If the serializer is suspended we will not execute any more tasks and + * we will not requeue the taskpool task. Instead it will be requeued when + * the serializer is unsuspended. + */ + if (ser->suspended) { + requeue = 0; + break; + } } ast_threadstorage_set_ptr(¤t_taskpool_serializer, NULL); @@ -916,6 +927,72 @@ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int return sync_task.fail; } +/*! + * \internal A task that suspends the serializer after queuing an empty task + */ +static int taskpool_serializer_suspend_task(void *data) +{ + struct ast_taskprocessor *serializer = data; + struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer); + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + + /* If already suspended this is a no-op */ + if (ser->suspended) { + return 0; + } + + /* First we queue the empty task to ensure the serializer doesn't reach empty, this + * prevents any threads from queueing up a taskpool task that executes the serializer + * while it is suspended, allowing us to queue it ourselves when the serializer is + * unsuspended. + */ + if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) { + return 0; + } + + /* Next we suspend the serializer so that the execute_tasks currently executing stops + * and doesn't requeue. + */ + ser->suspended = 1; + + return 0; + } + +void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer) +{ + if (ast_taskprocessor_is_task(serializer)) { + /* I am the session's serializer thread so I cannot suspend. */ + return; + } + + /* Once this returns there is no thread executing the tasks on the serializer, so they + * will accumulate until the serializer is unsuspended. + */ + ast_taskpool_serializer_push_wait(serializer, taskpool_serializer_suspend_task, serializer); +} + +void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer) +{ + struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer); + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + + ao2_lock(ser); + + if (!ser->suspended) { + ao2_unlock(ser); + return; + } + + ser->suspended = 0; + + ao2_unlock(ser); + + /* And now we kick off handling of the queued tasks once again */ + if (ast_taskpool_push(ser->pool, execute_tasks, ao2_bump(serializer))) { + ast_taskprocessor_unreference(serializer); + } +} + /*! * \internal * \brief Clean up resources on Asterisk shutdown diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 5b954b2226..e3e1c59852 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -37,7 +37,7 @@ #include "asterisk/astobj2.h" #include "asterisk/module.h" #include "asterisk/serializer.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/taskprocessor.h" #include "asterisk/uuid.h" #include "asterisk/sorcery.h" @@ -65,15 +65,9 @@ #define MOD_DATA_CONTACT "contact" -/*! Number of serializers in pool if one not supplied. */ -#define SERIALIZER_POOL_SIZE 8 - -/*! Pool of serializers to use if not supplied. */ -static struct ast_serializer_pool *sip_serializer_pool; - static pjsip_endpoint *ast_pjsip_endpoint; -static struct ast_threadpool *sip_threadpool; +static struct ast_taskpool *sip_taskpool; /*! Local host address for IPv4 */ static pj_sockaddr host_ip_ipv4; @@ -2088,7 +2082,7 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text) struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group) { - return ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group); + return ast_taskpool_serializer_group(name, sip_taskpool, shutdown_group); } struct ast_taskprocessor *ast_sip_create_serializer(const char *name) @@ -2099,67 +2093,18 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name) int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - serializer = ast_serializer_pool_get(sip_serializer_pool); + return ast_taskpool_push(sip_taskpool, sip_task, task_data); } return ast_taskprocessor_push(serializer, sip_task, task_data); } -struct sync_task_data { - ast_mutex_t lock; - ast_cond_t cond; - int complete; - int fail; - int (*task)(void *); - void *task_data; -}; - -static int sync_task(void *data) -{ - struct sync_task_data *std = data; - int ret; - - std->fail = std->task(std->task_data); - - /* - * Once we unlock std->lock after signaling, we cannot access - * std again. The thread waiting within ast_sip_push_task_wait() - * is free to continue and release its local variable (std). - */ - ast_mutex_lock(&std->lock); - std->complete = 1; - ast_cond_signal(&std->cond); - ret = std->fail; - ast_mutex_unlock(&std->lock); - return ret; -} - static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { - /* This method is an onion */ - struct sync_task_data std; - - memset(&std, 0, sizeof(std)); - ast_mutex_init(&std.lock); - ast_cond_init(&std.cond, NULL); - std.task = sip_task; - std.task_data = task_data; - - if (ast_sip_push_task(serializer, sync_task, &std)) { - ast_mutex_destroy(&std.lock); - ast_cond_destroy(&std.cond); - return -1; + if (!serializer) { + return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data); } - - ast_mutex_lock(&std.lock); - while (!std.complete) { - ast_cond_wait(&std.cond, &std.lock); - } - ast_mutex_unlock(&std.lock); - - ast_mutex_destroy(&std.lock); - ast_cond_destroy(&std.cond); - return std.fail; + return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data); } int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) @@ -2179,23 +2124,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - /* Caller doesn't care which PJSIP serializer the task executes under. */ - serializer = ast_serializer_pool_get(sip_serializer_pool); - if (!serializer) { - /* No serializer picked to execute the task */ - return -1; - } - } - if (ast_taskprocessor_is_task(serializer)) { - /* - * We are the requested serializer so we must execute - * the task now or deadlock waiting on ourself to - * execute it. - */ - return sip_task(task_data); + return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data); } - return ast_sip_push_task_wait(serializer, sip_task, task_data); + return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data); } void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size) @@ -3454,14 +3386,14 @@ static void remove_request_headers(pjsip_endpoint *endpt) } } -long ast_sip_threadpool_queue_size(void) +long ast_sip_taskpool_queue_size(void) { - return ast_threadpool_queue_size(sip_threadpool); + return ast_taskpool_queue_size(sip_taskpool); } -struct ast_threadpool *ast_sip_threadpool(void) +struct ast_taskpool *ast_sip_taskpool(void) { - return sip_threadpool; + return sip_taskpool; } int ast_sip_is_uri_sip_sips(pjsip_uri *uri) @@ -3801,7 +3733,7 @@ static int unload_pjsip(void *data) * These calls need the pjsip endpoint and serializer to clean up. * If they're not set, then there's nothing to clean up anyway. */ - if (ast_pjsip_endpoint && sip_serializer_pool) { + if (ast_pjsip_endpoint) { ast_res_pjsip_cleanup_options_handling(); ast_res_pjsip_cleanup_message_filter(); ast_sip_destroy_distributor(); @@ -3921,7 +3853,7 @@ pjsip_media_type pjsip_media_type_text_plain; static int load_module(void) { - struct ast_threadpool_options options; + struct ast_taskpool_options options; /* pjproject and config_system need to be initialized before all else */ if (pj_init() != PJ_SUCCESS) { @@ -3958,18 +3890,11 @@ static int load_module(void) goto error; } - /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */ - sip_get_threadpool_options(&options); + /* The serializer needs taskpool and taskpool needs pjproject to be initialized so it's next */ + sip_get_taskpool_options(&options); options.thread_start = sip_thread_start; - sip_threadpool = ast_threadpool_create("pjsip", NULL, &options); - if (!sip_threadpool) { - goto error; - } - - sip_serializer_pool = ast_serializer_pool_create( - "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1); - if (!sip_serializer_pool) { - ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); + sip_taskpool = ast_taskpool_create("pjsip", &options); + if (!sip_taskpool) { goto error; } @@ -4049,8 +3974,7 @@ error: /* These functions all check for NULLs and are safe to call at any time */ ast_sip_destroy_scheduler(); - ast_serializer_pool_destroy(sip_serializer_pool); - ast_threadpool_shutdown(sip_threadpool); + ast_taskpool_shutdown(sip_taskpool); return AST_MODULE_LOAD_DECLINE; } @@ -4076,12 +4000,11 @@ static int unload_module(void) ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); /* The thread this is called from cannot call PJSIP/PJLIB functions, - * so we have to push the work to the threadpool to handle + * so we have to push the work to the taskpool to handle */ ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); - ast_serializer_pool_destroy(sip_serializer_pool); - ast_threadpool_shutdown(sip_threadpool); + ast_taskpool_shutdown(sip_taskpool); return 0; } diff --git a/res/res_pjsip/config_system.c b/res/res_pjsip/config_system.c index e16738f610..c782c6f28e 100644 --- a/res/res_pjsip/config_system.c +++ b/res/res_pjsip/config_system.c @@ -24,7 +24,7 @@ #include "asterisk/res_pjsip.h" #include "asterisk/sorcery.h" #include "include/res_pjsip_private.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/dns.h" #include "asterisk/res_pjsip_cli.h" @@ -41,15 +41,17 @@ struct system_config { /*! Should we use short forms for headers? */ unsigned int compactheaders; struct { - /*! Initial number of threads in the threadpool */ + /*! Minimum number of taskprocessors in the taskpool */ + int minimum_size; + /*! Initial number of taskprocessors in the taskpool */ int initial_size; - /*! The amount by which the number of threads is incremented when necessary */ + /*! The amount by which the number of taskprocessors is incremented when necessary */ int auto_increment; - /*! Thread idle timeout in seconds */ + /*! Taskprocessor idle timeout in seconds */ int idle_timeout; - /*! Maxumum number of threads in the threadpool */ + /*! Maxumum number of taskprocessors in the taskpool */ int max_size; - } threadpool; + } taskpool; /*! Nonzero to disable switching from UDP to TCP transport */ unsigned int disable_tcp_switch; /*! @@ -63,13 +65,13 @@ struct system_config { unsigned int disable_rport; }; -static struct ast_threadpool_options sip_threadpool_options = { - .version = AST_THREADPOOL_OPTIONS_VERSION, +static struct ast_taskpool_options sip_taskpool_options = { + .version = AST_TASKPOOL_OPTIONS_VERSION, }; -void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options) +void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options) { - *threadpool_options = sip_threadpool_options; + *taskpool_options = sip_taskpool_options; } static struct ast_sorcery *system_sorcery; @@ -125,10 +127,11 @@ static int system_apply(const struct ast_sorcery *sorcery, void *obj) #endif } - sip_threadpool_options.initial_size = system->threadpool.initial_size; - sip_threadpool_options.auto_increment = system->threadpool.auto_increment; - sip_threadpool_options.idle_timeout = system->threadpool.idle_timeout; - sip_threadpool_options.max_size = system->threadpool.max_size; + sip_taskpool_options.minimum_size = system->taskpool.minimum_size; + sip_taskpool_options.initial_size = system->taskpool.initial_size; + sip_taskpool_options.auto_increment = system->taskpool.auto_increment; + sip_taskpool_options.idle_timeout = system->taskpool.idle_timeout; + sip_taskpool_options.max_size = system->taskpool.max_size; pjsip_cfg()->endpt.disable_tcp_switch = system->disable_tcp_switch ? PJ_TRUE : PJ_FALSE; @@ -199,14 +202,24 @@ int ast_sip_initialize_system(void) OPT_UINT_T, 0, FLDSET(struct system_config, timerb)); ast_sorcery_object_field_register(system_sorcery, "system", "compact_headers", "no", OPT_BOOL_T, 1, FLDSET(struct system_config, compactheaders)); - ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "0", - OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.initial_size)); - ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "5", - OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.auto_increment)); + ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_minimum_size", "4", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.minimum_size)); + ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_initial_size", "4", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size)); + ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "4", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size)); + ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_auto_increment", "1", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment)); + ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "1", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment)); + ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_idle_timeout", "60", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout)); ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_idle_timeout", "60", - OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.idle_timeout)); + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout)); + ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_max_size", "50", + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size)); ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_max_size", "50", - OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.max_size)); + OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size)); ast_sorcery_object_field_register(system_sorcery, "system", "disable_tcp_switch", "yes", OPT_BOOL_T, 1, FLDSET(struct system_config, disable_tcp_switch)); ast_sorcery_object_field_register(system_sorcery, "system", "follow_early_media_fork", "yes", diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index dcb821cedc..11582ad1c4 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -18,7 +18,7 @@ #include "asterisk/compat.h" struct ao2_container; -struct ast_threadpool_options; +struct ast_taskpool_options; struct ast_sip_cli_context; /*! @@ -116,7 +116,7 @@ int ast_sip_destroy_sorcery_auth(void); * \brief Initialize the distributor module * * The distributor module is responsible for taking an incoming - * SIP message and placing it into the threadpool. Once in the threadpool, + * SIP message and placing it into the taskpool. Once in the taskpool, * the distributor will perform endpoint lookups and authentication, and * then distribute the message up the stack to any further modules. * @@ -278,9 +278,9 @@ void ast_res_pjsip_cleanup_message_filter(void); /*! * \internal - * \brief Get threadpool options + * \brief Get taskpool options */ -void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options); +void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options); /*! * \internal diff --git a/res/res_pjsip/pjsip_config.xml b/res/res_pjsip/pjsip_config.xml index cef061b84c..8019426da7 100644 --- a/res/res_pjsip/pjsip_config.xml +++ b/res/res_pjsip/pjsip_config.xml @@ -3009,30 +3009,75 @@ Use the short forms of common SIP header names. - + - 12.0.0 + 23.1.0 + 22.7.0 + 20.17.0 - Initial number of threads in the res_pjsip threadpool. + Minimum number of taskprocessors in the res_pjsip taskpool. - + + + 23.1.0 + 22.7.0 + 20.17.0 + + Initial number of taskprocessors in the res_pjsip taskpool. + + + + 23.1.0 + 22.7.0 + 20.17.0 + + The amount by which the number of taskprocessors is incremented when necessary. + + + + 23.1.0 + 22.7.0 + 20.17.0 + + Number of seconds before an idle taskprocessor should be disposed of. + + + + 23.1.0 + 22.7.0 + 20.17.0 + + Maximum number of taskprocessors in the res_pjsip taskpool. + A value of 0 indicates no maximum. + + 12.0.0 - The amount by which the number of threads is incremented when necessary. + Initial number of threads in the res_pjsip taskpool. + Deprecated in favor of taskpool_initiali_size. + + + + 12.0.0 + + The amount by which the number of threads is incremented when necessary. + Deprecated in favor of taskpool_auto_increment. 12.0.0 - Number of seconds before an idle thread should be disposed of. + Number of seconds before an idle taskprocessor should be disposed of. + Deprecated in favor of taskpool_idle_timeout. - + 13.7.0 - Maximum number of threads in the res_pjsip threadpool. - A value of 0 indicates no maximum. + Maximum number of taskprocessors in the res_pjsip taskpool. + A value of 0 indicates no maximum. + Deprecated in favor of taskpool_max_size. diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index c960611101..adf18d3b8b 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -24,7 +24,7 @@ #include "asterisk/acl.h" #include "include/res_pjsip_private.h" #include "asterisk/taskprocessor.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #include "asterisk/res_pjsip_cli.h" static int distribute(void *data); @@ -75,7 +75,7 @@ static pj_status_t record_serializer(pjsip_tx_data *tdata) { struct ast_taskprocessor *serializer; - serializer = ast_threadpool_serializer_get_current(); + serializer = ast_taskpool_serializer_get_current(); if (serializer) { const char *name; diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c index 93354b9309..1d75c903ed 100644 --- a/res/res_pjsip/pjsip_options.c +++ b/res/res_pjsip/pjsip_options.c @@ -33,7 +33,7 @@ #include "asterisk/statsd.h" #include "include/res_pjsip_private.h" #include "asterisk/taskprocessor.h" -#include "asterisk/threadpool.h" +#include "asterisk/serializer_shutdown_group.h" /* * This implementation for OPTIONS support is based around the idea diff --git a/res/res_pjsip/pjsip_resolver.c b/res/res_pjsip/pjsip_resolver.c index 2babfe40de..f5f4a1cfc5 100644 --- a/res/res_pjsip/pjsip_resolver.c +++ b/res/res_pjsip/pjsip_resolver.c @@ -31,7 +31,7 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" #include "asterisk/taskprocessor.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" #ifdef HAVE_PJSIP_EXTERNAL_RESOLVER @@ -611,7 +611,7 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip return; } - resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current()); + resolve->serializer = ao2_bump(ast_taskpool_serializer_get_current()); ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host); ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve); diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 072ddce5cc..22c7a1f345 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -1573,8 +1573,8 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } - mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi", - MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME); + mwi_serializer_pool = ast_serializer_taskpool_create("pjsip/mwi", + MWI_SERIALIZER_POOL_SIZE, ast_sip_taskpool(), MAX_UNLOAD_TIMEOUT_TIME); if (!mwi_serializer_pool) { ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); } diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 4b47f2b33f..538df56125 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -33,7 +33,8 @@ #include "asterisk/res_pjsip_outbound_publish.h" #include "asterisk/module.h" #include "asterisk/taskprocessor.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" +#include "asterisk/serializer_shutdown_group.h" #include "asterisk/datastore.h" #include "res_pjsip/include/res_pjsip_private.h" diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 75744affa9..2746eeaea9 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -35,7 +35,8 @@ #include "asterisk/cli.h" #include "asterisk/stasis_system.h" #include "asterisk/threadstorage.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" +#include "asterisk/serializer_shutdown_group.h" #include "asterisk/statsd.h" #include "res_pjsip/include/res_pjsip_private.h" #include "asterisk/vector.h" @@ -1669,7 +1670,7 @@ static void sip_outbound_registration_response_cb(struct pjsip_regc_cbparam *par * pjproject callback thread. */ if (ast_sip_push_task(client_state->serializer, handle_registration_response, response)) { - ast_log(LOG_WARNING, "Failed to pass incoming registration response to threadpool\n"); + ast_log(LOG_WARNING, "Failed to pass incoming registration response to taskpool\n"); ao2_cleanup(response); } } @@ -1690,7 +1691,7 @@ static void sip_outbound_registration_state_destroy(void *obj) ao2_ref(state->client_state, -1); } else if (ast_sip_push_task(state->client_state->serializer, handle_client_state_destruction, state->client_state)) { - ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to threadpool\n"); + ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to taskpool\n"); ao2_ref(state->client_state, -1); } } diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 6616b1c0c9..67d7836ce7 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -343,7 +343,7 @@ static struct ast_frame *refer_progress_framehook(struct ast_channel *chan, stru } } - /* If a notification is due to be sent push it to the thread pool */ + /* If a notification is due to be sent push it to the taskpool */ if (notification) { /* If the subscription is being terminated we don't need the frame hook any longer */ if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) { @@ -395,7 +395,7 @@ static struct ast_frame *refer_ari_progress_framehook(struct ast_channel *chan, progress->ari_state->last_response = *message; } - /* If a notification is due to be sent push it to the thread pool */ + /* If a notification is due to be sent push it to the taskpool */ if (notification) { /* If the subscription is being terminated we don't need the frame hook any longer */ if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) { diff --git a/res/res_pjsip_rfc3326.c b/res/res_pjsip_rfc3326.c index e4e4e1b12f..a4992225fe 100644 --- a/res/res_pjsip_rfc3326.c +++ b/res/res_pjsip_rfc3326.c @@ -32,7 +32,7 @@ #include "asterisk/res_pjsip_session.h" #include "asterisk/module.h" #include "asterisk/causes.h" -#include "asterisk/threadpool.h" +#include "asterisk/taskpool.h" static void rfc3326_use_reason_header(struct ast_sip_session *session, struct pjsip_rx_data *rdata) { @@ -139,7 +139,7 @@ static void rfc3326_outgoing_request(struct ast_sip_session *session, struct pjs * checks so we must also be running under the call's serializer * thread. */ - || session->serializer != ast_threadpool_serializer_get_current()) { + || session->serializer != ast_taskpool_serializer_get_current()) { return; } @@ -152,7 +152,7 @@ static void rfc3326_outgoing_response(struct ast_sip_session *session, struct pj if (status.code < 300 || !session->channel - || session->serializer != ast_threadpool_serializer_get_current()) { + || session->serializer != ast_taskpool_serializer_get_current()) { return; } diff --git a/res/res_pjsip_rfc3329.c b/res/res_pjsip_rfc3329.c index f6faff2afe..d57392d4f8 100644 --- a/res/res_pjsip_rfc3329.c +++ b/res/res_pjsip_rfc3329.c @@ -32,7 +32,6 @@ #include "asterisk/res_pjsip_session.h" #include "asterisk/module.h" #include "asterisk/causes.h" -#include "asterisk/threadpool.h" /*! \brief Private data structure used with the modules's datastore */ struct rfc3329_store_data { diff --git a/res/res_pjsip_session.c b/res/res_pjsip_session.c index bca226c857..135aa19279 100644 --- a/res/res_pjsip_session.c +++ b/res/res_pjsip_session.c @@ -42,6 +42,7 @@ #include "asterisk/uuid.h" #include "asterisk/pbx.h" #include "asterisk/taskprocessor.h" +#include "asterisk/taskpool.h" #include "asterisk/causes.h" #include "asterisk/sdp_srtp.h" #include "asterisk/dsp.h" @@ -3127,115 +3128,14 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, return ret_session; } -/*! \brief struct controlling the suspension of the session's serializer. */ -struct ast_sip_session_suspender { - ast_cond_t cond_suspended; - ast_cond_t cond_complete; - int suspended; - int complete; -}; - -static void sip_session_suspender_dtor(void *vdoomed) -{ - struct ast_sip_session_suspender *doomed = vdoomed; - - ast_cond_destroy(&doomed->cond_suspended); - ast_cond_destroy(&doomed->cond_complete); -} - -/*! - * \internal - * \brief Block the session serializer thread task. - * - * \param data Pushed serializer task data for suspension. - * - * \retval 0 - */ -static int sip_session_suspend_task(void *data) -{ - struct ast_sip_session_suspender *suspender = data; - - ao2_lock(suspender); - - /* Signal that the serializer task is now suspended. */ - suspender->suspended = 1; - ast_cond_signal(&suspender->cond_suspended); - - /* Wait for the serializer suspension to be completed. */ - while (!suspender->complete) { - ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender)); - } - - ao2_unlock(suspender); - ao2_ref(suspender, -1); - - return 0; -} - void ast_sip_session_suspend(struct ast_sip_session *session) { - struct ast_sip_session_suspender *suspender; - int res; - - ast_assert(session->suspended == NULL); - - if (ast_taskprocessor_is_task(session->serializer)) { - /* I am the session's serializer thread so I cannot suspend. */ - return; - } - - if (ast_taskprocessor_is_suspended(session->serializer)) { - /* The serializer already suspended. */ - return; - } - - suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor); - if (!suspender) { - /* We will just have to hope that the system does not deadlock */ - return; - } - ast_cond_init(&suspender->cond_suspended, NULL); - ast_cond_init(&suspender->cond_complete, NULL); - - ao2_ref(suspender, +1); - res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender); - if (res) { - /* We will just have to hope that the system does not deadlock */ - ao2_ref(suspender, -2); - return; - } - - session->suspended = suspender; - - /* Wait for the serializer to get suspended. */ - ao2_lock(suspender); - while (!suspender->suspended) { - ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender)); - } - ao2_unlock(suspender); - - ast_taskprocessor_suspend(session->serializer); + ast_taskpool_serializer_suspend(session->serializer); } void ast_sip_session_unsuspend(struct ast_sip_session *session) { - struct ast_sip_session_suspender *suspender = session->suspended; - - if (!suspender) { - /* Nothing to do */ - return; - } - session->suspended = NULL; - - /* Signal that the serializer task suspension is now complete. */ - ao2_lock(suspender); - suspender->complete = 1; - ast_cond_signal(&suspender->cond_complete); - ao2_unlock(suspender); - - ao2_ref(suspender, -1); - - ast_taskprocessor_unsuspend(session->serializer); + ast_taskpool_serializer_unsuspend(session->serializer); } /*!