mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 10:47:18 +00:00 
			
		
		
		
	pjsip: Move from threadpool to taskpool
This change moves the PJSIP module from the threadpool API to the taskpool API. PJSIP-specific implementations for task usage have been removed and replaced with calls to the optimized taskpool implementations instead. The need for a pool of serializers has also been removed as taskpool inherently provides this. The default settings have also been changed to be more realistic for common usage. UpgradeNote: The threadpool_* options in pjsip.conf have now been deprecated though they continue to be read and used. They have been replaced with taskpool options that give greater control over the underlying taskpool used for PJSIP. An alembic upgrade script has been added to add these options to realtime as well.
This commit is contained in:
		
							
								
								
									
										121
									
								
								res/res_pjsip.c
									
									
									
									
									
								
							
							
						
						
									
										121
									
								
								res/res_pjsip.c
									
									
									
									
									
								
							| @@ -37,7 +37,7 @@ | ||||
| #include "asterisk/astobj2.h" | ||||
| #include "asterisk/module.h" | ||||
| #include "asterisk/serializer.h" | ||||
| #include "asterisk/threadpool.h" | ||||
| #include "asterisk/taskpool.h" | ||||
| #include "asterisk/taskprocessor.h" | ||||
| #include "asterisk/uuid.h" | ||||
| #include "asterisk/sorcery.h" | ||||
| @@ -65,15 +65,9 @@ | ||||
|  | ||||
| #define MOD_DATA_CONTACT "contact" | ||||
|  | ||||
| /*! Number of serializers in pool if one not supplied. */ | ||||
| #define SERIALIZER_POOL_SIZE		8 | ||||
|  | ||||
| /*! Pool of serializers to use if not supplied. */ | ||||
| static struct ast_serializer_pool *sip_serializer_pool; | ||||
|  | ||||
| static pjsip_endpoint *ast_pjsip_endpoint; | ||||
|  | ||||
| static struct ast_threadpool *sip_threadpool; | ||||
| static struct ast_taskpool *sip_taskpool; | ||||
|  | ||||
| /*! Local host address for IPv4 */ | ||||
| static pj_sockaddr host_ip_ipv4; | ||||
| @@ -2088,7 +2082,7 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text) | ||||
|  | ||||
| struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group) | ||||
| { | ||||
| 	return ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group); | ||||
| 	return ast_taskpool_serializer_group(name, sip_taskpool, shutdown_group); | ||||
| } | ||||
|  | ||||
| struct ast_taskprocessor *ast_sip_create_serializer(const char *name) | ||||
| @@ -2099,67 +2093,18 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name) | ||||
| int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) | ||||
| { | ||||
| 	if (!serializer) { | ||||
| 		serializer = ast_serializer_pool_get(sip_serializer_pool); | ||||
| 		return ast_taskpool_push(sip_taskpool, sip_task, task_data); | ||||
| 	} | ||||
|  | ||||
| 	return ast_taskprocessor_push(serializer, sip_task, task_data); | ||||
| } | ||||
|  | ||||
| struct sync_task_data { | ||||
| 	ast_mutex_t lock; | ||||
| 	ast_cond_t cond; | ||||
| 	int complete; | ||||
| 	int fail; | ||||
| 	int (*task)(void *); | ||||
| 	void *task_data; | ||||
| }; | ||||
|  | ||||
| static int sync_task(void *data) | ||||
| { | ||||
| 	struct sync_task_data *std = data; | ||||
| 	int ret; | ||||
|  | ||||
| 	std->fail = std->task(std->task_data); | ||||
|  | ||||
| 	/* | ||||
| 	 * Once we unlock std->lock after signaling, we cannot access | ||||
| 	 * std again.  The thread waiting within ast_sip_push_task_wait() | ||||
| 	 * is free to continue and release its local variable (std). | ||||
| 	 */ | ||||
| 	ast_mutex_lock(&std->lock); | ||||
| 	std->complete = 1; | ||||
| 	ast_cond_signal(&std->cond); | ||||
| 	ret = std->fail; | ||||
| 	ast_mutex_unlock(&std->lock); | ||||
| 	return ret; | ||||
| } | ||||
|  | ||||
| static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) | ||||
| { | ||||
| 	/* This method is an onion */ | ||||
| 	struct sync_task_data std; | ||||
|  | ||||
| 	memset(&std, 0, sizeof(std)); | ||||
| 	ast_mutex_init(&std.lock); | ||||
| 	ast_cond_init(&std.cond, NULL); | ||||
| 	std.task = sip_task; | ||||
| 	std.task_data = task_data; | ||||
|  | ||||
| 	if (ast_sip_push_task(serializer, sync_task, &std)) { | ||||
| 		ast_mutex_destroy(&std.lock); | ||||
| 		ast_cond_destroy(&std.cond); | ||||
| 		return -1; | ||||
| 	if (!serializer) { | ||||
| 		return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data); | ||||
| 	} | ||||
|  | ||||
| 	ast_mutex_lock(&std.lock); | ||||
| 	while (!std.complete) { | ||||
| 		ast_cond_wait(&std.cond, &std.lock); | ||||
| 	} | ||||
| 	ast_mutex_unlock(&std.lock); | ||||
|  | ||||
| 	ast_mutex_destroy(&std.lock); | ||||
| 	ast_cond_destroy(&std.cond); | ||||
| 	return std.fail; | ||||
| 	return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data); | ||||
| } | ||||
|  | ||||
| int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) | ||||
| @@ -2179,23 +2124,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si | ||||
| int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) | ||||
| { | ||||
| 	if (!serializer) { | ||||
| 		/* Caller doesn't care which PJSIP serializer the task executes under. */ | ||||
| 		serializer = ast_serializer_pool_get(sip_serializer_pool); | ||||
| 		if (!serializer) { | ||||
| 			/* No serializer picked to execute the task */ | ||||
| 			return -1; | ||||
| 		} | ||||
| 	} | ||||
| 	if (ast_taskprocessor_is_task(serializer)) { | ||||
| 		/* | ||||
| 		 * We are the requested serializer so we must execute | ||||
| 		 * the task now or deadlock waiting on ourself to | ||||
| 		 * execute it. | ||||
| 		 */ | ||||
| 		return sip_task(task_data); | ||||
| 		return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data); | ||||
| 	} | ||||
|  | ||||
| 	return ast_sip_push_task_wait(serializer, sip_task, task_data); | ||||
| 	return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data); | ||||
| } | ||||
|  | ||||
| void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size) | ||||
| @@ -3454,14 +3386,14 @@ static void remove_request_headers(pjsip_endpoint *endpt) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| long ast_sip_threadpool_queue_size(void) | ||||
| long ast_sip_taskpool_queue_size(void) | ||||
| { | ||||
| 	return ast_threadpool_queue_size(sip_threadpool); | ||||
| 	return ast_taskpool_queue_size(sip_taskpool); | ||||
| } | ||||
|  | ||||
| struct ast_threadpool *ast_sip_threadpool(void) | ||||
| struct ast_taskpool *ast_sip_taskpool(void) | ||||
| { | ||||
| 	return sip_threadpool; | ||||
| 	return sip_taskpool; | ||||
| } | ||||
|  | ||||
| int ast_sip_is_uri_sip_sips(pjsip_uri *uri) | ||||
| @@ -3801,7 +3733,7 @@ static int unload_pjsip(void *data) | ||||
| 	 * These calls need the pjsip endpoint and serializer to clean up. | ||||
| 	 * If they're not set, then there's nothing to clean up anyway. | ||||
| 	 */ | ||||
| 	if (ast_pjsip_endpoint && sip_serializer_pool) { | ||||
| 	if (ast_pjsip_endpoint) { | ||||
| 		ast_res_pjsip_cleanup_options_handling(); | ||||
| 		ast_res_pjsip_cleanup_message_filter(); | ||||
| 		ast_sip_destroy_distributor(); | ||||
| @@ -3921,7 +3853,7 @@ pjsip_media_type pjsip_media_type_text_plain; | ||||
|  | ||||
| static int load_module(void) | ||||
| { | ||||
| 	struct ast_threadpool_options options; | ||||
| 	struct ast_taskpool_options options; | ||||
|  | ||||
| 	/* pjproject and config_system need to be initialized before all else */ | ||||
| 	if (pj_init() != PJ_SUCCESS) { | ||||
| @@ -3958,18 +3890,11 @@ static int load_module(void) | ||||
| 		goto error; | ||||
| 	} | ||||
|  | ||||
| 	/* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */ | ||||
| 	sip_get_threadpool_options(&options); | ||||
| 	/* The serializer needs taskpool and taskpool needs pjproject to be initialized so it's next */ | ||||
| 	sip_get_taskpool_options(&options); | ||||
| 	options.thread_start = sip_thread_start; | ||||
| 	sip_threadpool = ast_threadpool_create("pjsip", NULL, &options); | ||||
| 	if (!sip_threadpool) { | ||||
| 		goto error; | ||||
| 	} | ||||
|  | ||||
| 	sip_serializer_pool = ast_serializer_pool_create( | ||||
| 		"pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1); | ||||
| 	if (!sip_serializer_pool) { | ||||
| 		ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); | ||||
| 	sip_taskpool = ast_taskpool_create("pjsip", &options); | ||||
| 	if (!sip_taskpool) { | ||||
| 		goto error; | ||||
| 	} | ||||
|  | ||||
| @@ -4049,8 +3974,7 @@ error: | ||||
|  | ||||
| 	/* These functions all check for NULLs and are safe to call at any time */ | ||||
| 	ast_sip_destroy_scheduler(); | ||||
| 	ast_serializer_pool_destroy(sip_serializer_pool); | ||||
| 	ast_threadpool_shutdown(sip_threadpool); | ||||
| 	ast_taskpool_shutdown(sip_taskpool); | ||||
|  | ||||
| 	return AST_MODULE_LOAD_DECLINE; | ||||
| } | ||||
| @@ -4076,12 +4000,11 @@ static int unload_module(void) | ||||
| 	ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); | ||||
|  | ||||
| 	/* The thread this is called from cannot call PJSIP/PJLIB functions, | ||||
| 	 * so we have to push the work to the threadpool to handle | ||||
| 	 * so we have to push the work to the taskpool to handle | ||||
| 	 */ | ||||
| 	ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); | ||||
| 	ast_sip_destroy_scheduler(); | ||||
| 	ast_serializer_pool_destroy(sip_serializer_pool); | ||||
| 	ast_threadpool_shutdown(sip_threadpool); | ||||
| 	ast_taskpool_shutdown(sip_taskpool); | ||||
|  | ||||
| 	return 0; | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user