taskprocessors: Implement high/low water mark alerts.

When taskprocessors get backed up, there is a good chance that we are
being overloaded and need to defer adding new work to the system.

* Implemented a high/low water alert mechanism for modules to check if the
system is being overloaded and take appropriate action.  When a
taskprocessor is created it has default congestion levels set.  A
taskprocessor can later have those congestion levels altered for specific
needs if stress testing shows that the taskprocessor is a symptom of
overloading or needs to handle bursty activity without triggering an
overload alert.

* Add CLI "core show taskprocessor" low/high water columns.

* Fixed __allocate_taskprocessor() to not use RAII_VAR().  RAII_VAR() was
never a good thing to use when creating a taskprocessor because of the
nature of how its references needed to be cleaned up on a partial
creation.

* Made res_pjsip's distributor check if the taskprocessor overload alert
is active before placing a message representing brand new work onto a
distributor serializer.

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: I182f1be603529cd665958661c4c05ff9901825fa
This commit is contained in:
Richard Mudgett
2016-06-02 16:08:19 -05:00
parent 26e3492246
commit 610eee2a36
3 changed files with 177 additions and 58 deletions

View File

@@ -76,6 +76,10 @@ struct ast_taskprocessor {
void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor low water clear alert level */
long tps_queue_low;
/*! \brief Taskprocessor high water alert trigger level */
long tps_queue_high;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
struct ast_taskprocessor_listener *listener;
@@ -85,6 +89,8 @@ struct ast_taskprocessor {
unsigned int executing:1;
/*! Indicates that a high water warning has been issued on this task processor */
unsigned int high_water_warned:1;
/*! Indicates that a high water alert is active on this taskprocessor */
unsigned int high_water_alert:1;
};
/*!
@@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy(void *tps);
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
static int tps_ping_handler(void *datap);
/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
struct ao2_container *sorted_tps;
struct ast_taskprocessor *tps;
struct ao2_iterator iter;
#define FMT_HEADERS "%-45s %10s %10s %10s\n"
#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n"
#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
switch (cmd) {
case CLI_INIT:
@@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_FAILURE;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
tcount = 0;
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
while ((tps = ao2_iterator_next(&iter))) {
@@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
maxqsize = 0;
processed = 0;
}
ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
tps->tps_queue_low, tps->tps_queue_high);
ast_taskprocessor_unreference(tps);
++tcount;
}
@@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
/*! Count of the number of taskprocessors in high water alert. */
static unsigned int tps_alert_count;
/*! Access protection for tps_alert_count */
AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
/*!
* \internal
* \brief Add a delta to tps_alert_count with protection.
* \since 13.10.0
*
* \param tps Taskprocessor updating queue water mark alert trigger.
* \param delta The amount to add to tps_alert_count.
*
* \return Nothing
*/
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
{
unsigned int old;
ast_rwlock_wrlock(&tps_alert_lock);
old = tps_alert_count;
tps_alert_count += delta;
if (DEBUG_ATLEAST(3)
/* and tps_alert_count becomes zero or non-zero */
&& !old != !tps_alert_count) {
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
tps->name, tps_alert_count ? "triggered" : "cleared");
}
ast_rwlock_unlock(&tps_alert_lock);
}
unsigned int ast_taskprocessor_alert_get(void)
{
unsigned int count;
ast_rwlock_rdlock(&tps_alert_lock);
count = tps_alert_count;
ast_rwlock_unlock(&tps_alert_lock);
return count;
}
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
{
if (!tps || high_water < 0 || high_water < low_water) {
return -1;
}
if (low_water < 0) {
/* Set low water level to 90% of high water level */
low_water = (high_water * 9) / 10;
}
ao2_lock(tps);
tps->tps_queue_low = low_water;
tps->tps_queue_high = high_water;
if (tps->high_water_alert) {
if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
/* Update water mark alert immediately */
tps->high_water_alert = 0;
tps_alert_add(tps, -1);
}
} else {
if (high_water <= tps->tps_queue_size) {
/* Update water mark alert immediately */
tps->high_water_alert = 1;
tps_alert_add(tps, +1);
}
}
ao2_unlock(tps);
return 0;
}
/* destroy the taskprocessor */
static void tps_taskprocessor_destroy(void *tps)
static void tps_taskprocessor_dtor(void *tps)
{
struct ast_taskprocessor *t = tps;
struct tps_task *task;
if (!tps) {
ast_log(LOG_ERROR, "missing taskprocessor\n");
return;
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
/* free it */
ast_free(t->stats);
t->stats = NULL;
ast_free((char *) t->name);
if (t->listener) {
ao2_ref(t->listener, -1);
t->listener = NULL;
}
while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
tps_task_free(task);
}
t->tps_queue_size = 0;
if (t->high_water_alert) {
t->high_water_alert = 0;
tps_alert_add(t, -1);
}
ast_free(t->stats);
t->stats = NULL;
ast_free((char *) t->name);
t->name = NULL;
ao2_cleanup(t->listener);
t->listener = NULL;
}
/* pop the front task and return it */
@@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
struct tps_task *task;
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
--tps->tps_queue_size;
if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
tps->high_water_alert = 0;
tps_alert_add(tps, -1);
}
}
return task;
}
@@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void)
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
{
RAII_VAR(struct ast_taskprocessor *, p,
ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
struct ast_taskprocessor *p;
p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
}
if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
return NULL;
}
if (!(p->name = ast_strdup(name))) {
/* Set default congestion water level alert triggers. */
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
p->stats = ast_calloc(1, sizeof(*p->stats));
p->name = ast_strdup(name);
if (!p->stats || !p->name) {
ao2_ref(p, -1);
return NULL;
}
@@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
if (!(ao2_link(tps_singletons, p))) {
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
listener->tps = NULL;
ao2_ref(p, -1);
ao2_ref(p, -2);
return NULL;
}
if (p->listener->callbacks->start(p->listener)) {
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
p->name);
ast_taskprocessor_unreference(p);
return NULL;
}
/* RAII_VAR will decrement the refcount at the end of the function.
* Since we want to pass back a reference to p, we bump the refcount
*/
ao2_ref(p, +1);
return p;
}
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
@@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
tps->name, previous_size);
tps->high_water_warned = 1;
if (previous_size >= tps->tps_queue_high) {
if (!tps->high_water_warned) {
tps->high_water_warned = 1;
ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
tps->name, previous_size);
}
if (!tps->high_water_alert) {
tps->high_water_alert = 1;
tps_alert_add(tps, +1);
}
}
/* The currently executing task counts as still in queue */