diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c
index 5b03f88951..ae3f58a9b0 100644
--- a/channels/chan_pjsip.c
+++ b/channels/chan_pjsip.c
@@ -720,7 +720,7 @@ static int answer(void *data)
ast_channel_name(session->channel), err);
/*
* Return this value so we can distinguish between this
- * failure and the threadpool synchronous push failing.
+ * failure and the taskpool synchronous push failing.
*/
SCOPE_EXIT_RTN_VALUE(-2, "pjproject failure\n");
}
@@ -753,7 +753,7 @@ static int chan_pjsip_answer(struct ast_channel *ast)
res = ast_sip_push_task_wait_serializer(session->serializer, answer, &ans_data);
if (res) {
if (res == -1) {
- ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n",
+ ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the taskpool.\n",
ast_channel_name(session->channel));
}
ao2_ref(session, -1);
@@ -2599,7 +2599,7 @@ static int chan_pjsip_hangup(struct ast_channel *ast)
}
if (ast_sip_push_task(channel->session->serializer, hangup, h_data)) {
- ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n");
+ ast_log(LOG_WARNING, "Unable to push hangup task to the taskpool. Expect bad things\n");
goto failure;
}
diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample
index c399ba8a24..f3619909b6 100644
--- a/configs/samples/pjsip.conf.sample
+++ b/configs/samples/pjsip.conf.sample
@@ -1287,14 +1287,16 @@
;timer_b=32000 ; Set transaction timer B value milliseconds (default: "32000")
;compact_headers=no ; Use the short forms of common SIP header names
; (default: "no")
-;threadpool_initial_size=0 ; Initial number of threads in the res_pjsip
- ; threadpool (default: "0")
-;threadpool_auto_increment=5 ; The amount by which the number of threads is
- ; incremented when necessary (default: "5")
-;threadpool_idle_timeout=60 ; Number of seconds before an idle thread
- ; should be disposed of (default: "60")
-;threadpool_max_size=0 ; Maximum number of threads in the res_pjsip threadpool
- ; A value of 0 indicates no maximum (default: "0")
+;taskpool_minimum_size=4 ; Minimum number of taskprocessors in the res_pjsip
+ ; taskpool (default: "4")
+;taskpool_initial_size=4 ; Initial number of taskprocessors in the res_pjsip
+ ; taskpool (default: "4")
+;taskpool_auto_increment=1 ; The amount by which the number of taskprocessors is
+ ; incremented when necessary (default: "1")
+;taskpool_idle_timeout=60 ; Number of seconds before an idle taskprocessor
+ ; should be disposed of (default: "60")
+;taskpool_max_size=50 ; Maximum number of taskprocessors in the res_pjsip taskpool
+ ; A value of 0 indicates no maximum (default: "50")
;disable_tcp_switch=yes ; Disable automatic switching from UDP to TCP transports
; if outgoing request is too large.
; See RFC 3261 section 18.1.1.
diff --git a/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py
new file mode 100644
index 0000000000..b07c966bd2
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py
@@ -0,0 +1,29 @@
+"""add taskpool options to system
+
+Revision ID: dc7c357dc178
+Revises: abdc9ede147d
+Create Date: 2025-09-24 09:45:17.609185
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'dc7c357dc178'
+down_revision = 'abdc9ede147d'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column('ps_systems', sa.Column('taskpool_minimum_size', sa.Integer))
+ op.add_column('ps_systems', sa.Column('taskpool_initial_size', sa.Integer))
+ op.add_column('ps_systems', sa.Column('taskpool_auto_increment', sa.Integer))
+ op.add_column('ps_systems', sa.Column('taskpool_idle_timeout', sa.Integer))
+ op.add_column('ps_systems', sa.Column('taskpool_max_size', sa.Integer))
+
+def downgrade():
+ op.drop_column('ps_systems', 'taskpool_minimum_size')
+ op.drop_column('ps_systems', 'taskpool_initial_size')
+ op.drop_column('ps_systems', 'taskpool_auto_increment')
+ op.drop_column('ps_systems', 'taskpool_idle_timeout')
+ op.drop_column('ps_systems', 'taskpool_max_size')
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 2fafd5790a..c49e9dfac7 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1921,7 +1921,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* There are three major types of threads that SIP will have to deal with:
* \li Asterisk threads
* \li PJSIP threads
- * \li SIP threadpool threads (a.k.a. "servants")
+ * \li SIP taskpool threads (a.k.a. "servants")
*
* \par Asterisk Threads
*
@@ -1963,7 +1963,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* is NULL, then the work will be handed off to whatever servant can currently handle
* the task. If this pointer is non-NULL, then the task will not be executed until
* previous tasks pushed with the same serializer have completed. For more information
- * on serializers and the benefits they provide, see \ref ast_threadpool_serializer
+ * on serializers and the benefits they provide, see \ref ast_taskpool_serializer
*
* \par Scheduler
*
@@ -1992,7 +1992,7 @@ typedef int (*ast_sip_task)(void *user_data);
* \brief Create a new serializer for SIP tasks
* \since 13.8.0
*
- * See \ref ast_threadpool_serializer for more information on serializers.
+ * See \ref ast_taskpool_serializer for more information on serializers.
* SIP creates serializers so that tasks operating on similar data will run
* in sequence.
*
@@ -2009,7 +2009,7 @@ struct ast_serializer_shutdown_group;
* \brief Create a new serializer for SIP tasks
* \since 13.8.0
*
- * See \ref ast_threadpool_serializer for more information on serializers.
+ * See \ref ast_taskpool_serializer for more information on serializers.
* SIP creates serializers so that tasks operating on similar data will run
* in sequence.
*
@@ -2251,7 +2251,7 @@ enum ast_sip_scheduler_task_flags {
struct ast_sip_sched_task;
/*!
- * \brief Schedule a task to run in the res_pjsip thread pool
+ * \brief Schedule a task to run in the res_pjsip taskpool
* \since 13.9.0
*
* \param serializer The serializer to use. If NULL, don't use a serializer (see note below)
@@ -2266,7 +2266,7 @@ struct ast_sip_sched_task;
* \par Serialization
*
* Specifying a serializer guarantees serialized execution but NOT specifying a serializer
- * may still result in tasks being effectively serialized if the thread pool is busy.
+ * may still result in tasks being effectively serialized if the taskpool is busy.
* The point of the serializer BTW is not to prevent parallel executions of the SAME task.
* That happens automatically (see below). It's to prevent the task from running at the same
* time as other work using the same serializer, whether or not it's being run by the scheduler.
@@ -3662,15 +3662,15 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr);
const char *ast_sip_get_host_ip_string(int af);
/*!
- * \brief Return the size of the SIP threadpool's task queue
+ * \brief Return the size of the SIP taskpool's task queue
* \since 13.7.0
*/
-long ast_sip_threadpool_queue_size(void);
+long ast_sip_taskpool_queue_size(void);
/*!
- * \brief Retrieve the SIP threadpool object
+ * \brief Retrieve the SIP taskpool object
*/
-struct ast_threadpool *ast_sip_threadpool(void);
+struct ast_taskpool *ast_sip_taskpool(void);
/*!
* \brief Retrieve transport state
diff --git a/include/asterisk/res_pjsip_session.h b/include/asterisk/res_pjsip_session.h
index 75ba60e523..7279d0d448 100644
--- a/include/asterisk/res_pjsip_session.h
+++ b/include/asterisk/res_pjsip_session.h
@@ -195,7 +195,7 @@ struct ast_sip_session {
struct ao2_container *datastores;
/*! Serializer for tasks relating to this SIP session */
struct ast_taskprocessor *serializer;
- /*! Non-null if the session serializer is suspended or being suspended. */
+ /*! \deprecated Non-null if the session serializer is suspended or being suspended. */
struct ast_sip_session_suspender *suspended;
/*! Requests that could not be sent due to current inv_session state */
AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests;
diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h
index 2a4f963052..bf1c1901eb 100644
--- a/include/asterisk/taskpool.h
+++ b/include/asterisk/taskpool.h
@@ -318,4 +318,24 @@ struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name,
*/
int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data);
+/*!
+ * \brief Suspend a serializer, causing tasks to be queued until unsuspended
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param serializer The serializer to suspend
+ */
+void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer);
+
+/*!
+ * \brief Unsuspend a serializer, causing tasks to be executed
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param serializer The serializer to unsuspend
+ */
+void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer);
+
#endif /* ASTERISK_TASKPOOL_H */
diff --git a/main/taskpool.c b/main/taskpool.c
index 59ac4b0c72..987ad3776f 100644
--- a/main/taskpool.c
+++ b/main/taskpool.c
@@ -676,6 +676,8 @@ struct serializer {
struct ast_taskpool *pool;
/*! Which group will wait for this serializer to shutdown. */
struct ast_serializer_shutdown_group *shutdown_group;
+ /*! Whether the serializer is suspended or not. */
+ unsigned int suspended:1;
};
static void serializer_dtor(void *obj)
@@ -727,6 +729,15 @@ static int execute_tasks(void *data)
ast_threadstorage_set_ptr(¤t_taskpool_serializer, tps);
for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
requeue = ast_taskprocessor_execute(tps);
+
+ /* If the serializer is suspended we will not execute any more tasks and
+ * we will not requeue the taskpool task. Instead it will be requeued when
+ * the serializer is unsuspended.
+ */
+ if (ser->suspended) {
+ requeue = 0;
+ break;
+ }
}
ast_threadstorage_set_ptr(¤t_taskpool_serializer, NULL);
@@ -916,6 +927,72 @@ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int
return sync_task.fail;
}
+/*!
+ * \internal A task that suspends the serializer after queuing an empty task
+ */
+static int taskpool_serializer_suspend_task(void *data)
+{
+ struct ast_taskprocessor *serializer = data;
+ struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+ /* If already suspended this is a no-op */
+ if (ser->suspended) {
+ return 0;
+ }
+
+ /* First we queue the empty task to ensure the serializer doesn't reach empty, this
+ * prevents any threads from queueing up a taskpool task that executes the serializer
+ * while it is suspended, allowing us to queue it ourselves when the serializer is
+ * unsuspended.
+ */
+ if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) {
+ return 0;
+ }
+
+ /* Next we suspend the serializer so that the execute_tasks currently executing stops
+ * and doesn't requeue.
+ */
+ ser->suspended = 1;
+
+ return 0;
+ }
+
+void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer)
+{
+ if (ast_taskprocessor_is_task(serializer)) {
+ /* I am the session's serializer thread so I cannot suspend. */
+ return;
+ }
+
+ /* Once this returns there is no thread executing the tasks on the serializer, so they
+ * will accumulate until the serializer is unsuspended.
+ */
+ ast_taskpool_serializer_push_wait(serializer, taskpool_serializer_suspend_task, serializer);
+}
+
+void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer)
+{
+ struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+ ao2_lock(ser);
+
+ if (!ser->suspended) {
+ ao2_unlock(ser);
+ return;
+ }
+
+ ser->suspended = 0;
+
+ ao2_unlock(ser);
+
+ /* And now we kick off handling of the queued tasks once again */
+ if (ast_taskpool_push(ser->pool, execute_tasks, ao2_bump(serializer))) {
+ ast_taskprocessor_unreference(serializer);
+ }
+}
+
/*!
* \internal
* \brief Clean up resources on Asterisk shutdown
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 5b954b2226..e3e1c59852 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -37,7 +37,7 @@
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/serializer.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/uuid.h"
#include "asterisk/sorcery.h"
@@ -65,15 +65,9 @@
#define MOD_DATA_CONTACT "contact"
-/*! Number of serializers in pool if one not supplied. */
-#define SERIALIZER_POOL_SIZE 8
-
-/*! Pool of serializers to use if not supplied. */
-static struct ast_serializer_pool *sip_serializer_pool;
-
static pjsip_endpoint *ast_pjsip_endpoint;
-static struct ast_threadpool *sip_threadpool;
+static struct ast_taskpool *sip_taskpool;
/*! Local host address for IPv4 */
static pj_sockaddr host_ip_ipv4;
@@ -2088,7 +2082,7 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text)
struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group)
{
- return ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group);
+ return ast_taskpool_serializer_group(name, sip_taskpool, shutdown_group);
}
struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
@@ -2099,67 +2093,18 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (!serializer) {
- serializer = ast_serializer_pool_get(sip_serializer_pool);
+ return ast_taskpool_push(sip_taskpool, sip_task, task_data);
}
return ast_taskprocessor_push(serializer, sip_task, task_data);
}
-struct sync_task_data {
- ast_mutex_t lock;
- ast_cond_t cond;
- int complete;
- int fail;
- int (*task)(void *);
- void *task_data;
-};
-
-static int sync_task(void *data)
-{
- struct sync_task_data *std = data;
- int ret;
-
- std->fail = std->task(std->task_data);
-
- /*
- * Once we unlock std->lock after signaling, we cannot access
- * std again. The thread waiting within ast_sip_push_task_wait()
- * is free to continue and release its local variable (std).
- */
- ast_mutex_lock(&std->lock);
- std->complete = 1;
- ast_cond_signal(&std->cond);
- ret = std->fail;
- ast_mutex_unlock(&std->lock);
- return ret;
-}
-
static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
- /* This method is an onion */
- struct sync_task_data std;
-
- memset(&std, 0, sizeof(std));
- ast_mutex_init(&std.lock);
- ast_cond_init(&std.cond, NULL);
- std.task = sip_task;
- std.task_data = task_data;
-
- if (ast_sip_push_task(serializer, sync_task, &std)) {
- ast_mutex_destroy(&std.lock);
- ast_cond_destroy(&std.cond);
- return -1;
+ if (!serializer) {
+ return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data);
}
-
- ast_mutex_lock(&std.lock);
- while (!std.complete) {
- ast_cond_wait(&std.cond, &std.lock);
- }
- ast_mutex_unlock(&std.lock);
-
- ast_mutex_destroy(&std.lock);
- ast_cond_destroy(&std.cond);
- return std.fail;
+ return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data);
}
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
@@ -2179,23 +2124,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (!serializer) {
- /* Caller doesn't care which PJSIP serializer the task executes under. */
- serializer = ast_serializer_pool_get(sip_serializer_pool);
- if (!serializer) {
- /* No serializer picked to execute the task */
- return -1;
- }
- }
- if (ast_taskprocessor_is_task(serializer)) {
- /*
- * We are the requested serializer so we must execute
- * the task now or deadlock waiting on ourself to
- * execute it.
- */
- return sip_task(task_data);
+ return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data);
}
- return ast_sip_push_task_wait(serializer, sip_task, task_data);
+ return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data);
}
void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)
@@ -3454,14 +3386,14 @@ static void remove_request_headers(pjsip_endpoint *endpt)
}
}
-long ast_sip_threadpool_queue_size(void)
+long ast_sip_taskpool_queue_size(void)
{
- return ast_threadpool_queue_size(sip_threadpool);
+ return ast_taskpool_queue_size(sip_taskpool);
}
-struct ast_threadpool *ast_sip_threadpool(void)
+struct ast_taskpool *ast_sip_taskpool(void)
{
- return sip_threadpool;
+ return sip_taskpool;
}
int ast_sip_is_uri_sip_sips(pjsip_uri *uri)
@@ -3801,7 +3733,7 @@ static int unload_pjsip(void *data)
* These calls need the pjsip endpoint and serializer to clean up.
* If they're not set, then there's nothing to clean up anyway.
*/
- if (ast_pjsip_endpoint && sip_serializer_pool) {
+ if (ast_pjsip_endpoint) {
ast_res_pjsip_cleanup_options_handling();
ast_res_pjsip_cleanup_message_filter();
ast_sip_destroy_distributor();
@@ -3921,7 +3853,7 @@ pjsip_media_type pjsip_media_type_text_plain;
static int load_module(void)
{
- struct ast_threadpool_options options;
+ struct ast_taskpool_options options;
/* pjproject and config_system need to be initialized before all else */
if (pj_init() != PJ_SUCCESS) {
@@ -3958,18 +3890,11 @@ static int load_module(void)
goto error;
}
- /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
- sip_get_threadpool_options(&options);
+ /* The serializer needs taskpool and taskpool needs pjproject to be initialized so it's next */
+ sip_get_taskpool_options(&options);
options.thread_start = sip_thread_start;
- sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
- if (!sip_threadpool) {
- goto error;
- }
-
- sip_serializer_pool = ast_serializer_pool_create(
- "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1);
- if (!sip_serializer_pool) {
- ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n");
+ sip_taskpool = ast_taskpool_create("pjsip", &options);
+ if (!sip_taskpool) {
goto error;
}
@@ -4049,8 +3974,7 @@ error:
/* These functions all check for NULLs and are safe to call at any time */
ast_sip_destroy_scheduler();
- ast_serializer_pool_destroy(sip_serializer_pool);
- ast_threadpool_shutdown(sip_threadpool);
+ ast_taskpool_shutdown(sip_taskpool);
return AST_MODULE_LOAD_DECLINE;
}
@@ -4076,12 +4000,11 @@ static int unload_module(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
/* The thread this is called from cannot call PJSIP/PJLIB functions,
- * so we have to push the work to the threadpool to handle
+ * so we have to push the work to the taskpool to handle
*/
ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL);
ast_sip_destroy_scheduler();
- ast_serializer_pool_destroy(sip_serializer_pool);
- ast_threadpool_shutdown(sip_threadpool);
+ ast_taskpool_shutdown(sip_taskpool);
return 0;
}
diff --git a/res/res_pjsip/config_system.c b/res/res_pjsip/config_system.c
index e16738f610..c782c6f28e 100644
--- a/res/res_pjsip/config_system.c
+++ b/res/res_pjsip/config_system.c
@@ -24,7 +24,7 @@
#include "asterisk/res_pjsip.h"
#include "asterisk/sorcery.h"
#include "include/res_pjsip_private.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
#include "asterisk/dns.h"
#include "asterisk/res_pjsip_cli.h"
@@ -41,15 +41,17 @@ struct system_config {
/*! Should we use short forms for headers? */
unsigned int compactheaders;
struct {
- /*! Initial number of threads in the threadpool */
+ /*! Minimum number of taskprocessors in the taskpool */
+ int minimum_size;
+ /*! Initial number of taskprocessors in the taskpool */
int initial_size;
- /*! The amount by which the number of threads is incremented when necessary */
+ /*! The amount by which the number of taskprocessors is incremented when necessary */
int auto_increment;
- /*! Thread idle timeout in seconds */
+ /*! Taskprocessor idle timeout in seconds */
int idle_timeout;
- /*! Maxumum number of threads in the threadpool */
+ /*! Maxumum number of taskprocessors in the taskpool */
int max_size;
- } threadpool;
+ } taskpool;
/*! Nonzero to disable switching from UDP to TCP transport */
unsigned int disable_tcp_switch;
/*!
@@ -63,13 +65,13 @@ struct system_config {
unsigned int disable_rport;
};
-static struct ast_threadpool_options sip_threadpool_options = {
- .version = AST_THREADPOOL_OPTIONS_VERSION,
+static struct ast_taskpool_options sip_taskpool_options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
};
-void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options)
+void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options)
{
- *threadpool_options = sip_threadpool_options;
+ *taskpool_options = sip_taskpool_options;
}
static struct ast_sorcery *system_sorcery;
@@ -125,10 +127,11 @@ static int system_apply(const struct ast_sorcery *sorcery, void *obj)
#endif
}
- sip_threadpool_options.initial_size = system->threadpool.initial_size;
- sip_threadpool_options.auto_increment = system->threadpool.auto_increment;
- sip_threadpool_options.idle_timeout = system->threadpool.idle_timeout;
- sip_threadpool_options.max_size = system->threadpool.max_size;
+ sip_taskpool_options.minimum_size = system->taskpool.minimum_size;
+ sip_taskpool_options.initial_size = system->taskpool.initial_size;
+ sip_taskpool_options.auto_increment = system->taskpool.auto_increment;
+ sip_taskpool_options.idle_timeout = system->taskpool.idle_timeout;
+ sip_taskpool_options.max_size = system->taskpool.max_size;
pjsip_cfg()->endpt.disable_tcp_switch =
system->disable_tcp_switch ? PJ_TRUE : PJ_FALSE;
@@ -199,14 +202,24 @@ int ast_sip_initialize_system(void)
OPT_UINT_T, 0, FLDSET(struct system_config, timerb));
ast_sorcery_object_field_register(system_sorcery, "system", "compact_headers", "no",
OPT_BOOL_T, 1, FLDSET(struct system_config, compactheaders));
- ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "0",
- OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.initial_size));
- ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "5",
- OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.auto_increment));
+ ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_minimum_size", "4",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.minimum_size));
+ ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_initial_size", "4",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
+ ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "4",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
+ ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_auto_increment", "1",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
+ ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "1",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
+ ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_idle_timeout", "60",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_idle_timeout", "60",
- OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.idle_timeout));
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
+ ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_max_size", "50",
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_max_size", "50",
- OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.max_size));
+ OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
ast_sorcery_object_field_register(system_sorcery, "system", "disable_tcp_switch", "yes",
OPT_BOOL_T, 1, FLDSET(struct system_config, disable_tcp_switch));
ast_sorcery_object_field_register(system_sorcery, "system", "follow_early_media_fork", "yes",
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index dcb821cedc..11582ad1c4 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -18,7 +18,7 @@
#include "asterisk/compat.h"
struct ao2_container;
-struct ast_threadpool_options;
+struct ast_taskpool_options;
struct ast_sip_cli_context;
/*!
@@ -116,7 +116,7 @@ int ast_sip_destroy_sorcery_auth(void);
* \brief Initialize the distributor module
*
* The distributor module is responsible for taking an incoming
- * SIP message and placing it into the threadpool. Once in the threadpool,
+ * SIP message and placing it into the taskpool. Once in the taskpool,
* the distributor will perform endpoint lookups and authentication, and
* then distribute the message up the stack to any further modules.
*
@@ -278,9 +278,9 @@ void ast_res_pjsip_cleanup_message_filter(void);
/*!
* \internal
- * \brief Get threadpool options
+ * \brief Get taskpool options
*/
-void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options);
+void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options);
/*!
* \internal
diff --git a/res/res_pjsip/pjsip_config.xml b/res/res_pjsip/pjsip_config.xml
index cef061b84c..8019426da7 100644
--- a/res/res_pjsip/pjsip_config.xml
+++ b/res/res_pjsip/pjsip_config.xml
@@ -3009,30 +3009,75 @@
Use the short forms of common SIP header names.
-
+
- 12.0.0
+ 23.1.0
+ 22.7.0
+ 20.17.0
- Initial number of threads in the res_pjsip threadpool.
+ Minimum number of taskprocessors in the res_pjsip taskpool.
-
+
+
+ 23.1.0
+ 22.7.0
+ 20.17.0
+
+ Initial number of taskprocessors in the res_pjsip taskpool.
+
+
+
+ 23.1.0
+ 22.7.0
+ 20.17.0
+
+ The amount by which the number of taskprocessors is incremented when necessary.
+
+
+
+ 23.1.0
+ 22.7.0
+ 20.17.0
+
+ Number of seconds before an idle taskprocessor should be disposed of.
+
+
+
+ 23.1.0
+ 22.7.0
+ 20.17.0
+
+ Maximum number of taskprocessors in the res_pjsip taskpool.
+ A value of 0 indicates no maximum.
+
+
12.0.0
- The amount by which the number of threads is incremented when necessary.
+ Initial number of threads in the res_pjsip taskpool.
+ Deprecated in favor of taskpool_initiali_size.
+
+
+
+ 12.0.0
+
+ The amount by which the number of threads is incremented when necessary.
+ Deprecated in favor of taskpool_auto_increment.
12.0.0
- Number of seconds before an idle thread should be disposed of.
+ Number of seconds before an idle taskprocessor should be disposed of.
+ Deprecated in favor of taskpool_idle_timeout.
-
+
13.7.0
- Maximum number of threads in the res_pjsip threadpool.
- A value of 0 indicates no maximum.
+ Maximum number of taskprocessors in the res_pjsip taskpool.
+ A value of 0 indicates no maximum.
+ Deprecated in favor of taskpool_max_size.
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index c960611101..adf18d3b8b 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -24,7 +24,7 @@
#include "asterisk/acl.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
#include "asterisk/res_pjsip_cli.h"
static int distribute(void *data);
@@ -75,7 +75,7 @@ static pj_status_t record_serializer(pjsip_tx_data *tdata)
{
struct ast_taskprocessor *serializer;
- serializer = ast_threadpool_serializer_get_current();
+ serializer = ast_taskpool_serializer_get_current();
if (serializer) {
const char *name;
diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c
index 93354b9309..1d75c903ed 100644
--- a/res/res_pjsip/pjsip_options.c
+++ b/res/res_pjsip/pjsip_options.c
@@ -33,7 +33,7 @@
#include "asterisk/statsd.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/serializer_shutdown_group.h"
/*
* This implementation for OPTIONS support is based around the idea
diff --git a/res/res_pjsip/pjsip_resolver.c b/res/res_pjsip/pjsip_resolver.c
index 2babfe40de..f5f4a1cfc5 100644
--- a/res/res_pjsip/pjsip_resolver.c
+++ b/res/res_pjsip/pjsip_resolver.c
@@ -31,7 +31,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
#ifdef HAVE_PJSIP_EXTERNAL_RESOLVER
@@ -611,7 +611,7 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip
return;
}
- resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current());
+ resolve->serializer = ao2_bump(ast_taskpool_serializer_get_current());
ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host);
ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve);
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index 072ddce5cc..22c7a1f345 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -1573,8 +1573,8 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
- mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi",
- MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME);
+ mwi_serializer_pool = ast_serializer_taskpool_create("pjsip/mwi",
+ MWI_SERIALIZER_POOL_SIZE, ast_sip_taskpool(), MAX_UNLOAD_TIMEOUT_TIME);
if (!mwi_serializer_pool) {
ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
}
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c
index 4b47f2b33f..538df56125 100644
--- a/res/res_pjsip_outbound_publish.c
+++ b/res/res_pjsip_outbound_publish.c
@@ -33,7 +33,8 @@
#include "asterisk/res_pjsip_outbound_publish.h"
#include "asterisk/module.h"
#include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
+#include "asterisk/serializer_shutdown_group.h"
#include "asterisk/datastore.h"
#include "res_pjsip/include/res_pjsip_private.h"
diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c
index 75744affa9..2746eeaea9 100644
--- a/res/res_pjsip_outbound_registration.c
+++ b/res/res_pjsip_outbound_registration.c
@@ -35,7 +35,8 @@
#include "asterisk/cli.h"
#include "asterisk/stasis_system.h"
#include "asterisk/threadstorage.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
+#include "asterisk/serializer_shutdown_group.h"
#include "asterisk/statsd.h"
#include "res_pjsip/include/res_pjsip_private.h"
#include "asterisk/vector.h"
@@ -1669,7 +1670,7 @@ static void sip_outbound_registration_response_cb(struct pjsip_regc_cbparam *par
* pjproject callback thread.
*/
if (ast_sip_push_task(client_state->serializer, handle_registration_response, response)) {
- ast_log(LOG_WARNING, "Failed to pass incoming registration response to threadpool\n");
+ ast_log(LOG_WARNING, "Failed to pass incoming registration response to taskpool\n");
ao2_cleanup(response);
}
}
@@ -1690,7 +1691,7 @@ static void sip_outbound_registration_state_destroy(void *obj)
ao2_ref(state->client_state, -1);
} else if (ast_sip_push_task(state->client_state->serializer,
handle_client_state_destruction, state->client_state)) {
- ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to threadpool\n");
+ ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to taskpool\n");
ao2_ref(state->client_state, -1);
}
}
diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c
index 6616b1c0c9..67d7836ce7 100644
--- a/res/res_pjsip_refer.c
+++ b/res/res_pjsip_refer.c
@@ -343,7 +343,7 @@ static struct ast_frame *refer_progress_framehook(struct ast_channel *chan, stru
}
}
- /* If a notification is due to be sent push it to the thread pool */
+ /* If a notification is due to be sent push it to the taskpool */
if (notification) {
/* If the subscription is being terminated we don't need the frame hook any longer */
if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) {
@@ -395,7 +395,7 @@ static struct ast_frame *refer_ari_progress_framehook(struct ast_channel *chan,
progress->ari_state->last_response = *message;
}
- /* If a notification is due to be sent push it to the thread pool */
+ /* If a notification is due to be sent push it to the taskpool */
if (notification) {
/* If the subscription is being terminated we don't need the frame hook any longer */
if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) {
diff --git a/res/res_pjsip_rfc3326.c b/res/res_pjsip_rfc3326.c
index e4e4e1b12f..a4992225fe 100644
--- a/res/res_pjsip_rfc3326.c
+++ b/res/res_pjsip_rfc3326.c
@@ -32,7 +32,7 @@
#include "asterisk/res_pjsip_session.h"
#include "asterisk/module.h"
#include "asterisk/causes.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
static void rfc3326_use_reason_header(struct ast_sip_session *session, struct pjsip_rx_data *rdata)
{
@@ -139,7 +139,7 @@ static void rfc3326_outgoing_request(struct ast_sip_session *session, struct pjs
* checks so we must also be running under the call's serializer
* thread.
*/
- || session->serializer != ast_threadpool_serializer_get_current()) {
+ || session->serializer != ast_taskpool_serializer_get_current()) {
return;
}
@@ -152,7 +152,7 @@ static void rfc3326_outgoing_response(struct ast_sip_session *session, struct pj
if (status.code < 300
|| !session->channel
- || session->serializer != ast_threadpool_serializer_get_current()) {
+ || session->serializer != ast_taskpool_serializer_get_current()) {
return;
}
diff --git a/res/res_pjsip_rfc3329.c b/res/res_pjsip_rfc3329.c
index f6faff2afe..d57392d4f8 100644
--- a/res/res_pjsip_rfc3329.c
+++ b/res/res_pjsip_rfc3329.c
@@ -32,7 +32,6 @@
#include "asterisk/res_pjsip_session.h"
#include "asterisk/module.h"
#include "asterisk/causes.h"
-#include "asterisk/threadpool.h"
/*! \brief Private data structure used with the modules's datastore */
struct rfc3329_store_data {
diff --git a/res/res_pjsip_session.c b/res/res_pjsip_session.c
index bca226c857..135aa19279 100644
--- a/res/res_pjsip_session.c
+++ b/res/res_pjsip_session.c
@@ -42,6 +42,7 @@
#include "asterisk/uuid.h"
#include "asterisk/pbx.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/taskpool.h"
#include "asterisk/causes.h"
#include "asterisk/sdp_srtp.h"
#include "asterisk/dsp.h"
@@ -3127,115 +3128,14 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint,
return ret_session;
}
-/*! \brief struct controlling the suspension of the session's serializer. */
-struct ast_sip_session_suspender {
- ast_cond_t cond_suspended;
- ast_cond_t cond_complete;
- int suspended;
- int complete;
-};
-
-static void sip_session_suspender_dtor(void *vdoomed)
-{
- struct ast_sip_session_suspender *doomed = vdoomed;
-
- ast_cond_destroy(&doomed->cond_suspended);
- ast_cond_destroy(&doomed->cond_complete);
-}
-
-/*!
- * \internal
- * \brief Block the session serializer thread task.
- *
- * \param data Pushed serializer task data for suspension.
- *
- * \retval 0
- */
-static int sip_session_suspend_task(void *data)
-{
- struct ast_sip_session_suspender *suspender = data;
-
- ao2_lock(suspender);
-
- /* Signal that the serializer task is now suspended. */
- suspender->suspended = 1;
- ast_cond_signal(&suspender->cond_suspended);
-
- /* Wait for the serializer suspension to be completed. */
- while (!suspender->complete) {
- ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender));
- }
-
- ao2_unlock(suspender);
- ao2_ref(suspender, -1);
-
- return 0;
-}
-
void ast_sip_session_suspend(struct ast_sip_session *session)
{
- struct ast_sip_session_suspender *suspender;
- int res;
-
- ast_assert(session->suspended == NULL);
-
- if (ast_taskprocessor_is_task(session->serializer)) {
- /* I am the session's serializer thread so I cannot suspend. */
- return;
- }
-
- if (ast_taskprocessor_is_suspended(session->serializer)) {
- /* The serializer already suspended. */
- return;
- }
-
- suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor);
- if (!suspender) {
- /* We will just have to hope that the system does not deadlock */
- return;
- }
- ast_cond_init(&suspender->cond_suspended, NULL);
- ast_cond_init(&suspender->cond_complete, NULL);
-
- ao2_ref(suspender, +1);
- res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender);
- if (res) {
- /* We will just have to hope that the system does not deadlock */
- ao2_ref(suspender, -2);
- return;
- }
-
- session->suspended = suspender;
-
- /* Wait for the serializer to get suspended. */
- ao2_lock(suspender);
- while (!suspender->suspended) {
- ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender));
- }
- ao2_unlock(suspender);
-
- ast_taskprocessor_suspend(session->serializer);
+ ast_taskpool_serializer_suspend(session->serializer);
}
void ast_sip_session_unsuspend(struct ast_sip_session *session)
{
- struct ast_sip_session_suspender *suspender = session->suspended;
-
- if (!suspender) {
- /* Nothing to do */
- return;
- }
- session->suspended = NULL;
-
- /* Signal that the serializer task suspension is now complete. */
- ao2_lock(suspender);
- suspender->complete = 1;
- ast_cond_signal(&suspender->cond_complete);
- ao2_unlock(suspender);
-
- ao2_ref(suspender, -1);
-
- ast_taskprocessor_unsuspend(session->serializer);
+ ast_taskpool_serializer_unsuspend(session->serializer);
}
/*!