ARI: Add subscription support

This patch adds an /applications API to ARI, allowing explicit management of
Stasis applications.

 * GET /applications - list current applications
 * GET /applications/{applicationName} - get details of a specific application
 * POST /applications/{applicationName}/subscription - explicitly subscribe to
   a channel, bridge or endpoint
 * DELETE /applications/{applicationName}/subscription - explicitly unsubscribe
   from a channel, bridge or endpoint

Subscriptions work by a reference counting mechanism: if you subscript to an
event source X number of times, you must unsubscribe X number of times to stop
receiveing events for that event source.

Review: https://reviewboard.asterisk.org/r/2862

(issue ASTERISK-22451)
Reported by: Matt Jordan




git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400522 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Matthew Jordan
2013-10-04 15:54:57 +00:00
parent c1e76f6ccb
commit 7fc567bd76
21 changed files with 1722 additions and 15 deletions

View File

@@ -734,6 +734,29 @@ int stasis_app_send(const char *app_name, struct ast_json *message)
return 0;
}
static int append_name(void *obj, void *arg, int flags)
{
struct app *app = obj;
struct ao2_container *apps = arg;
ast_str_container_add(apps, app_name(app));
return 0;
}
struct ao2_container *stasis_app_get_all(void)
{
RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
apps = ast_str_container_alloc(1);
if (!apps) {
return NULL;
}
ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
return ao2_bump(apps);
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -783,6 +806,219 @@ void stasis_app_unregister(const char *app_name)
cleanup();
}
struct ast_json *stasis_app_to_json(const char *app_name)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
if (app_name) {
app = ao2_find(apps_registry, app_name, OBJ_KEY);
}
if (!app) {
return NULL;
}
return app_to_json(app);
}
#define CHANNEL_SCHEME "channel:"
#define BRIDGE_SCHEME "bridge:"
#define ENDPOINT_SCHEME "endpoint:"
/*! Struct for capturing event source information */
struct event_source {
enum {
EVENT_SOURCE_CHANNEL,
EVENT_SOURCE_BRIDGE,
EVENT_SOURCE_ENDPOINT,
} event_source_type;
union {
struct ast_channel *channel;
struct ast_bridge *bridge;
struct ast_endpoint *endpoint;
};
};
enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
RAII_VAR(struct event_source *, event_sources, NULL, ast_free);
enum stasis_app_subscribe_res res = STASIS_ASR_OK;
int i;
if (app_name) {
app = ao2_find(apps_registry, app_name, OBJ_KEY);
}
if (!app) {
ast_log(LOG_WARNING, "Could not find app '%s'\n",
app_name ? : "(null)");
return STASIS_ASR_APP_NOT_FOUND;
}
event_sources = ast_calloc(event_sources_count, sizeof(*event_sources));
if (!event_sources) {
return STASIS_ASR_INTERNAL_ERROR;
}
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
const char *uri = event_source_uris[i];
ast_debug(3, "%s: Checking %s\n", app_name,
uri);
if (ast_begins_with(uri, CHANNEL_SCHEME)) {
event_sources[i].event_source_type =
EVENT_SOURCE_CHANNEL;
event_sources[i].channel = ast_channel_get_by_name(
uri + strlen(CHANNEL_SCHEME));
if (!event_sources[i].channel) {
ast_log(LOG_WARNING, "Channel not found: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(uri, BRIDGE_SCHEME)) {
event_sources[i].event_source_type =
EVENT_SOURCE_BRIDGE;
event_sources[i].bridge = stasis_app_bridge_find_by_id(
uri + strlen(BRIDGE_SCHEME));
if (!event_sources[i].bridge) {
ast_log(LOG_WARNING, "Bridge not found: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(uri, ENDPOINT_SCHEME)) {
event_sources[i].event_source_type =
EVENT_SOURCE_ENDPOINT;
event_sources[i].endpoint = ast_endpoint_find_by_id(
uri + strlen(ENDPOINT_SCHEME));
if (!event_sources[i].endpoint) {
ast_log(LOG_WARNING, "Endpoint not found: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else {
ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
}
}
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
int sub_res = -1;
ast_debug(1, "%s: Subscribing to %s\n", app_name,
event_source_uris[i]);
switch (event_sources[i].event_source_type) {
case EVENT_SOURCE_CHANNEL:
sub_res = app_subscribe_channel(app,
event_sources[i].channel);
break;
case EVENT_SOURCE_BRIDGE:
sub_res = app_subscribe_bridge(app,
event_sources[i].bridge);
break;
case EVENT_SOURCE_ENDPOINT:
sub_res = app_subscribe_endpoint(app,
event_sources[i].endpoint);
break;
}
if (sub_res != 0) {
ast_log(LOG_WARNING,
"Error subscribing app '%s' to '%s'\n",
app_name, event_source_uris[i]);
res = STASIS_ASR_INTERNAL_ERROR;
}
}
if (res == STASIS_ASR_OK && json) {
ast_debug(1, "%s: Successful; setting results\n", app_name);
*json = app_to_json(app);
}
for (i = 0; i < event_sources_count; ++i) {
switch (event_sources[i].event_source_type) {
case EVENT_SOURCE_CHANNEL:
event_sources[i].channel =
ast_channel_cleanup(event_sources[i].channel);
break;
case EVENT_SOURCE_BRIDGE:
ao2_cleanup(event_sources[i].bridge);
event_sources[i].bridge = NULL;
break;
case EVENT_SOURCE_ENDPOINT:
ao2_cleanup(event_sources[i].endpoint);
event_sources[i].endpoint = NULL;
break;
}
}
return res;
}
enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
enum stasis_app_subscribe_res res = STASIS_ASR_OK;
int i;
if (app_name) {
ast_log(LOG_WARNING, "Could not find app '%s'\n",
app_name ? : "(null)");
app = ao2_find(apps_registry, app_name, OBJ_KEY);
}
if (!app) {
return STASIS_ASR_APP_NOT_FOUND;
}
/* Validate the input */
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
const char *channel_id = event_source_uris[i] +
strlen(CHANNEL_SCHEME);
if (!app_is_subscribed_channel_id(app, channel_id)) {
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
const char *bridge_id = event_source_uris[i] +
strlen(BRIDGE_SCHEME);
if (!app_is_subscribed_bridge_id(app, bridge_id)) {
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
const char *endpoint_id = event_source_uris[i] +
strlen(ENDPOINT_SCHEME);
if (!app_is_subscribed_endpoint_id(app, endpoint_id)) {
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else {
res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
}
}
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
const char *channel_id = event_source_uris[i] +
strlen(CHANNEL_SCHEME);
app_unsubscribe_channel_id(app, channel_id);
} else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
const char *bridge_id = event_source_uris[i] +
strlen(BRIDGE_SCHEME);
app_unsubscribe_bridge_id(app, bridge_id);
} else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
const char *endpoint_id = event_source_uris[i] +
strlen(ENDPOINT_SCHEME);
app_unsubscribe_endpoint_id(app, endpoint_id);
}
}
if (res == STASIS_ASR_OK && json) {
*json = app_to_json(app);
}
return res;
}
void stasis_app_ref(void)
{
ast_module_ref(ast_module_info->self);