taskprocessor: Enable subsystems and overload by subsystem

To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.

* Any taskprocessor name that has a '/' will have the part
  before the '/' saved as its "subsystem".
  Examples:
  "sorcery/acl-0000006a" and "sorcery/aor-00000019"
  will be grouped to subsystem "sorcery".
  "pjsip/distributor-00000025" and "pjsip/distributor-00000026"
  will be grouped to subsystem "pjsip".
  Taskprocessors with no '/' have an empty subsystem.

* When a taskprocessor enters high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will
  be incremented.

* When a taskprocessor leaves high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will be
  decremented.

* A new api ast_taskprocessor_get_subsystem_alert() has been
  added that returns the number of taskprocessors in alert for
  the subsystem.

* A new CLI command "core show taskprocessor alerted subsystems"
  has been added.

* A new unit test was added.

REMINDER: The taskprocessor code itself doesn't take any action
based on high-water alerts or overloading.  It's up to taskprocessor
users to check and take action themselves.  Currently only the pjsip
distributor does this.

* A new pjsip/global option "taskprocessor_overload_trigger"
  has been added that allows the user to select the trigger
  mechanism the distributor uses to pause accepting new requests.
  "none": Don't pause on any overload condition.
  "global": Pause on ANY taskprocessor overload (the default and
  current behavior)
  "pjsip_only": Pause only on pjsip taskprocessor overloads.

* The core pjsip pool was renamed from "SIP" to "pjsip" so it can
  be properly grouped into the "pjsip" subsystem.

* stasis taskprocessor names were changed to "stasis" as the
  subsystem.

* Sorcery core taskprocessor names were changed to "sorcery" to
  match the object taskprocessors.

Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
This commit is contained in:
George Joseph
2019-02-15 11:53:50 -07:00
parent 8681fc9db7
commit c2adeb9dc2
13 changed files with 523 additions and 10 deletions

View File

@@ -51,6 +51,7 @@
#define DEFAULT_IGNORE_URI_USER_OPTIONS 0
#define DEFAULT_USE_CALLERID_CONTACT 0
#define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 0
#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
/*!
* \brief Cached global config object
@@ -110,6 +111,8 @@ struct global_config {
unsigned int use_callerid_contact;
/*! Nonzero if need to send AMI ContactStatus event when a contact is updated */
unsigned int send_contact_status_on_update_registration;
/*! Trigger the distributor should use to pause accepting new dialogs */
enum ast_sip_taskprocessor_overload_trigger overload_trigger;
};
static void global_destructor(void *obj)
@@ -483,6 +486,58 @@ unsigned int ast_sip_get_send_contact_status_on_update_registration(void)
return send_contact_status_on_update_registration;
}
enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void)
{
enum ast_sip_taskprocessor_overload_trigger trigger;
struct global_config *cfg;
cfg = get_global_cfg();
if (!cfg) {
return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER;
}
trigger = cfg->overload_trigger;
ao2_ref(cfg, -1);
return trigger;
}
/*!
 * \internal
 * \brief Parse the textual overload trigger setting into the config object.
 *
 * The comparison is case-insensitive.  Recognized values are "none",
 * "global" and "pjsip_only".
 *
 * \param opt Unused.
 * \param var Variable whose value is parsed; its name is used only in the
 *            warning message.
 * \param obj The struct global_config whose overload_trigger is set.
 *
 * \retval 0 The value was recognized and stored.
 * \retval -1 The value was not recognized; a warning is logged and the
 *            config object is left untouched.
 */
static int overload_trigger_handler(const struct aco_option *opt,
	struct ast_variable *var, void *obj)
{
	struct global_config *global = obj;
	const char *value = var->value;

	if (strcasecmp(value, "none") == 0) {
		global->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE;
		return 0;
	}

	if (strcasecmp(value, "global") == 0) {
		global->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL;
		return 0;
	}

	if (strcasecmp(value, "pjsip_only") == 0) {
		global->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY;
		return 0;
	}

	ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n",
		value, var->name);

	return -1;
}
/*! Configuration-file spelling of each overload trigger, indexed by enum value */
static const char *overload_trigger_map[] = {
	[TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none",
	[TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global",
	[TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only"
};

/*!
 * \brief Convert an overload trigger to its configuration string.
 *
 * \param trigger The trigger value to convert.
 *
 * \return The matching entry of overload_trigger_map, or an empty string
 * when \a trigger does not index into the map.
 */
const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger)
{
	if (!ARRAY_IN_BOUNDS(trigger, overload_trigger_map)) {
		return "";
	}

	return overload_trigger_map[trigger];
}
/*!
 * \internal
 * \brief Produce the string form of the configured overload trigger.
 *
 * \param obj  The struct global_config to read overload_trigger from.
 * \param args Unused.
 * \param buf  Receives the result of ast_strdup() on the trigger's string
 *             form.
 *
 * \retval 0 Always.
 */
static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf)
{
	const struct global_config *global = obj;
	const char *name = ast_sip_overload_trigger_to_str(global->overload_trigger);

	*buf = ast_strdup(name);

	return 0;
}
/*!
* \internal
* \brief Observer to set default global object if none exist.
@@ -646,6 +701,9 @@ int ast_sip_initialize_sorcery_global(void)
ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration",
DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no",
OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration));
ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger",
overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER],
overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0);
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
return -1;

View File

@@ -408,4 +408,14 @@ void ast_sip_destroy_transport_management(void);
*/
int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext);
enum ast_sip_taskprocessor_overload_trigger {
TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0,
TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL,
TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY
};
/*!
 * \brief Retrieve the taskprocessor overload trigger from the global config.
 *
 * \return The configured trigger, or TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
 * (the default) when no global configuration object is available.
 */
enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void);
/*!
 * \brief Convert an overload trigger to its configuration string.
 *
 * \param trigger The trigger value to convert.
 *
 * \return "none", "global" or "pjsip_only"; an empty string when \a trigger
 * is not one of the known values.
 */
const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger);
#endif /* RES_PJSIP_PRIVATE_H_ */

View File

@@ -51,6 +51,7 @@ static unsigned int unidentified_count;
static unsigned int unidentified_period;
static unsigned int unidentified_prune_interval;
static int using_auth_username;
/*! Cached overload trigger; refreshed in global_loaded() and checked by distributor() */
static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
struct unidentified_request{
struct timeval first_seen;
@@ -534,7 +535,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ao2_cleanup(dist);
return PJ_TRUE;
} else {
if (ast_taskprocessor_alert_get()) {
if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
ast_taskprocessor_alert_get())
|| (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
ast_taskprocessor_get_subsystem_alert("pjsip"))) {
/*
* When taskprocessors get backed up, there is a good chance that
* we are being overloaded and need to defer adding new work to
@@ -1196,6 +1200,8 @@ static void global_loaded(const char *object_type)
ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
/* Clean out the old task, if any */
ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
/* Have to do something with the return value to shut up the stupid compiler. */