mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-20 03:59:01 +00:00
Genericize the allocation and destruction of taskprocessor listeners.
The goal of this is to take the responsibility away from individual listeners to be sure to properly unref the taskprocessor. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -63,10 +63,14 @@ enum ast_tps_options {
|
|||||||
struct ast_taskprocessor_listener;
|
struct ast_taskprocessor_listener;
|
||||||
|
|
||||||
struct ast_taskprocessor_listener_callbacks {
|
struct ast_taskprocessor_listener_callbacks {
|
||||||
|
/*! Allocate the listener's private data */
|
||||||
|
void *(*alloc)(struct ast_taskprocessor_listener *listener);
|
||||||
/*! Indicates a task was pushed to the processor */
|
/*! Indicates a task was pushed to the processor */
|
||||||
void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
|
void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
|
||||||
/*! Indicates the task processor has become empty */
|
/*! Indicates the task processor has become empty */
|
||||||
void (*emptied)(struct ast_taskprocessor_listener *listener);
|
void (*emptied)(struct ast_taskprocessor_listener *listener);
|
||||||
|
/*! Destroy the listener's private data */
|
||||||
|
void (*destroy)(void *private_data);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ast_taskprocessor_listener {
|
struct ast_taskprocessor_listener {
|
||||||
@@ -75,6 +79,9 @@ struct ast_taskprocessor_listener {
|
|||||||
void *private_data;
|
void *private_data;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
|
||||||
|
struct ast_taskprocessor_listener_callbacks *callbacks);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
|
* \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
|
||||||
*
|
*
|
||||||
|
@@ -431,6 +431,7 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
|
|||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
if (obj == NULL) {
|
if (obj == NULL) {
|
||||||
|
ast_backtrace();
|
||||||
ast_assert(0);
|
ast_assert(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt,
|
|||||||
ast_cond_signal(&pvt->cond);
|
ast_cond_signal(&pvt->cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void default_listener_destroy(void *obj)
|
static void listener_destroy(void *obj)
|
||||||
{
|
{
|
||||||
struct ast_taskprocessor_listener *listener = obj;
|
struct ast_taskprocessor_listener *listener = obj;
|
||||||
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
|
||||||
|
|
||||||
default_tps_wake_up(pvt, 1);
|
listener->callbacks->destroy(listener->private_data);
|
||||||
pthread_join(pvt->poll_thread, NULL);
|
|
||||||
pvt->poll_thread = AST_PTHREADT_NULL;
|
|
||||||
ast_mutex_destroy(&pvt->lock);
|
|
||||||
ast_cond_destroy(&pvt->cond);
|
|
||||||
ast_free(pvt);
|
|
||||||
|
|
||||||
ao2_ref(listener->tps, -1);
|
ao2_ref(listener->tps, -1);
|
||||||
listener->tps = NULL;
|
listener->tps = NULL;
|
||||||
@@ -173,6 +167,35 @@ static void *tps_processing_function(void *data)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
|
||||||
|
{
|
||||||
|
struct default_taskprocessor_listener_pvt *pvt;
|
||||||
|
|
||||||
|
pvt = ast_calloc(1, sizeof(*pvt));
|
||||||
|
if (!pvt) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
ast_cond_init(&pvt->cond, NULL);
|
||||||
|
ast_mutex_init(&pvt->lock);
|
||||||
|
pvt->poll_thread = AST_PTHREADT_NULL;
|
||||||
|
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pvt;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void default_listener_destroy(void *obj)
|
||||||
|
{
|
||||||
|
struct default_taskprocessor_listener_pvt *pvt = obj;
|
||||||
|
|
||||||
|
default_tps_wake_up(pvt, 1);
|
||||||
|
pthread_join(pvt->poll_thread, NULL);
|
||||||
|
pvt->poll_thread = AST_PTHREADT_NULL;
|
||||||
|
ast_mutex_destroy(&pvt->lock);
|
||||||
|
ast_cond_destroy(&pvt->cond);
|
||||||
|
ast_free(pvt);
|
||||||
|
}
|
||||||
|
|
||||||
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
|
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
|
||||||
{
|
{
|
||||||
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
||||||
@@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
|
static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
|
||||||
|
.alloc = default_listener_alloc,
|
||||||
.task_pushed = default_task_pushed,
|
.task_pushed = default_task_pushed,
|
||||||
.emptied = default_emptied,
|
.emptied = default_emptied,
|
||||||
|
.destroy = default_listener_destroy,
|
||||||
};
|
};
|
||||||
|
|
||||||
/*! \internal \brief Clean up resources on Asterisk shutdown */
|
/*! \internal \brief Clean up resources on Asterisk shutdown */
|
||||||
@@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
|
|||||||
return tps->name;
|
return tps->name;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct ast_taskprocessor_listener *default_listener_alloc(void)
|
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
|
||||||
|
struct ast_taskprocessor_listener_callbacks *callbacks)
|
||||||
{
|
{
|
||||||
struct ast_taskprocessor_listener *listener;
|
RAII_VAR(struct ast_taskprocessor_listener *, listener,
|
||||||
struct default_taskprocessor_listener_pvt *pvt;
|
ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
|
||||||
|
|
||||||
listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
|
|
||||||
if (!listener) {
|
if (!listener) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pvt = ast_calloc(1, sizeof(*pvt));
|
listener->callbacks = callbacks;
|
||||||
if (!pvt) {
|
listener->private_data = listener->callbacks->alloc(listener);
|
||||||
ao2_ref(listener, -1);
|
if (!listener->private_data) {
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
listener->callbacks = &default_listener_callbacks;
|
|
||||||
listener->private_data = pvt;
|
|
||||||
ast_cond_init(&pvt->cond, NULL);
|
|
||||||
ast_mutex_init(&pvt->lock);
|
|
||||||
pvt->poll_thread = AST_PTHREADT_NULL;
|
|
||||||
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
|
|
||||||
ao2_ref(listener, -1);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ao2_ref(listener, +1);
|
||||||
return listener;
|
return listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
/* Create a new taskprocessor. Start by creating a default listener */
|
/* Create a new taskprocessor. Start by creating a default listener */
|
||||||
listener = default_listener_alloc();
|
listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
|
||||||
|
if (!listener) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
p = ast_taskprocessor_create_with_listener(name, listener);
|
p = ast_taskprocessor_create_with_listener(name, listener);
|
||||||
|
if (!p) {
|
||||||
|
ao2_ref(listener, -1);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unref listener here since the taskprocessor has gained a reference to the listener */
|
||||||
ao2_ref(listener, -1);
|
ao2_ref(listener, -1);
|
||||||
return p;
|
return p;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user