Merge "pjsip_scheduler.c: Fix some corner cases."

This commit is contained in:
Jenkins2
2018-04-16 06:49:14 -05:00
committed by Gerrit Code Review
2 changed files with 121 additions and 46 deletions

View File

@@ -1407,7 +1407,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
* the next item on the SIP socket(s) can be serviced. On incoming messages, * the next item on the SIP socket(s) can be serviced. On incoming messages,
* Asterisk automatically will push the request to a servant thread. When your * Asterisk automatically will push the request to a servant thread. When your
* module callback is called, processing will already be in a servant. However, * module callback is called, processing will already be in a servant. However,
* for other PSJIP events, such as transaction state changes due to timer * for other PJSIP events, such as transaction state changes due to timer
* expirations, your module will be called into from a PJSIP thread. If you * expirations, your module will be called into from a PJSIP thread. If you
* are called into from a PJSIP thread, then you should push whatever processing * are called into from a PJSIP thread, then you should push whatever processing
* is needed to a servant as soon as possible. You can discern if you are currently * is needed to a servant as soon as possible. You can discern if you are currently
@@ -1588,13 +1588,13 @@ enum ast_sip_scheduler_task_flags {
/*! /*!
* Run at a fixed interval. * Run at a fixed interval.
* Stop scheduling if the callback returns 0. * Stop scheduling if the callback returns <= 0.
* Any other value is ignored. * Any other value is ignored.
*/ */
AST_SIP_SCHED_TASK_FIXED = (0 << 0), AST_SIP_SCHED_TASK_FIXED = (0 << 0),
/*! /*!
* Run at a variable interval. * Run at a variable interval.
* Stop scheduling if the callback returns 0. * Stop scheduling if the callback returns <= 0.
* Any other return value is used as the new interval. * Any other return value is used as the new interval.
*/ */
AST_SIP_SCHED_TASK_VARIABLE = (1 << 0), AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),

View File

