Add channel events for res_stasis apps

This change adds a framework in res_stasis for handling events from
channel topics. JSON event generation and validation code is created
from event documentation in rest-api/api-docs/events.json to assist in
JSON event generation, ensure consistency, and ensure that accurate
documentation is available for ALL events that are received by
res_stasis applications.

The userevent application has been refactored along with the code that
handles userevent channel blob events to pass the headers as key/value
pairs in the JSON blob. As a side-effect, app_userevent now handles
duplicate keys by overwriting the previous value.

Review: https://reviewboard.asterisk.org/r/2428/
(closes issue ASTERISK-21180)
Patch-By: Kinsey Moore <kmoore@digium.com>


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@388275 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Kinsey Moore
2013-05-10 13:13:06 +00:00
parent 2cfedc12ad
commit 7ce05bfb9b
18 changed files with 1593 additions and 292 deletions

View File

@@ -153,6 +153,12 @@ Sorcery
column named "id" within their schema when using the Sorcery realtime module.
This column must be able to contain a string of up to 128 characters in length.
app_userevent
------------------
* UserEvent will now handle duplicate keys by overwriting the previous value
assigned to the key. UserEvent invocations will also be distributed to any
interested res_stasis applications.
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 10 to Asterisk 11 --------------------
------------------------------------------------------------------------------

View File

