mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-11-03 20:38:59 +00:00 
			
		
		
		
	Correct typos of the following word families: mounting jitterbuffer thrashing original manipulating entries actual possibility tasks options positives taskprocessor other dynamic declarative ASTERISK-29714 Change-Id: I6b94659d045eec5d8d020fce2e9b6e2f593dfeb6
		
			
				
	
	
		
			991 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			991 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/*
 | 
						|
 * Asterisk -- An open source telephony toolkit.
 | 
						|
 *
 | 
						|
 * Copyright (C) 2012-2013, Digium, Inc.
 | 
						|
 *
 | 
						|
 * Mark Michelson <mmichelson@digium.com>
 | 
						|
 *
 | 
						|
 * See http://www.asterisk.org for more information about
 | 
						|
 * the Asterisk project. Please do not directly contact
 | 
						|
 * any of the maintainers of this project for assistance;
 | 
						|
 * the project provides a web site, mailing lists and IRC
 | 
						|
 * channels for your use.
 | 
						|
 *
 | 
						|
 * This program is free software, distributed under the terms of
 | 
						|
 * the GNU General Public License Version 2. See the LICENSE file
 | 
						|
 * at the top of the source tree.
 | 
						|
 */
 | 
						|
 | 
						|
/*!
 | 
						|
 * \file
 | 
						|
 * \brief taskprocessor unit tests
 | 
						|
 *
 | 
						|
 * \author Mark Michelson <mmichelson@digium.com>
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
/*** MODULEINFO
 | 
						|
	<depend>TEST_FRAMEWORK</depend>
 | 
						|
	<support_level>core</support_level>
 | 
						|
 ***/
 | 
						|
 | 
						|
#include "asterisk.h"
 | 
						|
 | 
						|
#include "asterisk/test.h"
 | 
						|
#include "asterisk/taskprocessor.h"
 | 
						|
#include "asterisk/module.h"
 | 
						|
#include "asterisk/astobj2.h"
 | 
						|
#include "asterisk/serializer.h"
 | 
						|
#include "asterisk/threadpool.h"
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief userdata associated with baseline taskprocessor test
 | 
						|
 */
 | 
						|
struct task_data {
 | 
						|
	/* Condition used to signal to queuing thread that task was executed */
 | 
						|
	ast_cond_t cond;
 | 
						|
	/* Lock protecting the condition */
 | 
						|
	ast_mutex_t lock;
 | 
						|
	/*! Boolean indicating that the task was run */
 | 
						|
	int task_complete;
 | 
						|
	/*! Milliseconds to wait before returning */
 | 
						|
	unsigned long wait_time;
 | 
						|
};
 | 
						|
 | 
						|
static void task_data_dtor(void *obj)
 | 
						|
{
 | 
						|
	struct task_data *task_data = obj;
 | 
						|
 | 
						|
	ast_mutex_destroy(&task_data->lock);
 | 
						|
	ast_cond_destroy(&task_data->cond);
 | 
						|
}
 | 
						|
 | 
						|
/*! \brief Create a task_data object */
 | 
						|
