pjsip: Move from threadpool to taskpool

This change moves the PJSIP module from the threadpool API
to the taskpool API. PJSIP-specific implementations for
task usage have been removed and replaced with calls to
the optimized taskpool implementations instead. The need
for a pool of serializers has also been removed as
taskpool inherently provides this. The default settings
have also been changed to be more realistic for common
usage.

UpgradeNote: The threadpool_* options in pjsip.conf have now
been deprecated though they continue to be read and used.
They have been replaced with taskpool options that give greater
control over the underlying taskpool used for PJSIP. An alembic
upgrade script has been added to add these options to realtime
as well.
This commit is contained in:
Joshua C. Colp
2025-09-23 18:54:22 -03:00
parent 74a3809be8
commit 2d28889193
21 changed files with 284 additions and 274 deletions

View File

@@ -24,7 +24,7 @@
#include "asterisk/res_pjsip.h"
#include "asterisk/sorcery.h"
#include "include/res_pjsip_private.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskpool.h"
#include "asterisk/dns.h"
#include "asterisk/res_pjsip_cli.h"
@@ -41,15 +41,17 @@ struct system_config {
/*! Should we use short forms for headers? */
unsigned int compactheaders;
struct {
/*! Initial number of threads in the threadpool */
/*! Minimum number of taskprocessors in the taskpool */
int minimum_size;
/*! Initial number of taskprocessors in the taskpool */
int initial_size;
/*! The amount by which the number of threads is incremented when necessary */
/*! The amount by which the number of taskprocessors is incremented when necessary */
int auto_increment;
/*! Thread idle timeout in seconds */
/*! Taskprocessor idle timeout in seconds */
int idle_timeout;
/*! Maxumum number of threads in the threadpool */
/*! Maxumum number of taskprocessors in the taskpool */
int max_size;
} threadpool;
} taskpool;
/*! Nonzero to disable switching from UDP to TCP transport */
unsigned int disable_tcp_switch;
/*!
@@ -63,13 +65,13 @@ struct system_config {
unsigned int disable_rport;
};
static struct ast_threadpool_options sip_threadpool_options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
static struct ast_taskpool_options sip_taskpool_options = {
.version = AST_TASKPOOL_OPTIONS_VERSION,
};
void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options)
void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options)
{
*threadpool_options = sip_threadpool_options;
*taskpool_options = sip_taskpool_options;
}
static struct ast_sorcery *system_sorcery;
@@ -125,10 +127,11 @@ static int system_apply(const struct ast_sorcery *sorcery, void *obj)
#endif
}
sip_threadpool_options.initial_size = system->threadpool.initial_size;
sip_threadpool_options.auto_increment = system->threadpool.auto_increment;
sip_threadpool_options.idle_timeout = system->threadpool.idle_timeout;
sip_threadpool_options.max_size = system->threadpool.max_size;
sip_taskpool_options.minimum_size = system->taskpool.minimum_size;
sip_taskpool_options.initial_size = system->taskpool.initial_size;
sip_taskpool_options.auto_increment = system->taskpool.auto_increment;
sip_taskpool_options.idle_timeout = system->taskpool.idle_timeout;
sip_taskpool_options.max_size = system->taskpool.max_size;
pjsip_cfg()->endpt.disable_tcp_switch =
system->disable_tcp_switch ? PJ_TRUE : PJ_FALSE;
@@ -199,14 +202,24 @@ int ast_sip_initialize_system(void)
OPT_UINT_T, 0, FLDSET(struct system_config, timerb));
ast_sorcery_object_field_register(system_sorcery, "system", "compact_headers", "no",
OPT_BOOL_T, 1, FLDSET(struct system_config, compactheaders));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "0",
OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.initial_size));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "5",
OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.auto_increment));
ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_minimum_size", "4",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.minimum_size));
ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_initial_size", "4",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "4",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_auto_increment", "1",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "1",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_idle_timeout", "60",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_idle_timeout", "60",
OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.idle_timeout));
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_max_size", "50",
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_max_size", "50",
OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.max_size));
OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
ast_sorcery_object_field_register(system_sorcery, "system", "disable_tcp_switch", "yes",
OPT_BOOL_T, 1, FLDSET(struct system_config, disable_tcp_switch));
ast_sorcery_object_field_register(system_sorcery, "system", "follow_early_media_fork", "yes",

View File

@@ -18,7 +18,7 @@
#include "asterisk/compat.h"
struct ao2_container;
struct ast_threadpool_options;
struct ast_taskpool_options;
struct ast_sip_cli_context;
/*!
@@ -116,7 +116,7 @@ int ast_sip_destroy_sorcery_auth(void);
* \brief Initialize the distributor module
*
* The distributor module is responsible for taking an incoming
* SIP message and placing it into the threadpool. Once in the threadpool,
* SIP message and placing it into the taskpool. Once in the taskpool,
* the distributor will perform endpoint lookups and authentication, and
* then distribute the message up the stack to any further modules.
*
@@ -278,9 +278,9 @@ void ast_res_pjsip_cleanup_message_filter(void);
/*!
* \internal
* \brief Get threadpool options
* \brief Get taskpool options
*/
void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options);
void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options);
/*!
* \internal

View File

@@ -3009,30 +3009,75 @@
</since>
<synopsis>Use the short forms of common SIP header names.</synopsis>
</configOption>
<configOption name="threadpool_initial_size" default="0">
<configOption name="taskpool_minimum_size" default="4">
<since>
<version>12.0.0</version>
<version>23.1.0</version>
<version>22.7.0</version>
<version>20.17.0</version>
</since>
<synopsis>Initial number of threads in the res_pjsip threadpool.</synopsis>
<synopsis>Minimum number of taskprocessors in the res_pjsip taskpool.</synopsis>
</configOption>
<configOption name="threadpool_auto_increment" default="5">
<configOption name="taskpool_initial_size" default="4">
<since>
<version>23.1.0</version>
<version>22.7.0</version>
<version>20.17.0</version>
</since>
<synopsis>Initial number of taskprocessors in the res_pjsip taskpool.</synopsis>
</configOption>
<configOption name="taskpool_auto_increment" default="1">
<since>
<version>23.1.0</version>
<version>22.7.0</version>
<version>20.17.0</version>
</since>
<synopsis>The amount by which the number of taskprocessors is incremented when necessary.</synopsis>
</configOption>
<configOption name="taskpool_idle_timeout" default="60">
<since>
<version>23.1.0</version>
<version>22.7.0</version>
<version>20.17.0</version>
</since>
<synopsis>Number of seconds before an idle taskprocessor should be disposed of.</synopsis>
</configOption>
<configOption name="taskpool_max_size" default="50">
<since>
<version>23.1.0</version>
<version>22.7.0</version>
<version>20.17.0</version>
</since>
<synopsis>Maximum number of taskprocessors in the res_pjsip taskpool.
A value of 0 indicates no maximum.</synopsis>
</configOption>
<configOption name="threadpool_initial_size" default="4">
<since>
<version>12.0.0</version>
</since>
<synopsis>The amount by which the number of threads is incremented when necessary.</synopsis>
<synopsis>Initial number of threads in the res_pjsip taskpool.
Deprecated in favor of taskpool_initiali_size.</synopsis>
</configOption>
<configOption name="threadpool_auto_increment" default="1">
<since>
<version>12.0.0</version>
</since>
<synopsis>The amount by which the number of threads is incremented when necessary.
Deprecated in favor of taskpool_auto_increment.</synopsis>
</configOption>
<configOption name="threadpool_idle_timeout" default="60">
<since>
<version>12.0.0</version>
</since>
<synopsis>Number of seconds before an idle thread should be disposed of.</synopsis>
<synopsis>Number of seconds before an idle taskprocessor should be disposed of.
Deprecated in favor of taskpool_idle_timeout.</synopsis>
</configOption>
<configOption name="threadpool_max_size" default="0">
<configOption name="threadpool_max_size" default="50">
<since>
<version>13.7.0</version>
</since>
<synopsis>Maximum number of threads in the res_pjsip threadpool.
A value of 0 indicates no maximum.</synopsis>
<synopsis>Maximum number of taskprocessors in the res_pjsip taskpool.
A value of 0 indicates no maximum.
Deprecated in favor of taskpool_max_size.</synopsis>
</configOption>
<configOption name="disable_tcp_switch" default="yes">
<since>

View File

@@ -24,7 +24,7 @@
#include "asterisk/acl.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskpool.h"
#include "asterisk/res_pjsip_cli.h"
static int distribute(void *data);
@@ -75,7 +75,7 @@ static pj_status_t record_serializer(pjsip_tx_data *tdata)
{
struct ast_taskprocessor *serializer;
serializer = ast_threadpool_serializer_get_current();
serializer = ast_taskpool_serializer_get_current();
if (serializer) {
const char *name;

View File

@@ -33,7 +33,7 @@
#include "asterisk/statsd.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/serializer_shutdown_group.h"
/*
* This implementation for OPTIONS support is based around the idea

View File

@@ -31,7 +31,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskpool.h"
#ifdef HAVE_PJSIP_EXTERNAL_RESOLVER
@@ -611,7 +611,7 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip
return;
}
resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current());
resolve->serializer = ao2_bump(ast_taskpool_serializer_get_current());
ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host);
ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve);