ARI: Add ability to raise arbitrary User Events

User events can now be generated from ARI.  Events can be signalled with
arbitrary json variables, and include one or more of channel, bridge, or
endpoint snapshots.  An application must be specified which will receive
the event message (other applications can subscribe to it).  The message
will also be delivered via AMI provided a channel is attached.  Dialplan
generated user event messages are still transmitted via the channel, and
will only be received by a stasis application they are attached to or if
the channel is subscribed to.

This change also introduces the multi object blob mechanism used to send
multiple snapshot types in a single message.  The dialplan app UserEvent
was also changed to use multi object blob, and a new stasis message type
created to handle them.

ASTERISK-22697 #close
Review: https://reviewboard.asterisk.org/r/3494/
........

Merged revisions 414405 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@414406 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Scott Griepentrog
2014-05-22 16:09:51 +00:00
parent d00882108f
commit cf21644d6a
17 changed files with 839 additions and 100 deletions

View File

@@ -3076,7 +3076,6 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
struct ast_json_iter *iter;
int has_type = 0;
int has_application = 0;
int has_channel = 0;
int has_eventname = 0;
int has_userevent = 0;
@@ -3110,9 +3109,17 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
res = 0;
}
} else
if (strcmp("bridge", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_bridge(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI ChannelUserevent field bridge failed validation\n");
res = 0;
}
} else
if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_channel = 1;
prop_is_valid = ast_ari_validate_channel(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
@@ -3120,6 +3127,15 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
res = 0;
}
} else
if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_endpoint(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI ChannelUserevent field endpoint failed validation\n");
res = 0;
}
} else
if (strcmp("eventname", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_eventname = 1;
@@ -3158,11 +3174,6 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
res = 0;
}
if (!has_channel) {
ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field channel\n");
res = 0;
}
if (!has_eventname) {
ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field eventname\n");
res = 0;

View File

@@ -1278,7 +1278,9 @@ ari_validator ast_ari_validate_application_fn(void);
* - type: string (required)
* - application: string (required)
* - timestamp: Date
* - channel: Channel (required)
* - bridge: Bridge
* - channel: Channel
* - endpoint: Endpoint
* - eventname: string (required)
* - userevent: object (required)
* ChannelVarset

View File

@@ -217,3 +217,59 @@ void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session *
ast_json_unref(msg);
}
}
void ast_ari_events_user_event(struct ast_variable *headers,
struct ast_ari_events_user_event_args *args,
struct ast_ari_response *response)
{
enum stasis_app_user_event_res res;
struct ast_json *json_variables = NULL;
if (args->variables) {
ast_ari_events_user_event_parse_body(args->variables, args);
json_variables = ast_json_object_get(args->variables, "variables");
}
if (ast_strlen_zero(args->application)) {
ast_ari_response_error(response, 400, "Bad Request",
"Missing parameter application");
return;
}
res = stasis_app_user_event(args->application,
args->event_name,
args->source, args->source_count,
json_variables);
switch (res) {
case STASIS_APP_USER_OK:
ast_ari_response_no_content(response);
break;
case STASIS_APP_USER_APP_NOT_FOUND:
ast_ari_response_error(response, 404, "Not Found",
"Application not found");
break;
case STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND:
ast_ari_response_error(response, 422, "Unprocessable Entity",
"Event source was not found");
break;
case STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME:
ast_ari_response_error(response, 400, "Bad Request",
"Invalid event source URI scheme");
break;
case STASIS_APP_USER_USEREVENT_INVALID:
ast_ari_response_error(response, 400, "Bad Request",
"Invalid userevnet data");
break;
case STASIS_APP_USER_INTERNAL_ERROR:
default:
ast_ari_response_error(response, 500, "Internal Server Error",
"Error processing request");
}
}

View File