static struct task_data *task_data_create(void)
 | 
						|
{
 | 
						|
	struct task_data *task_data =
 | 
						|
		ao2_alloc(sizeof(*task_data), task_data_dtor);
 | 
						|
 | 
						|
	if (!task_data) {
 | 
						|
		return NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_cond_init(&task_data->cond, NULL);
 | 
						|
	ast_mutex_init(&task_data->lock);
 | 
						|
	task_data->task_complete = 0;
 | 
						|
	task_data->wait_time = 0;
 | 
						|
 | 
						|
	return task_data;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Queued task for baseline test.
 | 
						|
 *
 | 
						|
 * The task simply sets a boolean to indicate the
 | 
						|
 * task has been run and then signals a condition
 | 
						|
 * saying it's complete
 | 
						|
 */
 | 
						|
static int task(void *data)
 | 
						|
{
 | 
						|
	struct task_data *task_data = data;
 | 
						|
 | 
						|
	SCOPED_MUTEX(lock, &task_data->lock);
 | 
						|
	if (task_data->wait_time > 0) {
 | 
						|
		usleep(task_data->wait_time * 1000);
 | 
						|
	}
 | 
						|
	task_data->task_complete = 1;
 | 
						|
	ast_cond_signal(&task_data->cond);
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Wait for a task to execute.
 | 
						|
 */
 | 
						|
static int task_wait(struct task_data *task_data)
 | 
						|
{
 | 
						|
	struct timeval start = ast_tvnow();
 | 
						|
	struct timespec end;
 | 
						|
	SCOPED_MUTEX(lock, &task_data->lock);
 | 
						|
 | 
						|
	end.tv_sec = start.tv_sec + 30;
 | 
						|
	end.tv_nsec = start.tv_usec * 1000;
 | 
						|
 | 
						|
	while (!task_data->task_complete) {
 | 
						|
		int res;
 | 
						|
		res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
 | 
						|
			&end);
 | 
						|
		if (res == ETIMEDOUT) {
 | 
						|
			return -1;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Baseline test for default taskprocessor
 | 
						|
 *
 | 
						|
 * This test ensures that when a task is added to a taskprocessor that
 | 
						|
 * has been allocated with a default listener that the task gets executed
 | 
						|
 * as expected
 | 
						|
 */
 | 
						|
AST_TEST_DEFINE(default_taskprocessor)
 | 
						|
{
 | 
						|
	RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
 | 
						|
	RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
 | 
						|
	int res;
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = "default_taskprocessor";
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Test of default taskprocessor";
 | 
						|
		info->description =
 | 
						|
			"Ensures that a queued task gets executed.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
 | 
						|
 | 
						|
	if (!tps) {
 | 
						|
		ast_test_status_update(test, "Unable to create test taskprocessor\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	task_data = task_data_create();
 | 
						|
	if (!task_data) {
 | 
						|
		ast_test_status_update(test, "Unable to create task_data\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	if (ast_taskprocessor_push(tps, task, task_data)) {
 | 
						|
		ast_test_status_update(test, "Failed to queue task\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	res = task_wait(task_data);
 | 
						|
	if (res != 0) {
 | 
						|
		ast_test_status_update(test, "Queued task did not execute!\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	return AST_TEST_PASS;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Baseline test for subsystem alert
 | 
						|
 */
 | 
						|
AST_TEST_DEFINE(subsystem_alert)
 | 
						|
{
 | 
						|
	RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
 | 
						|
#define TEST_DATA_ARRAY_SIZE 10
 | 
						|
#define LOW_WATER_MARK 3
 | 
						|
#define HIGH_WATER_MARK 6
 | 
						|
	struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
 | 
						|
	int res = 0;
 | 
						|
	int i;
 | 
						|
	long queue_count;
 | 
						|
	unsigned int alert_level;
 | 
						|
	unsigned int subsystem_alert_level;
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = "subsystem_alert";
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Test of subsystem alerts";
 | 
						|
		info->description =
 | 
						|
			"Ensures alerts are generated properly.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
 | 
						|
 | 
						|
	if (!tps) {
 | 
						|
		ast_test_status_update(test, "Unable to create test taskprocessor\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
 | 
						|
	ast_taskprocessor_suspend(tps);
 | 
						|
 | 
						|
	for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
 | 
						|
		task_data[i] = task_data_create();
 | 
						|
		if (!task_data[i]) {
 | 
						|
			ast_test_status_update(test, "Unable to create task_data\n");
 | 
						|
			res = -1;
 | 
						|
			goto data_cleanup;
 | 
						|
		}
 | 
						|
		task_data[i]->wait_time = 500;
 | 
						|
 | 
						|
		ast_test_status_update(test, "Pushing task %d\n", i);
 | 
						|
		if (ast_taskprocessor_push(tps, task, task_data[i])) {
 | 
						|
			ast_test_status_update(test, "Failed to queue task\n");
 | 
						|
			res = -1;
 | 
						|
			goto data_cleanup;
 | 
						|
		}
 | 
						|
 | 
						|
		queue_count = ast_taskprocessor_size(tps);
 | 
						|
		alert_level = ast_taskprocessor_alert_get();
 | 
						|
		subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
 | 
						|
 | 
						|
		if (queue_count == HIGH_WATER_MARK) {
 | 
						|
			if (subsystem_alert_level) {
 | 
						|
				ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
 | 
						|
			}
 | 
						|
			if (alert_level) {
 | 
						|
				ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
 | 
						|
			}
 | 
						|
		} else if (queue_count < HIGH_WATER_MARK) {
 | 
						|
			if (subsystem_alert_level > 0) {
 | 
						|
				ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
			if (alert_level > 0) {
 | 
						|
				ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			if (subsystem_alert_level == 0) {
 | 
						|
				ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
			if (alert_level == 0) {
 | 
						|
				ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ast_taskprocessor_unsuspend(tps);
 | 
						|
 | 
						|
	for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
 | 
						|
		ast_test_status_update(test, "Waiting on task %d\n", i);
 | 
						|
		if (task_wait(task_data[i])) {
 | 
						|
			ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
 | 
						|
			res = -1;
 | 
						|
			goto data_cleanup;
 | 
						|
		}
 | 
						|
 | 
						|
		queue_count = ast_taskprocessor_size(tps);
 | 
						|
		alert_level = ast_taskprocessor_alert_get();
 | 
						|
		subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
 | 
						|
 | 
						|
		if (queue_count == LOW_WATER_MARK) {
 | 
						|
			if (!subsystem_alert_level) {
 | 
						|
				ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
 | 
						|
			}
 | 
						|
			if (!alert_level) {
 | 
						|
				ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
 | 
						|
			}
 | 
						|
		} else if (queue_count > LOW_WATER_MARK) {
 | 
						|
			if (subsystem_alert_level == 0) {
 | 
						|
				ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
			if (alert_level == 0) {
 | 
						|
				ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			if (subsystem_alert_level > 0) {
 | 
						|
				ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
			if (alert_level > 0) {
 | 
						|
				ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
 | 
						|
				res = -1;
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
 | 
						|
data_cleanup:
 | 
						|
	for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
 | 
						|
		ao2_cleanup(task_data[i]);
 | 
						|
	}
 | 
						|
 | 
						|
	return res ? AST_TEST_FAIL : AST_TEST_PASS;
 | 
						|
}
 | 
						|
 | 
						|
#define NUM_TASKS 20000
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Relevant data associated with taskprocessor load test
 | 
						|
 */
 | 
						|
static struct load_task_data {
 | 
						|
	/*! Condition used to indicate a task has completed executing */
 | 
						|
	ast_cond_t cond;
 | 
						|
	/*! Lock used to protect the condition */
 | 
						|
	ast_mutex_t lock;
 | 
						|
	/*! Counter of the number of completed tasks */
 | 
						|
	int tasks_completed;
 | 
						|
	/*! Storage for task-specific data */
 | 
						|
	int task_rand[NUM_TASKS];
 | 
						|
} load_task_results;
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief a queued task to be used in the taskprocessor load test
 | 
						|
 *
 | 
						|
 * The task increments the number of tasks executed and puts the passed-in
 | 
						|
 * data into the next slot in the array of random data.
 | 
						|
 */
 | 
						|
static int load_task(void *data)
 | 
						|
{
 | 
						|
	int *randdata = data;
 | 
						|
	SCOPED_MUTEX(lock, &load_task_results.lock);
 | 
						|
	load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
 | 
						|
	ast_cond_signal(&load_task_results.cond);
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Load test for taskprocessor with default listener
 | 
						|
 *
 | 
						|
 * This test queues a large number of tasks, each with random data associated.
 | 
						|
 * The test ensures that all of the tasks are run and that the tasks are executed
 | 
						|
 * in the same order that they were queued
 | 
						|
 */
 | 
						|
AST_TEST_DEFINE(default_taskprocessor_load)
 | 
						|
{
 | 
						|
	struct ast_taskprocessor *tps;
 | 
						|
	struct timeval start;
 | 
						|
	struct timespec ts;
 | 
						|
	enum ast_test_result_state res = AST_TEST_PASS;
 | 
						|
	int timedwait_res;
 | 
						|
	int i;
 | 
						|
	int rand_data[NUM_TASKS];
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = "default_taskprocessor_load";
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Load test of default taskprocessor";
 | 
						|
		info->description =
 | 
						|
			"Ensure that a large number of queued tasks are executed in the proper order.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
 | 
						|
 | 
						|
	if (!tps) {
 | 
						|
		ast_test_status_update(test, "Unable to create test taskprocessor\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	start = ast_tvnow();
 | 
						|
 | 
						|
	ts.tv_sec = start.tv_sec + 60;
 | 
						|
	ts.tv_nsec = start.tv_usec * 1000;
 | 
						|
 | 
						|
	ast_cond_init(&load_task_results.cond, NULL);
 | 
						|
	ast_mutex_init(&load_task_results.lock);
 | 
						|
	load_task_results.tasks_completed = 0;
 | 
						|
 | 
						|
	for (i = 0; i < NUM_TASKS; ++i) {
 | 
						|
		rand_data[i] = ast_random();
 | 
						|
		if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
 | 
						|
			ast_test_status_update(test, "Failed to queue task\n");
 | 
						|
			res = AST_TEST_FAIL;
 | 
						|
			goto test_end;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ast_mutex_lock(&load_task_results.lock);
 | 
						|
	while (load_task_results.tasks_completed < NUM_TASKS) {
 | 
						|
		timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
 | 
						|
		if (timedwait_res == ETIMEDOUT) {
 | 
						|
			break;
 | 
						|
		}
 | 
						|
	}
 | 
						|
	ast_mutex_unlock(&load_task_results.lock);
 | 
						|
 | 
						|
	if (load_task_results.tasks_completed != NUM_TASKS) {
 | 
						|
		ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
 | 
						|
				NUM_TASKS, load_task_results.tasks_completed);
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_end;
 | 
						|
	}
 | 
						|
 | 
						|
	for (i = 0; i < NUM_TASKS; ++i) {
 | 
						|
		if (rand_data[i] != load_task_results.task_rand[i]) {
 | 
						|
			ast_test_status_update(test, "Queued tasks did not execute in order\n");
 | 
						|
			res = AST_TEST_FAIL;
 | 
						|
			goto test_end;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
test_end:
 | 
						|
	tps = ast_taskprocessor_unreference(tps);
 | 
						|
	ast_mutex_destroy(&load_task_results.lock);
 | 
						|
	ast_cond_destroy(&load_task_results.cond);
 | 
						|
	return res;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Private data for the test taskprocessor listener
 | 
						|
 */
 | 
						|
struct test_listener_pvt {
 | 
						|
	/* Counter of number of tasks pushed to the queue */
 | 
						|
	int num_pushed;
 | 
						|
	/* Counter of number of times the queue was emptied */
 | 
						|
	int num_emptied;
 | 
						|
	/* Counter of number of times that a pushed task occurred on an empty queue */
 | 
						|
	int num_was_empty;
 | 
						|
	/* Boolean indicating whether the shutdown callback was called */
 | 
						|
	int shutdown;
 | 
						|
};
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief test taskprocessor listener's alloc callback
 | 
						|
 */
 | 
						|
static void *test_listener_pvt_alloc(void)
 | 
						|
{
 | 
						|
	struct test_listener_pvt *pvt;
 | 
						|
 | 
						|
	pvt = ast_calloc(1, sizeof(*pvt));
 | 
						|
	return pvt;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief test taskprocessor listener's start callback
 | 
						|
 */
 | 
						|
static int test_start(struct ast_taskprocessor_listener *listener)
 | 
						|
{
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief test taskprocessor listener's task_pushed callback
 | 
						|
 *
 | 
						|
 * Adjusts private data's stats as indicated by the parameters.
 | 
						|
 */
 | 
						|
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 | 
						|
{
 | 
						|
	struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
 | 
						|
	++pvt->num_pushed;
 | 
						|
	if (was_empty) {
 | 
						|
		++pvt->num_was_empty;
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief test taskprocessor listener's emptied callback.
 | 
						|
 */
 | 
						|
static void test_emptied(struct ast_taskprocessor_listener *listener)
 | 
						|
{
 | 
						|
	struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
 | 
						|
	++pvt->num_emptied;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief test taskprocessor listener's shutdown callback.
 | 
						|
 */
 | 
						|
static void test_shutdown(struct ast_taskprocessor_listener *listener)
 | 
						|
{
 | 
						|
	struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
 | 
						|
	pvt->shutdown = 1;
 | 
						|
}
 | 
						|
 | 
						|
static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
 | 
						|
	.start = test_start,
 | 
						|
	.task_pushed = test_task_pushed,
 | 
						|
	.emptied = test_emptied,
 | 
						|
	.shutdown = test_shutdown,
 | 
						|
};
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Queued task for taskprocessor listener test.
 | 
						|
 *
 | 
						|
 * Does nothing.
 | 
						|
 */
 | 
						|
static int listener_test_task(void *ignore)
 | 
						|
{
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief helper to ensure that statistics the listener is keeping are what we expect
 | 
						|
 *
 | 
						|
 * \param test The currently-running test
 | 
						|
 * \param pvt The private data for the taskprocessor listener
 | 
						|
 * \param num_pushed The expected current number of tasks pushed to the processor
 | 
						|
 * \param num_emptied The expected current number of times the taskprocessor has become empty
 | 
						|
 * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
 | 
						|
 * \retval -1 Stats were not as expected
 | 
						|
 * \retval 0 Stats were as expected
 | 
						|
 */
 | 
						|
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
 | 
						|
{
 | 
						|
	if (pvt->num_pushed != num_pushed) {
 | 
						|
		ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
 | 
						|
				num_pushed, pvt->num_pushed);
 | 
						|
		return -1;
 | 
						|
	}
 | 
						|
 | 
						|
	if (pvt->num_emptied != num_emptied) {
 | 
						|
		ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
 | 
						|
				num_emptied, pvt->num_emptied);
 | 
						|
		return -1;
 | 
						|
	}
 | 
						|
 | 
						|
	if (pvt->num_was_empty != num_was_empty) {
 | 
						|
		ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
 | 
						|
				num_was_empty, pvt->num_emptied);
 | 
						|
		return -1;
 | 
						|
	}
 | 
						|
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Test for a taskprocessor with custom listener.
 | 
						|
 *
 | 
						|
 * This test pushes tasks to a taskprocessor with a custom listener, executes the tasks,
 | 
						|
 * and destroys the taskprocessor.
 | 
						|
 *
 | 
						|
 * The test ensures that the listener's callbacks are called when expected and that the data
 | 
						|
 * being passed in is accurate.
 | 
						|
 */
 | 
						|
AST_TEST_DEFINE(taskprocessor_listener)
 | 
						|
{
 | 
						|
	struct ast_taskprocessor *tps = NULL;
 | 
						|
	struct ast_taskprocessor_listener *listener = NULL;
 | 
						|
	struct test_listener_pvt *pvt = NULL;
 | 
						|
	enum ast_test_result_state res = AST_TEST_PASS;
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = "taskprocessor_listener";
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Test of taskprocessor listeners";
 | 
						|
		info->description =
 | 
						|
			"Ensures that listener callbacks are called when expected.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	pvt = test_listener_pvt_alloc();
 | 
						|
	if (!pvt) {
 | 
						|
		ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
 | 
						|
	if (!listener) {
 | 
						|
		ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	tps = ast_taskprocessor_create_with_listener("test_listener", listener);
 | 
						|
	if (!tps) {
 | 
						|
		ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
 | 
						|
		ast_test_status_update(test, "Failed to queue task\n");
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	if (check_stats(test, pvt, 1, 0, 1) < 0) {
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
 | 
						|
		ast_test_status_update(test, "Failed to queue task\n");
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	if (check_stats(test, pvt, 2, 0, 1) < 0) {
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_taskprocessor_execute(tps);
 | 
						|
 | 
						|
	if (check_stats(test, pvt, 2, 0, 1) < 0) {
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_taskprocessor_execute(tps);
 | 
						|
 | 
						|
	if (check_stats(test, pvt, 2, 1, 1) < 0) {
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
	tps = ast_taskprocessor_unreference(tps);
 | 
						|
 | 
						|
	if (!pvt->shutdown) {
 | 
						|
		res = AST_TEST_FAIL;
 | 
						|
		goto test_exit;
 | 
						|
	}
 | 
						|
 | 
						|
test_exit:
 | 
						|
	ao2_cleanup(listener);
 | 
						|
	/* This is safe even if tps is NULL */
 | 
						|
	ast_taskprocessor_unreference(tps);
 | 
						|
	ast_free(pvt);
 | 
						|
	return res;
 | 
						|
}
 | 
						|
 | 
						|
struct shutdown_data {
 | 
						|
	ast_cond_t in;
 | 
						|
	ast_cond_t out;
 | 
						|
	ast_mutex_t lock;
 | 
						|
	int task_complete;
 | 
						|
	int task_started;
 | 
						|
	int task_stop_waiting;
 | 
						|
};
 | 
						|
 | 
						|
static void shutdown_data_dtor(void *data)
 | 
						|
{
 | 
						|
	struct shutdown_data *shutdown_data = data;
 | 
						|
	ast_mutex_destroy(&shutdown_data->lock);
 | 
						|
	ast_cond_destroy(&shutdown_data->in);
 | 
						|
	ast_cond_destroy(&shutdown_data->out);
 | 
						|
}
 | 
						|
 | 
						|
static struct shutdown_data *shutdown_data_create(int dont_wait)
 | 
						|
{
 | 
						|
	RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
 | 
						|
 | 
						|
	shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
 | 
						|
	if (!shutdown_data) {
 | 
						|
		return NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_mutex_init(&shutdown_data->lock);
 | 
						|
	ast_cond_init(&shutdown_data->in, NULL);
 | 
						|
	ast_cond_init(&shutdown_data->out, NULL);
 | 
						|
	shutdown_data->task_stop_waiting = dont_wait;
 | 
						|
	ao2_ref(shutdown_data, +1);
 | 
						|
	return shutdown_data;
 | 
						|
}
 | 
						|
 | 
						|
static int shutdown_task_exec(void *data)
 | 
						|
{
 | 
						|
	struct shutdown_data *shutdown_data = data;
 | 
						|
	SCOPED_MUTEX(lock, &shutdown_data->lock);
 | 
						|
	shutdown_data->task_started = 1;
 | 
						|
	ast_cond_signal(&shutdown_data->out);
 | 
						|
	while (!shutdown_data->task_stop_waiting) {
 | 
						|
		ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
 | 
						|
	}
 | 
						|
	shutdown_data->task_complete = 1;
 | 
						|
	ast_cond_signal(&shutdown_data->out);
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
 | 
						|
{
 | 
						|
	struct timeval start = ast_tvnow();
 | 
						|
	struct timespec end = {
 | 
						|
		.tv_sec = start.tv_sec + 5,
 | 
						|
		.tv_nsec = start.tv_usec * 1000
 | 
						|
	};
 | 
						|
	SCOPED_MUTEX(lock, &shutdown_data->lock);
 | 
						|
 | 
						|
	while (!shutdown_data->task_complete) {
 | 
						|
		if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
 | 
						|
			break;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return shutdown_data->task_complete;
 | 
						|
}
 | 
						|
 | 
						|
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
 | 
						|
{
 | 
						|
	SCOPED_MUTEX(lock, &shutdown_data->lock);
 | 
						|
	return shutdown_data->task_complete;
 | 
						|
}
 | 
						|
 | 
						|
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
 | 
						|
{
 | 
						|
	struct timeval start = ast_tvnow();
 | 
						|
	struct timespec end = {
 | 
						|
		.tv_sec = start.tv_sec + 5,
 | 
						|
		.tv_nsec = start.tv_usec * 1000
 | 
						|
	};
 | 
						|
	SCOPED_MUTEX(lock, &shutdown_data->lock);
 | 
						|
 | 
						|
	while (!shutdown_data->task_started) {
 | 
						|
		if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
 | 
						|
			break;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return shutdown_data->task_started;
 | 
						|
}
 | 
						|
 | 
						|
static void shutdown_poke(struct shutdown_data *shutdown_data)
 | 
						|
{
 | 
						|
	SCOPED_MUTEX(lock, &shutdown_data->lock);
 | 
						|
	shutdown_data->task_stop_waiting = 1;
 | 
						|
	ast_cond_signal(&shutdown_data->in);
 | 
						|
}
 | 
						|
 | 
						|
static void *tps_shutdown_thread(void *data)
 | 
						|
{
 | 
						|
	struct ast_taskprocessor *tps = data;
 | 
						|
	ast_taskprocessor_unreference(tps);
 | 
						|
	return NULL;
 | 
						|
}
 | 
						|
 | 
						|
AST_TEST_DEFINE(taskprocessor_shutdown)
 | 
						|
{
 | 
						|
	RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
 | 
						|
	RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
 | 
						|
	RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
 | 
						|
	int push_res;
 | 
						|
	int wait_res;
 | 
						|
	int pthread_res;
 | 
						|
	pthread_t shutdown_thread;
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = "taskprocessor_shutdown";
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Test of taskprocessor shutdown sequence";
 | 
						|
		info->description =
 | 
						|
			"Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
 | 
						|
	task1 = shutdown_data_create(0); /* task1 waits to be poked */
 | 
						|
	task2 = shutdown_data_create(1); /* task2 waits for nothing */
 | 
						|
 | 
						|
	if (!tps || !task1 || !task2) {
 | 
						|
		ast_test_status_update(test, "Allocation error\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
 | 
						|
	if (push_res != 0) {
 | 
						|
		ast_test_status_update(test, "Could not push task1\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
 | 
						|
	if (push_res != 0) {
 | 
						|
		ast_test_status_update(test, "Could not push task2\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	wait_res = shutdown_waitfor_start(task1);
 | 
						|
	if (!wait_res) {
 | 
						|
		ast_test_status_update(test, "Task1 didn't start\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
 | 
						|
	if (pthread_res != 0) {
 | 
						|
		ast_test_status_update(test, "Failed to create shutdown thread\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
	tps = NULL;
 | 
						|
 | 
						|
	/* Wakeup task1; it should complete */
 | 
						|
	shutdown_poke(task1);
 | 
						|
	wait_res = shutdown_waitfor_completion(task1);
 | 
						|
	if (!wait_res) {
 | 
						|
		ast_test_status_update(test, "Task1 didn't complete\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	/* Wait for shutdown to complete */
 | 
						|
	pthread_join(shutdown_thread, NULL);
 | 
						|
 | 
						|
	/* Should have also completed task2 */
 | 
						|
	wait_res = shutdown_has_completed(task2);
 | 
						|
	if (!wait_res) {
 | 
						|
		ast_test_status_update(test, "Task2 didn't finish\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	return AST_TEST_PASS;
 | 
						|
}
 | 
						|
 | 
						|
static int local_task_exe(struct ast_taskprocessor_local *local)
 | 
						|
{
 | 
						|
	int *local_data = local->local_data;
 | 
						|
	struct task_data *task_data = local->data;
 | 
						|
 | 
						|
	*local_data = 1;
 | 
						|
	task(task_data);
 | 
						|
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
AST_TEST_DEFINE(taskprocessor_push_local)
 | 
						|
{
 | 
						|
	RAII_VAR(struct ast_taskprocessor *, tps, NULL,
 | 
						|
		ast_taskprocessor_unreference);
 | 
						|
	RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
 | 
						|
	int local_data;
 | 
						|
	int res;
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = __func__;
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Test of pushing local data";
 | 
						|
		info->description =
 | 
						|
			"Ensures that local data is passed along.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
 | 
						|
	tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
 | 
						|
	if (!tps) {
 | 
						|
		ast_test_status_update(test, "Unable to create test taskprocessor\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
 | 
						|
	task_data = task_data_create();
 | 
						|
	if (!task_data) {
 | 
						|
		ast_test_status_update(test, "Unable to create task_data\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	local_data = 0;
 | 
						|
	ast_taskprocessor_set_local(tps, &local_data);
 | 
						|
 | 
						|
	if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) {
 | 
						|
		ast_test_status_update(test, "Failed to queue task\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	res = task_wait(task_data);
 | 
						|
	if (res != 0) {
 | 
						|
		ast_test_status_update(test, "Queued task did not execute!\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	if (local_data != 1) {
 | 
						|
		ast_test_status_update(test,
 | 
						|
			"Queued task did not set local_data!\n");
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	return AST_TEST_PASS;
 | 
						|
}
 | 
						|
 | 
						|
/*!
 | 
						|
 * \brief Baseline test for a serializer pool
 | 
						|
 *
 | 
						|
 * This test ensures that when a task is added to a taskprocessor that
 | 
						|
 * has been allocated with a default listener that the task gets executed
 | 
						|
 * as expected
 | 
						|
 */
 | 
						|
AST_TEST_DEFINE(serializer_pool)
 | 
						|
{
 | 
						|
	RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
 | 
						|
	RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
 | 
						|
	RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
 | 
						|
	struct ast_threadpool_options options = {
 | 
						|
		.version = AST_THREADPOOL_OPTIONS_VERSION,
 | 
						|
		.idle_timeout = 0,
 | 
						|
		.auto_increment = 0,
 | 
						|
		.initial_size = 1,
 | 
						|
		.max_size = 0,
 | 
						|
	};
 | 
						|
	/* struct ast_taskprocessor *tps; */
 | 
						|
 | 
						|
	switch (cmd) {
 | 
						|
	case TEST_INIT:
 | 
						|
		info->name = "serializer_pool";
 | 
						|
		info->category = "/main/taskprocessor/";
 | 
						|
		info->summary = "Test using a serializer pool";
 | 
						|
		info->description =
 | 
						|
			"Ensures that a queued task gets executed.";
 | 
						|
		return AST_TEST_NOT_RUN;
 | 
						|
	case TEST_EXECUTE:
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
 | 
						|
	ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
 | 
						|
						  "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
 | 
						|
	ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
 | 
						|
	ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
 | 
						|
	ast_test_validate(test, task_data = task_data_create());
 | 
						|
 | 
						|
	task_data->wait_time = 4000; /* task takes 4 seconds */
 | 
						|
	ast_test_validate(test, !ast_taskprocessor_push(
 | 
						|
						  ast_serializer_pool_get(serializer_pool), task, task_data));
 | 
						|
 | 
						|
	if (!ast_serializer_pool_destroy(serializer_pool)) {
 | 
						|
		ast_test_status_update(test, "Unexpected pool destruction!\n");
 | 
						|
		/*
 | 
						|
		 * The pool should have timed out, so if it destruction reports success
 | 
						|
		 * we need to fail.
 | 
						|
		 */
 | 
						|
		serializer_pool = NULL;
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	ast_test_validate(test, !task_wait(task_data));
 | 
						|
 | 
						|
	/* The first attempt should have failed. Second try should destroy successfully */
 | 
						|
	if (ast_serializer_pool_destroy(serializer_pool)) {
 | 
						|
		ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
 | 
						|
		/*
 | 
						|
		 * If this fails we'll try again on return to hopefully avoid a memory leak.
 | 
						|
		 * If it again times out a third time, well not much we can do.
 | 
						|
		 */
 | 
						|
		return AST_TEST_FAIL;
 | 
						|
	}
 | 
						|
 | 
						|
	/* Test passed, so set pool to NULL to avoid "re-running" destroy */
 | 
						|
	serializer_pool = NULL;
 | 
						|
 | 
						|
	return AST_TEST_PASS;
 | 
						|
}
 | 
						|
 | 
						|
static int unload_module(void)
 | 
						|
{
 | 
						|
	ast_test_unregister(default_taskprocessor);
 | 
						|
	ast_test_unregister(default_taskprocessor_load);
 | 
						|
	ast_test_unregister(subsystem_alert);
 | 
						|
	ast_test_unregister(taskprocessor_listener);
 | 
						|
	ast_test_unregister(taskprocessor_shutdown);
 | 
						|
	ast_test_unregister(taskprocessor_push_local);
 | 
						|
	ast_test_unregister(serializer_pool);
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
static int load_module(void)
 | 
						|
{
 | 
						|
	ast_test_register(default_taskprocessor);
 | 
						|
	ast_test_register(default_taskprocessor_load);
 | 
						|
	ast_test_register(subsystem_alert);
 | 
						|
	ast_test_register(taskprocessor_listener);
 | 
						|
	ast_test_register(taskprocessor_shutdown);
 | 
						|
	ast_test_register(taskprocessor_push_local);
 | 
						|
	ast_test_register(serializer_pool);
 | 
						|
	return AST_MODULE_LOAD_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");
 |