@@ -39,23 +39,28 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*** DOCUMENTATION
<application name="UserEvent" language="en_US">
<synopsis>
Send an arbitrary event to the manager interface.
Send an arbitrary user-defined event to parties interested in a channel (AMI users and relevant res_stasis applications).
</synopsis>
<syntax>
<parameter name="eventname" required="true" />
<parameter name="body" />
</syntax>
<description>
<para>Sends an arbitrary event to the manager interface, with an optional
<para>Sends an arbitrary event to interested parties, with an optional
<replaceable>body</replaceable> representing additional arguments. The
<replaceable>body</replaceable> may be specified as
a <literal>,</literal> delimited list of headers. Each additional
argument will be placed on a new line in the event. The format of the
event will be:</para>
a <literal>,</literal> delimited list of key:value pairs.</para>
<para>For AMI, each additional argument will be placed on a new line in
the event and the format of the event will be:</para>
<para> Event: UserEvent</para>
<para> UserEvent: &lt;specified event name&gt;</para>
<para> [body]</para>
<para>If no <replaceable>body</replaceable> is specified, only Event and UserEvent headers will be present.</para>
<para>If no <replaceable>body</replaceable> is specified, only Event and
UserEvent headers will be present.</para>
<para>For res_stasis applications, the event will be provided as a JSON
blob with additional arguments appearing as keys in the object and the
<replaceable>eventname</replaceable> under the
<literal>eventname</literal> key.</para>
</description>
</application>
***/
@@ -70,7 +75,6 @@ static int userevent_exec(struct ast_channel *chan, const char *data)
AST_APP_ARG(eventname);
AST_APP_ARG(extra)[100];
);
RAII_VAR(struct ast_str *, body, ast_str_create(16), ast_free);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
@@ -79,27 +83,39 @@ static int userevent_exec(struct ast_channel *chan, const char *data)
return -1;
}
if (!body) {
ast_log(LOG_WARNING, "Unable to allocate buffer\n");
return -1;
}
parse = ast_strdupa(data);
AST_STANDARD_APP_ARGS(args, parse);
for (x = 0; x < args.argc - 1; x++) {
ast_str_append(&body, 0, "%s\r\n", args.extra[x]);
blob = ast_json_pack("{s: s, s: s}",
"type", "userevent",
"eventname", args.eventname);
if (!blob) {
return -1;
}
blob = ast_json_pack("{s: s, s: s}",
"eventname", args.eventname,
"body", ast_str_buffer(body));
if (!blob) {
ast_log(LOG_WARNING, "Unable to create message buffer\n");
for (x = 0; x < args.argc - 1; x++) {
char *key, *value = args.extra[x];
struct ast_json *json_value;
key = strsep(&value, ":");
if (!value) {
/* no ':' in string? */
continue;
}
value = ast_strip(value);
json_value = ast_json_string_create(value);
if (!json_value) {
return -1;
}
/* ref stolen by ast_json_object_set */
if (ast_json_object_set(blob, key, json_value)) {
return -1;
}
}
msg = ast_channel_blob_create(
chan, ast_channel_user_event_type(), blob);
if (!msg) {

View File

@@ -306,6 +306,34 @@ void ast_channel_publish_dial(struct ast_channel *caller,
*/
struct ast_json *ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot);
/*!
* \brief Compares the context, exten and priority of two snapshots.
* \since 12
*
* \param old_snapshot Old snapshot
* \param new_snapshot New snapshot
*
* \return True (non-zero) if context, exten or priority are identical.
* \return False (zero) if context, exten and priority changed.
*/
int ast_channel_snapshot_cep_equal(
const struct ast_channel_snapshot *old_snapshot,
const struct ast_channel_snapshot *new_snapshot);
/*!
* \brief Compares the callerid info of two snapshots.
* \since 12
*
* \param old_snapshot Old snapshot
* \param new_snapshot New snapshot
*
* \return True (non-zero) if callerid are identical.
* \return False (zero) if callerid changed.
*/
int ast_channel_snapshot_caller_id_equal(
const struct ast_channel_snapshot *old_snapshot,
const struct ast_channel_snapshot *new_snapshot);
/*!
* \brief Dispose of the stasis channel topics and message types
*/

View File

@@ -402,34 +402,6 @@ static struct snapshot_manager_event *channel_state_change(
return NULL;
}
/*!
* \brief Compares the context, exten and priority of two snapshots.
* \param old_snapshot Old snapshot
* \param new_snapshot New snapshot
* \return True (non-zero) if context, exten or priority are identical.
* \return False (zero) if context, exten and priority changed.
*/
static inline int cep_equal(
const struct ast_channel_snapshot *old_snapshot,
const struct ast_channel_snapshot *new_snapshot)
{
ast_assert(old_snapshot != NULL);
ast_assert(new_snapshot != NULL);
/* We actually get some snapshots with CEP set, but before the
* application is set. Since empty application is invalid, we treat
* setting the application from nothing as a CEP change.
*/
if (ast_strlen_zero(old_snapshot->appl) &&
!ast_strlen_zero(new_snapshot->appl)) {
return 0;
}
return old_snapshot->priority == new_snapshot->priority &&
strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
}
static struct snapshot_manager_event *channel_newexten(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
@@ -444,7 +416,7 @@ static struct snapshot_manager_event *channel_newexten(
return NULL;
}
if (old_snapshot && cep_equal(old_snapshot, new_snapshot)) {
if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
return NULL;
}
@@ -459,23 +431,6 @@ static struct snapshot_manager_event *channel_newexten(
new_snapshot->data);
}
/*!
* \brief Compares the callerid info of two snapshots.
* \param old_snapshot Old snapshot
* \param new_snapshot New snapshot
* \return True (non-zero) if callerid are identical.
* \return False (zero) if callerid changed.
*/
static inline int caller_id_equal(
const struct ast_channel_snapshot *old_snapshot,
const struct ast_channel_snapshot *new_snapshot)
{
ast_assert(old_snapshot != NULL);
ast_assert(new_snapshot != NULL);
return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
}
static struct snapshot_manager_event *channel_new_callerid(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
@@ -485,7 +440,7 @@ static struct snapshot_manager_event *channel_new_callerid(
return NULL;
}
if (caller_id_equal(old_snapshot, new_snapshot)) {
if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
return NULL;
}
@@ -587,19 +542,62 @@ static void channel_varset_cb(void *data, struct stasis_subscription *sub,
variable, value);
}
/*!
* \brief Callback used to determine whether a key should be skipped when converting a JSON object to a manager blob
* \param key Key from JSON blob to be evaluated
* \retval non-zero if the key should be excluded
* \retval zero if the key should not be excluded
*/
typedef int (*key_exclusion_cb)(const char *key);
static struct ast_str *manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
{
struct ast_str *output_str = ast_str_create(32);
struct ast_json_iter *blob_iter = ast_json_object_iter(blob);
if (!output_str || !blob_iter) {
return NULL;
}
do {
const char *key = ast_json_object_iter_key(blob_iter);
const char *value = ast_json_string_get(ast_json_object_iter_value(blob_iter));
if (exclusion_cb && exclusion_cb(key)) {
continue;
}
ast_str_append(&output_str, 0, "%s: %s\r\n", key, value);
if (!output_str) {
return NULL;
}
} while ((blob_iter = ast_json_object_iter_next(blob, blob_iter)));
return output_str;
}
static int userevent_exclusion_cb(const char *key)
{
if (!strcmp("type", key)) {
return 1;
}
if (!strcmp("eventname", key)) {
return 1;
}
return 0;
}
static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, body, NULL, ast_free);
const char *eventname;
const char *body;
eventname = ast_json_string_get(ast_json_object_get(obj->blob, "eventname"));
body = ast_json_string_get(ast_json_object_get(obj->blob, "body"));
body = manager_str_from_json_object(obj->blob, userevent_exclusion_cb);
channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
if (!channel_event_string) {
if (!channel_event_string || !body) {
return;
}
@@ -621,7 +619,7 @@ static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
"%s"
"UserEvent: %s\r\n"
"%s",
ast_str_buffer(channel_event_string), eventname, body);
ast_str_buffer(channel_event_string), eventname, ast_str_buffer(body));
}
static void channel_hangup_request_cb(void *data,

View File

@@ -448,6 +448,37 @@ struct ast_json *ast_channel_snapshot_to_json(const struct ast_channel_snapshot
return ast_json_ref(json_chan);
}
int ast_channel_snapshot_cep_equal(
const struct ast_channel_snapshot *old_snapshot,
const struct ast_channel_snapshot *new_snapshot)
{
ast_assert(old_snapshot != NULL);
ast_assert(new_snapshot != NULL);
/* We actually get some snapshots with CEP set, but before the
* application is set. Since empty application is invalid, we treat
* setting the application from nothing as a CEP change.
*/
if (ast_strlen_zero(old_snapshot->appl) &&
!ast_strlen_zero(new_snapshot->appl)) {
return 0;
}
return old_snapshot->priority == new_snapshot->priority &&
strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
}
int ast_channel_snapshot_caller_id_equal(
const struct ast_channel_snapshot *old_snapshot,
const struct ast_channel_snapshot *new_snapshot)
{
ast_assert(old_snapshot != NULL);
ast_assert(new_snapshot != NULL);
return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
}
void ast_stasis_channels_shutdown(void)
{
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);

View File

@@ -39,6 +39,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/strings.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/callerid.h"
#include "stasis_http/resource_events.h"
/*! Time to wait for a frame in the application */
#define MAX_WAIT_MS 200
@@ -55,6 +58,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
*/
#define CONTROLS_NUM_BUCKETS 127
/*!
* \brief Number of buckets for the channels container for app instances. Remember
* to keep it a prime number!
*/
#define APP_CHANNELS_BUCKETS 7
/*!
* \brief Number of buckets for the blob_handlers container. Remember to keep
* it a prime number!
*/
#define BLOB_HANDLER_BUCKETS 7
/*!
* \brief Stasis application container. Please call apps_registry() instead of
* directly accessing.
@@ -63,6 +78,9 @@ struct ao2_container *__apps_registry;
struct ao2_container *__app_controls;
/*! \brief Message router for the channel caching topic */
struct stasis_message_router *channel_router;
/*! Ref-counting accessor for the stasis applications container */
static struct ao2_container *apps_registry(void)
{
@@ -81,6 +99,8 @@ struct app {
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
/*! List of channel identifiers this app instance is interested in */
struct ao2_container *channels;
/*! Name of the Stasis application */
char name[];
};
@@ -91,12 +111,14 @@ static void app_dtor(void *obj)
ao2_cleanup(app->data);
app->data = NULL;
ao2_cleanup(app->channels);
app->channels = NULL;
}
/*! Constructor for \ref app. */
static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
struct app *app;
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
size_t size;
ast_assert(name != NULL);
@@ -114,6 +136,12 @@ static struct app *app_create(const char *name, stasis_app_cb handler, void *dat
ao2_ref(data, +1);
app->data = data;
app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
if (!app->channels) {
return NULL;
}
ao2_ref(app, +1);
return app;
}
@@ -140,6 +168,27 @@ static int app_compare(void *lhs, void *rhs, int flags)
}
}
static int app_add_channel(struct app* app, const struct ast_channel *chan)
{
const char *uniqueid;
ast_assert(chan != NULL);
ast_assert(app != NULL);
uniqueid = ast_channel_uniqueid(chan);
if (!ast_str_container_add(app->channels, uniqueid)) {
return -1;
}
return 0;
}
static void app_remove_channel(struct app* app, const struct ast_channel *chan)
{
ast_assert(chan != NULL);
ast_assert(app != NULL);
ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
}
/*!
* \brief Send a message to the given application.
* \param app App to send the message to.
@@ -316,6 +365,9 @@ void stasis_app_control_continue(struct stasis_app_control *control)
control->continue_to_dialplan = 1;
}
/*! \brief Typedef for blob handler callbacks */
typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
static int OK = 0;
static int FAIL = -1;
@@ -343,43 +395,11 @@ int stasis_app_control_answer(struct stasis_app_control *control)
return *retval;
}
static struct ast_json *app_event_create(
const char *event_name,
const struct ast_channel_snapshot *snapshot,
const struct ast_json *extra_info)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
if (extra_info) {
event = ast_json_deep_copy(extra_info);
} else {
event = ast_json_object_create();
}
if (snapshot) {
int ret;
/* Mustn't already have a channel field */
ast_assert(ast_json_object_get(event, "channel") == NULL);
ret = ast_json_object_set(
event,
"channel", ast_channel_snapshot_to_json(snapshot));
if (ret != 0) {
return NULL;
}
}
message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
return ast_json_ref(message);
}
static int send_start_msg(struct app *app, struct ast_channel *chan,
int argc, char *argv[])
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_json *json_args;
@@ -393,19 +413,13 @@ static int send_start_msg(struct app *app, struct ast_channel *chan,
return -1;
}
msg = ast_json_pack("{s: {s: [], s: o}}",
"stasis-start",
"args",
"channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
blob = ast_json_pack("{s: []}", "args");
if (!blob) {
return -1;
}
/* Append arguments to args array */
json_args = ast_json_object_get(
ast_json_object_get(msg, "stasis-start"),
"args");
json_args = ast_json_object_get(blob, "args");
ast_assert(json_args != NULL);
for (i = 0; i < argc; ++i) {
int r = ast_json_array_append(json_args,
@@ -416,6 +430,11 @@ static int send_start_msg(struct app *app, struct ast_channel *chan,
}
}
msg = stasis_json_event_stasis_start_create(snapshot, blob);
if (!msg) {
return -1;
}
app_send(app, msg);
return 0;
}
@@ -432,7 +451,8 @@ static int send_end_msg(struct app *app, struct ast_channel *chan)
if (snapshot == NULL) {
return -1;
}
msg = app_event_create("stasis-end", snapshot, NULL);
msg = stasis_json_event_stasis_end_create(snapshot);
if (!msg) {
return -1;
}
@@ -441,62 +461,201 @@ static int send_end_msg(struct app *app, struct ast_channel *chan)
return 0;
}
static void dtmf_handler(struct app *app, struct ast_channel_blob *obj)
static int app_watching_channel_cb(void *obj, void *arg, int flags)
{
RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
const char *direction;
RAII_VAR(char *, uniqueid, NULL, ao2_cleanup);
struct app *app = obj;
char *chan_uniqueid = arg;
/* To simplify events, we'll only generate on receive */
direction = ast_json_string_get(
ast_json_object_get(obj->blob, "direction"));
if (strcmp("Received", direction) != 0) {
return;
uniqueid = ao2_find(app->channels, chan_uniqueid, OBJ_KEY);
return uniqueid ? CMP_MATCH : 0;
}
extra = ast_json_pack(
"{s: o}",
"digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
if (!extra) {
return;
static struct ao2_container *get_watching_apps(const char *uniqueid)
{
RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
struct ao2_container *watching_apps;
char *uniqueid_dup;
RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
ast_assert(uniqueid != NULL);
ast_assert(apps != NULL);
uniqueid_dup = ast_strdupa(uniqueid);
watching_apps_iter = ao2_callback(apps, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
watching_apps = watching_apps_iter->c;
if (!ao2_container_count(watching_apps)) {
return NULL;
}
msg = app_event_create("dtmf-received", obj->snapshot, extra);
if (!msg) {
return;
ao2_ref(watching_apps, +1);
return watching_apps_iter->c;
}
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
typedef struct ast_json *(*channel_snapshot_monitor)(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot);
/*! \brief Handle channel state changes */
static struct ast_json *channel_state(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
if (!old_snapshot) {
return stasis_json_event_channel_created_create(snapshot);
} else if (!new_snapshot) {
json = ast_json_pack("{s: i, s: s}",
"cause", snapshot->hangupcause,
"cause_txt", ast_cause2str(snapshot->hangupcause));
if (!json) {
return NULL;
}
return stasis_json_event_channel_destroyed_create(snapshot, json);
} else if (old_snapshot->state != new_snapshot->state) {
return stasis_json_event_channel_state_change_create(snapshot);
}
return NULL;
}
static struct ast_json *channel_dialplan(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
/* No Newexten event on cache clear */
if (!new_snapshot) {
return NULL;
}
/* Empty application is not valid for a Newexten event */
if (ast_strlen_zero(new_snapshot->appl)) {
return NULL;
}
if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
return NULL;
}
json = ast_json_pack("{s: s, s: s}",
"application", new_snapshot->appl,
"application_data", new_snapshot->data);
if (!json) {
return NULL;
}
return stasis_json_event_channel_dialplan_create(new_snapshot, json);
}
static struct ast_json *channel_callerid(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
/* No NewCallerid event on cache clear or first event */
if (!old_snapshot || !new_snapshot) {
return NULL;
}
if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
return NULL;
}
json = ast_json_pack("{s: i, s: s}",
"caller_presentation", new_snapshot->caller_pres,
"caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
if (!json) {
return NULL;
}
return stasis_json_event_channel_caller_id_create(new_snapshot, json);
}
static struct ast_json *channel_snapshot(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot)
{
if (!new_snapshot) {
return NULL;
}
return stasis_json_event_channel_snapshot_create(new_snapshot);
}
channel_snapshot_monitor channel_monitors[] = {
channel_snapshot,
channel_state,
channel_dialplan,
channel_callerid
};
static int app_send_cb(void *obj, void *arg, int flags)
{
struct app *app = obj;
struct ast_json *msg = arg;
app_send(app, msg);
return 0;
}
static void sub_handler(void *data, struct stasis_subscription *sub,
static void sub_snapshot_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct app *app = data;
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
int i;
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(data);
watching_apps = get_watching_apps(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
if (!watching_apps) {
return;
}
if (ast_channel_snapshot_type() == stasis_message_type(message)) {
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
struct ast_channel_snapshot *snapshot =
stasis_message_data(message);
msg = app_event_create("channel-state-change", snapshot, NULL);
msg = channel_monitors[i](old_snapshot, new_snapshot);
if (msg) {
ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
}
}
static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
{
ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
}
static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
if (!obj->snapshot) {
return;
}
watching_apps = get_watching_apps(obj->snapshot->uniqueid);
if (!watching_apps) {
return;
}
msg = handler_cb(obj);
if (!msg) {
return;
}
app_send(app, msg);
} else if (ast_channel_dtmf_end_type() == stasis_message_type(message)) {
/* To simplify events, we'll only generate on DTMF end */
struct ast_channel_blob *blob = stasis_message_data(message);
dtmf_handler(app, blob);
}
distribute_message(watching_apps, msg);
}
/*!
@@ -544,8 +703,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
RAII_VAR(struct stasis_subscription *, subscription, NULL,
stasis_unsubscribe);
int res = 0;
int hungup = 0;
@@ -570,21 +727,17 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
ao2_link(controls, control);
}
subscription =
stasis_subscribe(ast_channel_topic(chan), sub_handler, app);
if (subscription == NULL) {
ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
app_name, ast_channel_name(chan));
return -1;
}
ao2_ref(app, +1); /* subscription now has a reference */
res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR, "Error sending start message to %s\n", app_name);
return res;
}
if (app_add_channel(app, chan)) {
ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
return -1;
}
while (1) {
RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
int r;
@@ -634,6 +787,7 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
}
app_remove_channel(app, chan);
res = send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR,
@@ -675,10 +829,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
if (app) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
SCOPED_LOCK(app_lock, app, ao2_lock, ao2_unlock);
msg = app_event_create("application-replaced", NULL, NULL);
blob = ast_json_pack("{s: s}", "application", app_name);
if (blob) {
msg = stasis_json_event_application_replaced_create(blob);
if (msg) {
app->handler(app->data, app_name, msg);
}
}
app->handler = handler;
ao2_cleanup(app->data);
@@ -706,6 +866,82 @@ void stasis_app_unregister(const char *app_name)
}
}
static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj)
{
RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
const char *direction;
/* To simplify events, we'll only generate on receive */
direction = ast_json_string_get(
ast_json_object_get(obj->blob, "direction"));
if (strcmp("Received", direction) != 0) {
return NULL;
}
extra = ast_json_pack(
"{s: o}",
"digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
if (!extra) {
return NULL;
}
return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra);
}
/* To simplify events, we'll only generate on DTMF end (dtmf_end type) */
static void sub_dtmf_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
generic_blob_handler(obj, handle_blob_dtmf);
}
static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj)
{
return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob);
}
static void sub_userevent_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
generic_blob_handler(obj, handle_blob_userevent);
}
static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj)
{
return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob);
}
static void sub_hangup_request_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
generic_blob_handler(obj, handle_blob_hangup_request);
}
static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj)
{
return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob);
}
static void sub_varset_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
generic_blob_handler(obj, handle_blob_varset);
}
static int load_module(void)
{
int r = 0;
@@ -722,13 +958,30 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
return r;
channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
if (!channel_router) {
return AST_MODULE_LOAD_FAILURE;
}
r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL);
if (r) {
return AST_MODULE_LOAD_FAILURE;
}
return AST_MODULE_LOAD_SUCCESS;
}
static int unload_module(void)
{
int r = 0;
stasis_message_router_unsubscribe(channel_router);
channel_router = NULL;
ao2_cleanup(__apps_registry);
__apps_registry = NULL;

View File

@@ -42,6 +42,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/module.h"
#include "stasis_http/resource_events.h"
#include "asterisk/stasis_channels.h"
/*!
* \brief Parameter parsing callback for /events.
@@ -76,6 +77,524 @@ static struct stasis_rest_handlers events = {
.children = { }
};
struct ast_json *stasis_json_event_channel_snapshot_create(
struct ast_channel_snapshot *channel_snapshot
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
int ret;
ast_assert(channel_snapshot != NULL);
event = ast_json_object_create();
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_snapshot", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_destroyed_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "cause");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
validator = ast_json_object_get(blob, "cause_txt");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_destroyed", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_caller_id_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "caller_presentation_txt");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
validator = ast_json_object_get(blob, "caller_presentation");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_caller_id", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_hangup_request_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "soft");
if (validator) {
/* do validation? XXX */
}
validator = ast_json_object_get(blob, "cause");
if (validator) {
/* do validation? XXX */
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_hangup_request", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_application_replaced_create(
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "application");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
message = ast_json_pack("{s: o}", "application_replaced", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_varset_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "variable");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
validator = ast_json_object_get(blob, "value");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_varset", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_userevent_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "eventname");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_userevent", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_created_create(
struct ast_channel_snapshot *channel_snapshot
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
int ret;
ast_assert(channel_snapshot != NULL);
event = ast_json_object_create();
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_created", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_stasis_start_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "args");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "stasis_start", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_dialplan_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "application");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
validator = ast_json_object_get(blob, "application_data");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_dialplan", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_state_change_create(
struct ast_channel_snapshot *channel_snapshot
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
int ret;
ast_assert(channel_snapshot != NULL);
event = ast_json_object_create();
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_state_change", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_channel_dtmf_received_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
struct ast_json *validator;
int ret;
ast_assert(channel_snapshot != NULL);
ast_assert(blob != NULL);
ast_assert(ast_json_object_get(blob, "channel") == NULL);
ast_assert(ast_json_object_get(blob, "type") == NULL);
validator = ast_json_object_get(blob, "digit");
if (validator) {
/* do validation? XXX */
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
}
event = ast_json_deep_copy(blob);
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "channel_dtmf_received", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
struct ast_json *stasis_json_event_stasis_end_create(
struct ast_channel_snapshot *channel_snapshot
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
int ret;
ast_assert(channel_snapshot != NULL);
event = ast_json_object_create();
if (!event) {
return NULL;
}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
message = ast_json_pack("{s: o}", "stasis_end", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
static int load_module(void)
{
return stasis_http_add_handler(&events);

View File

@@ -52,10 +52,6 @@ static struct ast_json *oom_json;
*/
#define APPS_NUM_BUCKETS 7
struct websocket_app {
char *name;
};
/*!
* \internal
* \brief Helper to write a JSON object to a WebSocket.
@@ -78,35 +74,6 @@ static int websocket_write_json(struct ast_websocket *session,
strlen(str));
}
/*! Hash function for websocket_app */
static int hash_app(const void *obj, const int flags)
{
const struct websocket_app *app = obj;
const char *name = flags & OBJ_KEY ? obj : app->name;
return ast_str_hash(name);
}
/*! Comparison function for websocket_app */
static int compare_app(void *lhs, void *rhs, int flags)
{
const struct websocket_app *lhs_app = lhs;
const struct websocket_app *rhs_app = rhs;
const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
if (strcmp(lhs_app->name, rhs_name) == 0) {
return CMP_MATCH;
} else {
return 0;
}
}
static void app_dtor(void *obj)
{
struct websocket_app *app = obj;
ast_free(app->name);
}
struct stasis_ws_session_info {
struct ast_websocket *ws_session;
struct ao2_container *websocket_apps;
@@ -132,7 +99,7 @@ static struct stasis_ws_session_info *session_create(
session->ws_session = ws_session;
session->websocket_apps =
ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
ast_str_container_alloc(APPS_NUM_BUCKETS);
if (!session->websocket_apps) {
return NULL;
@@ -154,12 +121,12 @@ static struct stasis_ws_session_info *session_create(
static void session_shutdown(struct stasis_ws_session_info *session)
{
struct ao2_iterator i;
struct websocket_app *app;
char *app;
SCOPED_AO2LOCK(lock, session);
i = ao2_iterator_init(session->websocket_apps, 0);
while ((app = ao2_iterator_next(&i))) {
stasis_app_unregister(app->name);
stasis_app_unregister(app);
ao2_cleanup(app);
}
ao2_iterator_destroy(&i);
@@ -212,15 +179,10 @@ static int session_register_apps(struct stasis_ws_session_info *session,
return -1;
}
while ((app_name = strsep(&apps, ","))) {
RAII_VAR(struct websocket_app *, app, NULL, ao2_cleanup);
app = ao2_alloc(sizeof(*app), app_dtor);
if (!app) {
if (ast_str_container_add(session->websocket_apps, app_name)) {
websocket_write_json(session->ws_session, oom_json);
return -1;
}
app->name = ast_strdup(app_name);
ao2_link(session->websocket_apps, app);
stasis_app_register(app_name, app_handler, session);
}

View File

@@ -83,8 +83,8 @@ void stasis_http_get_endpoint(struct ast_variable *headers, struct ast_get_endpo
* JSON models
*
* Endpoint
* - resource: string (required)
* - technology: string (required)
* - name: string (required)
*/
#endif /* _ASTERISK_RESOURCE_ENDPOINTS_H */

View File

@@ -55,42 +55,240 @@ struct ast_event_websocket_args {
*/
void stasis_http_event_websocket(struct ast_variable *headers, struct ast_event_websocket_args *args, struct stasis_http_response *response);
struct ast_channel_snapshot;
struct ast_bridge_snapshot;
/*!
* \brief Some part of channel state changed.
*
* \param channel The channel to be used to generate this event
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_snapshot_create(
struct ast_channel_snapshot *channel_snapshot
);
/*!
* \brief Notification that a channel has been destroyed.
*
* \param channel The channel to be used to generate this event
* \param blob JSON blob containing the following parameters:
* - cause: integer - Integer representation of the cause of the hangup (required)
* - cause_txt: string - Text representation of the cause of the hangup (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_destroyed_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief Channel changed Caller ID.
*
* \param channel The channel that changed Caller ID.
* \param blob JSON blob containing the following parameters:
* - caller_presentation_txt: string - The text representation of the Caller Presentation value. (required)
* - caller_presentation: integer - The integer representation of the Caller Presentation value. (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_caller_id_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief A hangup was requested on the channel.
*
* \param channel The channel on which the hangup was requested.
* \param blob JSON blob containing the following parameters:
* - soft: boolean - Whether the hangup request was a soft hangup request.
* - cause: integer - Integer representation of the cause of the hangup.
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_hangup_request_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief Notification that another WebSocket has taken over for an application.
*
* \param blob JSON blob containing the following parameters:
* - application: string (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_application_replaced_create(
struct ast_json *blob
);
/*!
* \brief Channel variable changed.
*
* \param channel The channel on which the variable was set.
* \param blob JSON blob containing the following parameters:
* - variable: string - The variable that changed. (required)
* - value: string - The new value of the variable. (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_varset_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief User-generated event with additional user-defined fields in the object.
*
* \param channel The channel that signaled the user event.
* \param blob JSON blob containing the following parameters:
* - eventname: string - The name of the user event. (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_userevent_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief Notification that a channel has been created.
*
* \param channel The channel to be used to generate this event
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_created_create(
struct ast_channel_snapshot *channel_snapshot
);
/*!
* \brief Notification that a channel has entered a Stasis appliction.
*
* \param channel The channel to be used to generate this event
* \param blob JSON blob containing the following parameters:
* - args: List[string] - Arguments to the application (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_stasis_start_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief Channel changed location in the dialplan.
*
* \param channel The channel that changed dialplan location.
* \param blob JSON blob containing the following parameters:
* - application: string - The application that the channel is currently in. (required)
* - application_data: string - The data that was passed to the application when it was invoked. (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_dialplan_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief Notification of a channel's state change.
*
* \param channel The channel to be used to generate this event
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_state_change_create(
struct ast_channel_snapshot *channel_snapshot
);
/*!
* \brief DTMF received on a channel.
*
* \param channel The channel on which DTMF was received
* \param blob JSON blob containing the following parameters:
* - digit: string - DTMF digit received (0-9, A-E, # or *) (required)
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_channel_dtmf_received_create(
struct ast_channel_snapshot *channel_snapshot,
struct ast_json *blob
);
/*!
* \brief Notification that a channel has left a Stasis appliction.
*
* \param channel The channel to be used to generate this event
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
struct ast_json *stasis_json_event_stasis_end_create(
struct ast_channel_snapshot *channel_snapshot
);
/*
* JSON models
*
* DtmfReceived
* - digit: string
* - channel: Channel
* BridgeCreated
* - bridge: Bridge
* BridgeDestroyed
* - bridge: Bridge
* ChannelSnapshot
* ChannelDestroyed
* - cause: integer (required)
* - cause_txt: string (required)
* ChannelCallerId
* - caller_presentation_txt: string (required)
* - caller_presentation: integer (required)
* ChannelHangupRequest
* - soft: boolean
* - cause: integer
* ApplicationReplaced
* - application: string
* ChannelLeftBridge
* - bridge: Bridge
* - channel: Channel
* - application: string (required)
* ChannelVarset
* - variable: string (required)
* - value: string (required)
* ChannelUserevent
* - eventname: string (required)
* ChannelCreated
* StasisStart
* - args: List[string]
* - channel_info: Channel
* StasisEnd
* - channel_info: Channel
* - args: List[string] (required)
* ChannelDialplan
* - application: string (required)
* - application_data: string (required)
* ChannelStateChange
* - channel_info: Channel
* ChannelEnteredBridge
* - bridge: Bridge
* - channel: Channel
* ChannelDtmfReceived
* - digit: string (required)
* Event
* - stasis_start: StasisStart
* - channel_entered_bridge: ChannelEnteredBridge
* - channel_left_bridge: ChannelLeftBridge
* - channel_created: ChannelCreated
* - channel_destroyed: ChannelDestroyed
* - channel_dialplan: ChannelDialplan
* - channel_varset: ChannelVarset
* - application_replaced: ApplicationReplaced
* - channel_state_change: ChannelStateChange
* - bridge_created: BridgeCreated
* - stasis_start: StasisStart
* - application: string (required)
* - channel_hangup_request: ChannelHangupRequest
* - channel_userevent: ChannelUserevent
* - channel_snapshot: ChannelSnapshot
* - channel_dtmf_received: ChannelDtmfReceived
* - channel_caller_id: ChannelCallerId
* - stasis_end: StasisEnd
* - dtmf_received: DtmfReceived
* - bridge_destroyed: BridgeDestroyed
* StasisEnd
*/
#endif /* _ASTERISK_RESOURCE_EVENTS_H */

View File

@@ -107,7 +107,6 @@ class PathSegment(Stringify):
"""
return len(self.__children)
class AsteriskProcessor(SwaggerPostProcessor):
"""A SwaggerPostProcessor which adds fields needed to generate Asterisk
RESTful HTTP binding code.
@@ -145,6 +144,18 @@ class AsteriskProcessor(SwaggerPostProcessor):
segment = resource_api.root_path.get_child(api.path.split('/'))
for operation in api.operations:
segment.operations.append(operation)
resource_api.api_declaration.has_events = False
for model in resource_api.api_declaration.models:
if model.id == "Event":
resource_api.api_declaration.has_events = True
break
if resource_api.api_declaration.has_events:
resource_api.api_declaration.events = \
[self.process_model(model, context) for model in \
resource_api.api_declaration.models if model.id != "Event"]
else:
resource_api.api_declaration.events = []
# Since every API path should start with /[resource], root should
# have exactly one child.
if resource_api.root_path.num_children() != 1:
@@ -177,3 +188,43 @@ class AsteriskProcessor(SwaggerPostProcessor):
parameter.c_space = ''
else:
parameter.c_space = ' '
def process_model(self, model, context):
model.c_id = snakify(model.id)
model.channel = False
model.channel_desc = ""
model.bridge = False
model.bridge_desc = ""
model.properties = [self.process_property(model, prop, context) for prop in model.properties]
model.properties = [prop for prop in model.properties if prop]
model.has_properties = (len(model.properties) != 0)
return model
def process_property(self, model, prop, context):
# process channel separately since it will be pulled out
if prop.name == 'channel' and prop.type == 'Channel':
model.channel = True
model.channel_desc = prop.description or ""
return None
# process bridge separately since it will be pulled out
if prop.name == 'bridge' and prop.type == 'Bridge':
model.bridge = True
model.bridge_desc = prop.description or ""
return None
prop.c_name = snakify(prop.name)
if prop.type in self.type_mapping:
prop.c_type = self.type_mapping[prop.type]
prop.c_convert = self.convert_mapping[prop.c_type]
else:
prop.c_type = "Property type %s not mappable to a C type" % (prop.type)
prop.c_convert = "Property type %s not mappable to a C conversion" % (prop.type)
#raise SwaggerError(
# "Invalid property type %s" % prop.type, context)
# You shouldn't put a space between 'char *' and the variable
if prop.c_type.endswith('*'):
prop.c_space = ''
else:
prop.c_space = ' '
return prop

View File

@@ -0,0 +1,10 @@
struct ast_json *stasis_json_event_{{c_id}}_create(
{{#bridge}}
struct ast_bridge_snapshot *bridge_snapshot{{#channel}},{{/channel}}{{^channel}}{{#has_properties}},{{/has_properties}}{{/channel}}
{{/bridge}}
{{#channel}}
struct ast_channel_snapshot *channel_snapshot{{#has_properties}},{{/has_properties}}
{{/channel}}
{{#has_properties}}
struct ast_json *blob
{{/has_properties}}

View File

@@ -47,6 +47,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/module.h"
#include "stasis_http/resource_{{name}}.h"
{{#has_events}}
#include "asterisk/stasis_channels.h"
{{/has_events}}
{{#apis}}
{{#operations}}
@@ -96,6 +99,89 @@ static void stasis_http_{{c_nickname}}_cb(
{{> rest_handler}}
{{/root_path}}
{{#has_events}}
{{#events}}
{{> event_function_decl}}
)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
{{#has_properties}}
struct ast_json *validator;
{{/has_properties}}
{{#channel}}
int ret;
{{/channel}}
{{#bridge}}
{{^channel}}
int ret;
{{/channel}}
{{/bridge}}
{{#channel}}
ast_assert(channel_snapshot != NULL);
{{/channel}}
{{#bridge}}
ast_assert(bridge_snapshot != NULL);
{{/bridge}}
{{#has_properties}}
ast_assert(blob != NULL);
{{#channel}}
ast_assert(ast_json_object_get(blob, "channel") == NULL);
{{/channel}}
{{#bridge}}
ast_assert(ast_json_object_get(blob, "bridge") == NULL);
{{/bridge}}
ast_assert(ast_json_object_get(blob, "type") == NULL);
{{#properties}}
validator = ast_json_object_get(blob, "{{name}}");
if (validator) {
/* do validation? XXX */
{{#required}}
} else {
/* fail message generation if the required parameter doesn't exist */
return NULL;
{{/required}}
}
{{/properties}}
event = ast_json_deep_copy(blob);
{{/has_properties}}
{{^has_properties}}
event = ast_json_object_create();
{{/has_properties}}
if (!event) {
return NULL;
}
{{#channel}}
ret = ast_json_object_set(event,
"channel", ast_channel_snapshot_to_json(channel_snapshot));
if (ret) {
return NULL;
}
{{/channel}}
{{#bridge}}
ret = ast_json_object_set(event,
"bridge", ast_bridge_snapshot_to_json(bridge_snapshot));
if (ret) {
return NULL;
}
{{/bridge}}
message = ast_json_pack("{s: o}", "{{c_id}}", ast_json_ref(event));
if (!message) {
return NULL;
}
return ast_json_ref(message);
}
{{/events}}
{{/has_events}}
static int load_module(void)
{
return stasis_http_add_handler(&{{root_full_name}});

View File

@@ -64,6 +64,39 @@ void stasis_http_{{c_nickname}}(struct ast_variable *headers, struct ast_{{c_nic
{{/operations}}
{{/apis}}
{{#has_events}}
struct ast_channel_snapshot;
struct ast_bridge_snapshot;
{{#events}}
/*!
* \brief {{description}}
{{#notes}}
*
* {{{notes}}}
{{/notes}}
*
{{#channel}}
* \param channel {{#channel_desc}}{{channel_desc}}{{/channel_desc}}{{^channel_desc}}The channel to be used to generate this event{{/channel_desc}}
{{/channel}}
{{#bridge}}
* \param bridge {{#bridge_desc}}{{bridge_desc}}{{/bridge_desc}}{{^bridge_desc}}The bridge to be used to generate this event{{/bridge_desc}}
{{/bridge}}
{{#has_properties}}
* \param blob JSON blob containing the following parameters:
{{/has_properties}}
{{#properties}}
* - {{name}}: {{type}} {{#description}}- {{description}}{{/description}}{{#required}} (required){{/required}}
{{/properties}}
*
* \retval NULL on error
* \retval JSON (ast_json) describing the event
*/
{{> event_function_decl}}
);
{{/events}}
{{/has_events}}
/*
* JSON models
*
@@ -72,8 +105,7 @@ void stasis_http_{{c_nickname}}(struct ast_variable *headers, struct ast_{{c_nic
{{#properties}}
* - {{name}}: {{type}}{{#required}} (required){{/required}}
{{/properties}}
{{/models}}
*/
{{/models}} */
#endif /* _ASTERISK_RESOURCE_{{name_caps}}_H */
{{/api_declaration}}

View File

@@ -295,13 +295,17 @@ class Model(Stringify):
def __init__(self):
self.id = None
self.notes = None
self.description = None
self.properties = None
def load(self, id, model_json, processor, context):
context = add_context(context, model_json, 'id')
# This arrangement is required by the Swagger API spec
self.id = model_json.get('id')
if id != self.id:
raise SwaggerError("Model id doesn't match name", c)
self.description = model_json.get('description')
props = model_json.get('properties').items() or []
self.properties = [
Property(k).load(j, processor, context) for (k, j) in props]

View File

@@ -54,12 +54,16 @@
"required": true
},
"application_replaced": { "type": "ApplicationReplaced" },
"bridge_created": { "type": "BridgeCreated" },
"bridge_destroyed": { "type": "BridgeDestroyed" },
"channel_entered_bridge": { "type": "ChannelEnteredBridge" },
"channel_left_bridge": { "type": "ChannelLeftBridge" },
"channel_created": { "type": "ChannelCreated" },
"channel_destroyed": { "type": "ChannelDestroyed" },
"channel_snapshot": { "type": "ChannelSnapshot" },
"channel_state_change": { "type": "ChannelStateChange" },
"dtmf_received": { "type": "DtmfReceived" },
"channel_dtmf_received": { "type": "ChannelDtmfReceived" },
"channel_dialplan": { "type": "ChannelDialplan" },
"channel_caller_id": { "type": "ChannelCallerId" },
"channel_userevent": { "type": "ChannelUserevent" },
"channel_hangup_request": { "type": "ChannelHangupRequest" },
"channel_varset": { "type": "ChannelVarset" },
"stasis_end": { "type": "StasisEnd" },
"stasis_start": { "type": "StasisStart" }
}
@@ -70,48 +74,47 @@
"notes": "An application may only be subscribed to by a single WebSocket at a time. If multiple WebSockets attempt to subscribe to the same application, the newer WebSocket wins, and the older one receives this event.",
"properties": {
"application": {
"required": true,
"type": "string"
}
}
},
"BridgeCreated": {
"id": "BridgeCreated",
"description": "Notification that a bridge has been created.",
"ChannelCreated": {
"id": "ChannelCreated",
"description": "Notification that a channel has been created.",
"properties": {
"bridge": {
"type": "Bridge"
}
}
},
"BridgeDestroyed": {
"id": "BridgeDestroyed",
"description": "Notification that a bridge has been destroyed.",
"properties": {
"bridge": {
"type": "Bridge"
}
}
},
"ChannelEnteredBridge": {
"id": "ChannelEnteredBridge",
"description": "Notification that a channel has entered a bridge.",
"properties": {
"bridge": {
"type": "Bridge"
},
"channel": {
"required": true,
"type": "Channel"
}
}
},
"ChannelLeftBridge": {
"id": "ChannelLeftBridge",
"description": "Notification that a channel has left a bridge.",
"ChannelSnapshot": {
"id": "ChannelSnapshot",
"description": "Some part of channel state changed.",
"properties": {
"bridge": {
"type": "Bridge"
"channel": {
"required": true,
"type": "Channel"
}
}
},
"ChannelDestroyed": {
"id": "ChannelDestroyed",
"description": "Notification that a channel has been destroyed.",
"properties": {
"cause": {
"required": true,
"description": "Integer representation of the cause of the hangup",
"type": "integer"
},
"cause_txt": {
"required": true,
"description": "Text representation of the cause of the hangup",
"type": "string"
},
"channel": {
"required": true,
"type": "Channel"
}
}
@@ -120,31 +123,133 @@
"id": "ChannelStateChange",
"description": "Notification of a channel's state change.",
"properties": {
"channel_info": {
"channel": {
"required": true,
"type": "Channel"
}
}
},
"DtmfReceived": {
"id": "DtmfReceived",
"ChannelDtmfReceived": {
"id": "ChannelDtmfReceived",
"description": "DTMF received on a channel.",
"notes": "This event is sent when the DTMF ends. There is no notification about the start of DTMF",
"properties": {
"digit": {
"required": true,
"type": "string",
"description": "DTMF digit received (0-9, A-E, # or *)"
},
"channel": {
"required": true,
"type": "Channel",
"description": "The channel on which DTMF was received"
}
}
},
"ChannelDialplan": {
"id": "ChannelDialplan",
"description": "Channel changed location in the dialplan.",
"properties": {
"application": {
"required": true,
"type": "string",
"description": "The application that the channel is currently in."
},
"application_data": {
"required": true,
"type": "string",
"description": "The data that was passed to the application when it was invoked."
},
"channel": {
"required": true,
"type": "Channel",
"description": "The channel that changed dialplan location."
}
}
},
"ChannelCallerId": {
"id": "ChannelCallerId",
"description": "Channel changed Caller ID.",
"properties": {
"caller_presentation": {
"required": true,
"type": "integer",
"description": "The integer representation of the Caller Presentation value."
},
"caller_presentation_txt": {
"required": true,
"type": "string",
"description": "The text representation of the Caller Presentation value."
},
"channel": {
"required": true,
"type": "Channel",
"description": "The channel that changed Caller ID."
}
}
},
"ChannelUserevent": {
"id": "ChannelUserevent",
"description": "User-generated event with additional user-defined fields in the object.",
"properties": {
"eventname": {
"required": true,
"type": "string",
"description": "The name of the user event."
},
"channel": {
"required": true,
"type": "Channel",
"description": "The channel that signaled the user event."
}
}
},
"ChannelHangupRequest": {
"id": "ChannelHangupRequest",
"description": "A hangup was requested on the channel.",
"properties": {
"cause": {
"type": "integer",
"description": "Integer representation of the cause of the hangup."
},
"soft": {
"type": "boolean",
"description": "Whether the hangup request was a soft hangup request."
},
"channel": {
"required": true,
"type": "Channel",
"description": "The channel on which the hangup was requested."
}
}
},
"ChannelVarset": {
"id": "ChannelVarset",
"description": "Channel variable changed.",
"properties": {
"variable": {
"required": true,
"type": "string",
"description": "The variable that changed."
},
"value": {
"required": true,
"type": "string",
"description": "The new value of the variable."
},
"channel": {
"required": true,
"type": "Channel",
"description": "The channel on which the variable was set."
}
}
},
"StasisEnd": {
"id": "StasisEnd",
"description": "Notification that a channel has left a Stasis appliction.",
"properties": {
"channel_info": {
"channel": {
"required": true,
"type": "Channel"
}
}
@@ -154,10 +259,12 @@
"description": "Notification that a channel has entered a Stasis appliction.",
"properties": {
"args": {
"required": true,
"type": "List[string]",
"description": "Arguments to the application"
},
"channel_info": {
"channel": {
"required": true,
"type": "Channel"
}
}