@@ -56,5 +56,39 @@ struct ast_ari_events_event_websocket_args {
* \param args Swagger parameters.
*/
void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session *session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args);
/*! Argument struct for ast_ari_events_user_event() */
struct ast_ari_events_user_event_args {
/*! Event name */
const char *event_name;
/*! The name of the application that will receive this event */
const char *application;
/*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName} */
const char **source;
/*! Length of source array. */
size_t source_count;
/*! Parsing context for source. */
char *source_parse;
/*! custom key/value pairs added to the user event */
struct ast_json *variables;
};
/*!
* \brief Body parsing function for /events/user/{eventName}.
* \param body The JSON body from which to parse parameters.
* \param[out] args The args structure to parse into.
* \retval zero on success
* \retval non-zero on failure
*/
int ast_ari_events_user_event_parse_body(
struct ast_json *body,
struct ast_ari_events_user_event_args *args);
/*!
* \brief Generate a user event.
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void ast_ari_events_user_event(struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response);
#endif /* _ASTERISK_RESOURCE_EVENTS_H */

View File

@@ -149,14 +149,202 @@ fin: __attribute__((unused))
ast_free(args.app_parse);
ast_free(args.app);
}
int ast_ari_events_user_event_parse_body(
struct ast_json *body,
struct ast_ari_events_user_event_args *args)
{
struct ast_json *field;
/* Parse query parameters out of it */
field = ast_json_object_get(body, "application");
if (field) {
args->application = ast_json_string_get(field);
}
field = ast_json_object_get(body, "source");
if (field) {
/* If they were silly enough to both pass in a query param and a
* JSON body, free up the query value.
*/
ast_free(args->source);
if (ast_json_typeof(field) == AST_JSON_ARRAY) {
/* Multiple param passed as array */
size_t i;
args->source_count = ast_json_array_size(field);
args->source = ast_malloc(sizeof(*args->source) * args->source_count);
if (!args->source) {
return -1;
}
for (i = 0; i < args->source_count; ++i) {
args->source[i] = ast_json_string_get(ast_json_array_get(field, i));
}
} else {
/* Multiple param passed as single value */
args->source_count = 1;
args->source = ast_malloc(sizeof(*args->source) * args->source_count);
if (!args->source) {
return -1;
}
args->source[0] = ast_json_string_get(field);
}
}
return 0;
}
/*!
* \brief Parameter parsing callback for /events/user/{eventName}.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
* \param[out] response Response to the HTTP request.
*/
static void ast_ari_events_user_event_cb(
struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params, struct ast_variable *path_vars,
struct ast_variable *headers, struct ast_ari_response *response)
{
struct ast_ari_events_user_event_args args = {};
struct ast_variable *i;
RAII_VAR(struct ast_json *, body, NULL, ast_json_unref);
#if defined(AST_DEVMODE)
int is_valid;
int code;
#endif /* AST_DEVMODE */
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "application") == 0) {
args.application = (i->value);
} else
if (strcmp(i->name, "source") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.source_parse = ast_strdup(i->value);
if (!args.source_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.source_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.source_count = 1;
vals[0] = args.source_parse;
} else {
args.source_count = ast_app_separate_args(
args.source_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.source_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.source_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for source");
goto fin;
}
args.source = ast_malloc(sizeof(*args.source) * args.source_count);
if (!args.source) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.source_count; ++j) {
args.source[j] = (vals[j]);
}
} else
{}
}
for (i = path_vars; i; i = i->next) {
if (strcmp(i->name, "eventName") == 0) {
args.event_name = (i->value);
} else
{}
}
/* Look for a JSON request entity */
body = ast_http_get_json(ser, headers);
if (!body) {
switch (errno) {
case EFBIG:
ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large");
goto fin;
case ENOMEM:
ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request");
goto fin;
case EIO:
ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body");
goto fin;
}
}
args.variables = ast_json_ref(body);
ast_ari_events_user_event(headers, &args, response);
#if defined(AST_DEVMODE)
code = response->response_code;
switch (code) {
case 0: /* Implementation is still a stub, or the code wasn't set */
is_valid = response->message == NULL;
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
case 404: /* Application does not exist. */
case 422: /* Event source not found. */
case 400: /* Invalid even tsource URI or userevent data. */
is_valid = 1;
break;
default:
if (200 <= code && code <= 299) {
is_valid = ast_ari_validate_void(
response->message);
} else {
ast_log(LOG_ERROR, "Invalid error response %d for /events/user/{eventName}\n", code);
is_valid = 0;
}
}
if (!is_valid) {
ast_log(LOG_ERROR, "Response validation failed for /events/user/{eventName}\n");
ast_ari_response_error(response, 500,
"Internal Server Error", "Response validation failed");
}
#endif /* AST_DEVMODE */
fin: __attribute__((unused))
ast_free(args.source_parse);
ast_free(args.source);
return;
}
/*! \brief REST handler for /api-docs/events.{format} */
static struct stasis_rest_handlers events_user_eventName = {
.path_segment = "eventName",
.is_wildcard = 1,
.callbacks = {
[AST_HTTP_POST] = ast_ari_events_user_event_cb,
},
.num_children = 0,
.children = { }
};
/*! \brief REST handler for /api-docs/events.{format} */
static struct stasis_rest_handlers events_user = {
.path_segment = "user",
.callbacks = {
},
.num_children = 1,
.children = { &events_user_eventName, }
};
/*! \brief REST handler for /api-docs/events.{format} */
static struct stasis_rest_handlers events = {
.path_segment = "events",
.callbacks = {
},
.num_children = 0,
.children = { }
.num_children = 1,
.children = { &events_user, }
};
static int load_module(void)