@@ -28,6 +28,7 @@
#include "asterisk/res_pjsip.h" #include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h" #include "include/res_pjsip_private.h"
#include "asterisk/res_pjsip_cli.h" #include "asterisk/res_pjsip_cli.h"
#include "asterisk/taskprocessor.h"
#define TASK_BUCKETS 53 #define TASK_BUCKETS 53
@@ -36,30 +37,30 @@ static struct ao2_container *tasks;
static int task_count; static int task_count;
struct ast_sip_sched_task { struct ast_sip_sched_task {
/*! ast_sip_sched task id */ /*! The serializer to be used (if any) (Holds a ref) */
uint32_t task_id; struct ast_taskprocessor *serializer;
/*! ast_sched scheudler id */
int current_scheduler_id;
/*! task is currently running */
int is_running;
/*! task */
ast_sip_task task;
/*! task data */ /*! task data */
void *task_data; void *task_data;
/*! reschedule interval in milliseconds */ /*! task function */
int interval; ast_sip_task task;
/*! the time the task was queued */ /*! the time the task was originally scheduled/queued */
struct timeval when_queued; struct timeval when_queued;
/*! the last time the task was started */ /*! the last time the task was started */
struct timeval last_start; struct timeval last_start;
/*! the last time the task was ended */ /*! the last time the task was ended */
struct timeval last_end; struct timeval last_end;
/*! When the periodic task is next expected to run */
struct timeval next_periodic;
/*! reschedule interval in milliseconds */
int interval;
/*! ast_sched scheudler id */
int current_scheduler_id;
/*! task is currently running */
int is_running;
/*! times run */ /*! times run */
int run_count; int run_count;
/*! the task reschedule, cleanup and policy flags */ /*! the task reschedule, cleanup and policy flags */
enum ast_sip_scheduler_task_flags flags; enum ast_sip_scheduler_task_flags flags;
/*! the serializer to be used (if any) */
struct ast_taskprocessor *serializer;
/*! A name to be associated with the task */ /*! A name to be associated with the task */
char name[0]; char name[0];
}; };
@@ -76,14 +77,19 @@ static int push_to_serializer(const void *data);
*/ */
static int run_task(void *data) static int run_task(void *data)
{ {
RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup); RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
int res; int res;
int delay; int delay;
if (!schtd->interval) {
/* Task was cancelled while waiting to be executed by the serializer */
return -1;
}
ao2_lock(schtd); ao2_lock(schtd);
schtd->last_start = ast_tvnow(); schtd->last_start = ast_tvnow();
schtd->is_running = 1; schtd->is_running = 1;
schtd->run_count++; ++schtd->run_count;
ao2_unlock(schtd); ao2_unlock(schtd);
res = schtd->task(schtd->task_data); res = schtd->task(schtd->task_data);
@@ -93,10 +99,10 @@ static int run_task(void *data)
schtd->last_end = ast_tvnow(); schtd->last_end = ast_tvnow();
/* /*
* Don't restart if the task returned 0 or if the interval * Don't restart if the task returned <= 0 or if the interval
* was set to 0 while the task was running * was set to 0 while the task was running
*/ */
if (!res || !schtd->interval) { if (res <= 0 || !schtd->interval) {
schtd->interval = 0; schtd->interval = 0;
ao2_unlock(schtd); ao2_unlock(schtd);
ao2_unlink(tasks, schtd); ao2_unlink(tasks, schtd);
@@ -110,13 +116,22 @@ static int run_task(void *data)
if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) { if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
delay = schtd->interval; delay = schtd->interval;
} else { } else {
delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval); int64_t diff;
/* Determine next periodic interval we need to expire. */
do {
schtd->next_periodic = ast_tvadd(schtd->next_periodic,
ast_samp2tv(schtd->interval, 1000));
diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);
} while (diff <= 0);
delay = diff;
} }
schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd); schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);
if (schtd->current_scheduler_id < 0) { if (schtd->current_scheduler_id < 0) {
schtd->interval = 0; schtd->interval = 0;
ao2_unlock(schtd); ao2_unlock(schtd);
ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);
ao2_unlink(tasks, schtd); ao2_unlink(tasks, schtd);
return -1; return -1;
} }
@@ -133,9 +148,29 @@ static int run_task(void *data)
static int push_to_serializer(const void *data) static int push_to_serializer(const void *data)
{ {
struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data; struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
int sched_id;
ao2_lock(schtd);
sched_id = schtd->current_scheduler_id;
schtd->current_scheduler_id = -1;
ao2_unlock(schtd);
if (sched_id < 0) {
/* Task was cancelled while waiting on the lock */
return 0;
}
ao2_t_ref(schtd, +1, "Give ref to run_task()");
if (ast_sip_push_task(schtd->serializer, run_task, schtd)) { if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
ao2_ref(schtd, -1); /*
* Oh my. Have to cancel the scheduled item because we
* unexpectedly cannot run it anymore.
*/
ao2_unlink(tasks, schtd);
ao2_lock(schtd);
schtd->interval = 0;
ao2_unlock(schtd);
ao2_t_ref(schtd, -1, "Failed so release run_task() ref");
} }
return 0; return 0;
@@ -144,20 +179,22 @@ static int push_to_serializer(const void *data)
int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd) int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
{ {
int res; int res;
int sched_id;
if (!ao2_ref_and_lock(schtd)) { /*
return -1; * Prevent any tasks in the serializer queue from
} * running and restarting the scheduled item on us
* first.
if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) { */
ao2_unlock_and_unref(schtd); ao2_lock(schtd);
return 0;
}
schtd->interval = 0; schtd->interval = 0;
ao2_unlock_and_unref(schtd);
sched_id = schtd->current_scheduler_id;
schtd->current_scheduler_id = -1;
ao2_unlock(schtd);
res = ast_sched_del(scheduler_context, sched_id);
ao2_unlink(tasks, schtd); ao2_unlink(tasks, schtd);
res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
return res; return res;
} }
@@ -306,7 +343,7 @@ int ast_sip_sched_is_task_running_by_name(const char *name)
return is_running; return is_running;
} }
static void schtd_destructor(void *data) static void schtd_dtor(void *data)
{ {
struct ast_sip_sched_task *schtd = data; struct ast_sip_sched_task *schtd = data;
@@ -316,6 +353,7 @@ static void schtd_destructor(void *data)
} else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) { } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
ast_free(schtd->task_data); ast_free(schtd->task_data);
} }
ast_taskprocessor_unreference(schtd->serializer);
} }
struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer, struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
@@ -326,38 +364,60 @@ struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *seria
struct ast_sip_sched_task *schtd; struct ast_sip_sched_task *schtd;
int res; int res;
if (interval < 0) { if (interval <= 0) {
return NULL; return NULL;
} }
schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor); schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),
schtd_dtor);
if (!schtd) { if (!schtd) {
return NULL; return NULL;
} }
schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1); schtd->serializer = ao2_bump(serializer);
schtd->serializer = serializer; schtd->task_data = task_data;
schtd->task = sip_task; schtd->task = sip_task;
schtd->interval = interval;
schtd->flags = flags;
if (!ast_strlen_zero(name)) { if (!ast_strlen_zero(name)) {
strcpy(schtd->name, name); /* Safe */ strcpy(schtd->name, name); /* Safe */
} else { } else {
sprintf(schtd->name, "task_%08x", schtd->task_id); uint32_t task_id;
task_id = ast_atomic_fetchadd_int(&task_count, 1);
sprintf(schtd->name, "task_%08x", task_id);
} }
schtd->task_data = task_data;
schtd->flags = flags;
schtd->interval = interval;
schtd->when_queued = ast_tvnow(); schtd->when_queued = ast_tvnow();
if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {
schtd->next_periodic = ast_tvadd(schtd->when_queued,
ast_samp2tv(schtd->interval, 1000));
}
if (flags & AST_SIP_SCHED_TASK_DATA_AO2) { if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
ao2_ref(task_data, +1); ao2_ref(task_data, +1);
} }
/*
* We must put it in the 'tasks' container before scheduling
* the task because we don't want the push_to_serializer()
* sched task to "remove" it on failure before we even put
* it in. If this happens then nothing would remove it from
* the 'tasks' container.
*/
ao2_link(tasks, schtd);
/*
* Lock so we are guaranteed to get the sched id set before
* the push_to_serializer() sched task can clear it.
*/
ao2_lock(schtd);
res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd); res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);
schtd->current_scheduler_id = res;
ao2_unlock(schtd);
if (res < 0) { if (res < 0) {
ao2_unlink(tasks, schtd);
ao2_ref(schtd, -1); ao2_ref(schtd, -1);
return NULL; return NULL;
} else {
schtd->current_scheduler_id = res;
ao2_link(tasks, schtd);
} }
return schtd; return schtd;
@@ -470,7 +530,8 @@ static struct ast_cli_entry cli_commands[] = {
int ast_sip_initialize_scheduler(void) int ast_sip_initialize_scheduler(void)
{ {
if (!(scheduler_context = ast_sched_context_create())) { scheduler_context = ast_sched_context_create();
if (!scheduler_context) {
ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n"); ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
return -1; return -1;
} }
@@ -500,7 +561,21 @@ int ast_sip_destroy_scheduler(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
if (scheduler_context) { if (scheduler_context) {
if (tasks) {
struct ao2_iterator iter;
struct ast_sip_sched_task *schtd;
/* Cancel all scheduled tasks */
iter = ao2_iterator_init(tasks, 0);
while ((schtd = ao2_iterator_next(&iter))) {
ast_sip_sched_task_cancel(schtd);
ao2_ref(schtd, -1);
}
ao2_iterator_destroy(&iter);
}
ast_sched_context_destroy(scheduler_context); ast_sched_context_destroy(scheduler_context);
scheduler_context = NULL;
} }
ao2_cleanup(tasks); ao2_cleanup(tasks);