Improve shutdown procedure.

This helps tests to pass more often than before.
They are far less likely to queue extra processes
into the control taskprocessor since they are prevented
once the threadpool begins to shut down.



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377578 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Michelson
2012-12-10 05:25:38 +00:00
parent 03d617040a
commit 5dd22df050
2 changed files with 36 additions and 4 deletions

View File

@@ -214,7 +214,12 @@ static int queued_active_thread_idle(void *data)
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
struct worker_thread *worker)
{
struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
struct thread_worker_pair *pair;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
@@ -249,7 +254,12 @@ static int queued_zombie_thread_dead(void *data)
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker)
{
struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
struct thread_worker_pair *pair;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
@@ -268,9 +278,12 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
*/
static int threadpool_execute(struct ast_threadpool *pool)
{
ao2_lock(pool);
if (!pool->shutting_down) {
ao2_unlock(pool);
return ast_taskprocessor_execute(pool->tps);
}
ao2_unlock(pool);
return 0;
}
@@ -422,8 +435,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen
int was_empty)
{
struct ast_threadpool *pool = listener->private_data;
struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
struct task_pushed_data *tpd;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
tpd = task_pushed_data_alloc(pool, was_empty);
if (!tpd) {
return;
}
@@ -456,6 +474,11 @@ static int handle_emptied(void *data)
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
struct ast_threadpool *pool = listener->private_data;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
}
@@ -690,6 +713,10 @@ static int queued_set_size(void *data)
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
struct set_size_data *ssd;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
return;
}
ssd = set_size_data_alloc(pool, size);
if (!ssd) {
@@ -750,6 +777,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
return ast_taskprocessor_push(pool->tps, task, data);
}
@@ -761,7 +789,9 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool)
/* Shut down the taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks
*/
ast_atomic_fetchadd_int(&pool->shutting_down, +1);
ao2_lock(pool);
pool->shutting_down = 1;
ao2_unlock(pool);
ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps);
}
@@ -834,6 +864,7 @@ static void worker_shutdown(struct worker_thread *worker)
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
ast_log(LOG_NOTICE, "Worker dying\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);

View File

@@ -69,6 +69,7 @@ static void test_state_changed(struct ast_threadpool *pool,
SCOPED_MUTEX(lock, &tld->lock);
tld->num_active = active_threads;
tld->num_idle = idle_threads;
ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
ast_cond_signal(&tld->cond);
}