View File

@@ -61,6 +61,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app_impl.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/strings.h"
#include "stasis/app.h"
@@ -1310,6 +1311,89 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
json, app_unsubscribe);
}
enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
const char *event_name,
const char **source_uris, int sources_count,
struct ast_json *json_variables)
{
RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
RAII_VAR(void *, obj, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
enum stasis_app_subscribe_res res = STASIS_APP_USER_INTERNAL_ERROR;
struct ast_json *json_value;
int have_channel = 0;
int i;
if (!app) {
ast_log(LOG_WARNING, "App %s not found\n", app_name);
return STASIS_APP_USER_APP_NOT_FOUND;
}
blob = json_variables;
if (!blob) {
blob = ast_json_pack("{}");
}
json_value = ast_json_string_create(event_name);
if (!json_value) {
ast_log(LOG_ERROR, "unable to create json string\n");
return res;
}
if (ast_json_object_set(blob, "eventname", json_value)) {
ast_log(LOG_ERROR, "unable to set eventname to blob\n");
return res;
}
multi = ast_multi_object_blob_create(blob);
for (i = 0; i < sources_count; ++i) {
const char *uri = source_uris[i];
void *snapshot=NULL;
enum stasis_user_multi_object_snapshot_type type;
if (ast_begins_with(uri, "channel:")) {
type = STASIS_UMOS_CHANNEL;
snapshot = ast_channel_snapshot_get_latest(uri + 8);
have_channel = 1;
} else if (ast_begins_with(uri, "bridge:")) {
type = STASIS_UMOS_BRIDGE;
snapshot = ast_bridge_snapshot_get_latest(uri + 7);
} else if (ast_begins_with(uri, "endpoint:")) {
type = STASIS_UMOS_ENDPOINT;
snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
} else {
ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
}
if (!snapshot) {
ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
}
ast_multi_object_blob_add(multi, type, snapshot);
}
message = stasis_message_create(ast_multi_user_event_type(), multi);
if (!message) {
ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
return res;
}
/*
* Publishing to two different topics is normally to be avoided -- except
* in this case both are final destinations with no forwards (only listeners).
* The message has to be delivered to the application topic for ARI, but a
* copy is also delivered directly to the manager for AMI if there is a channel.
*/
stasis_publish(ast_app_get_topic(app), message);
if (have_channel) {
stasis_publish(ast_manager_get_topic(), message);
}
return STASIS_APP_USER_OK;
}
void stasis_app_ref(void)
{
ast_module_ref(ast_module_info->self);

View File

@@ -795,6 +795,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return app;
}
struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
return app->topic;
}
/*!
* \brief Send a message to the given application.
* \param app App to send the message to.