Files
asterisk/res/res_stasis_websocket.c
David M. Lee a2a53cc306 Stasis application WebSocket support
This is the API that binds the Stasis dialplan application to external
Stasis applications. It also adds the beginnings of WebSocket
application support.

This module registers a dialplan function named Stasis, which is used
to put a channel into the named Stasis app. As a channel enters and
leaves the Stasis diaplan application, the Stasis app receives a
'stasis-start' and 'stasis-end' events.

Stasis apps register themselves using the stasis_app_register and
stasis_app_unregister functions. Messages are sent to an application
using stasis_app_send.

Finally, Stasis apps control channels through the use of the
stasis_app_control object, and the family of stasis_app_control_*
functions.

Other changes along for the ride are:
 * An ast_frame_dtor function that's RAII_VAR safe
 * Some common JSON encoders for name/number, timeval, and
   context/extension/priority

Review: https://reviewboard.asterisk.org/r/2361/


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384879 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-04-08 13:27:45 +00:00

327 lines
7.7 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2012 - 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief HTTP binding for the Stasis API
*
* \author David M. Lee, II <dlee@digium.com>
*/
/*** MODULEINFO
<depend type="module">app_stasis</depend>
<depend type="module">res_http_websocket</depend>
<support_level>core</support_level>
***/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app_stasis.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "asterisk/json.h"
#include "asterisk/module.h"
#include "asterisk/strings.h"
#include "asterisk/utils.h"
/*! WebSocket protocol for Stasis */
static const char * const ws_protocol = "stasis";
/*! Message to send when out of memory */
static struct ast_json *oom_json;
/*! Number of buckets for the Stasis application hash table. Remember to keep it
* a prime number!
*/
#define APPS_NUM_BUCKETS 7
struct websocket_app {
char *name;
};
/*!
* \internal
* \brief Helper to write a JSON object to a WebSocket.
* \param session WebSocket session.
* \param message JSON message.
* \return 0 on success.
* \return -1 on error.
*/
static int websocket_write_json(struct ast_websocket *session,
struct ast_json *message)
{
RAII_VAR(char *, str, ast_json_dump_string(message), ast_free);
if (str == NULL) {
ast_log(LOG_ERROR, "Failed to encode JSON object\n");
return -1;
}
return ast_websocket_write(session, AST_WEBSOCKET_OPCODE_TEXT, str,
strlen(str));
}
/*! Hash function for websocket_app */
static int hash_app(const void *obj, const int flags)
{
const struct websocket_app *app = obj;
const char *name = flags & OBJ_KEY ? obj : app->name;
return ast_str_hash(name);
}
/*! Comparison function for websocket_app */
static int compare_app(void *lhs, void *rhs, int flags)
{
const struct websocket_app *lhs_app = lhs;
const struct websocket_app *rhs_app = rhs;
const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
if (strcmp(lhs_app->name, rhs_name) == 0) {
return CMP_MATCH;
} else {
return 0;
}
}
static void app_dtor(void *obj)
{
struct websocket_app *app = obj;
ast_free(app->name);
}
struct stasis_ws_session_info {
struct ast_websocket *ws_session;
struct ao2_container *websocket_apps;
};
static void session_dtor(void *obj)
{
struct stasis_ws_session_info *session = obj;
/* session_shutdown should have been called before */
ast_assert(session->ws_session == NULL);
ast_assert(session->websocket_apps == NULL);
}
static struct stasis_ws_session_info *session_create(
struct ast_websocket *ws_session)
{
RAII_VAR(struct stasis_ws_session_info *, session, NULL, ao2_cleanup);
session = ao2_alloc(sizeof(*session), session_dtor);
session->ws_session = ws_session;
session->websocket_apps =
ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
if (!session->websocket_apps) {
return NULL;
}
ao2_ref(session, +1);
return session;
}
/*!
* \brief Explicitly shutdown a session.
*
* An explicit shutdown is necessary, since stasis-app has a reference to this
* session. We also need to be sure to null out the \c ws_session field, since
* the websocket is about to go away.
*
* \param session Session info struct.
*/
static void session_shutdown(struct stasis_ws_session_info *session)
{
struct ao2_iterator i;
struct websocket_app *app;
SCOPED_AO2LOCK(lock, session);
i = ao2_iterator_init(session->websocket_apps, 0);
while ((app = ao2_iterator_next(&i))) {
stasis_app_unregister(app->name);
ao2_cleanup(app);
}
ao2_iterator_destroy(&i);
ao2_cleanup(session->websocket_apps);
session->websocket_apps = NULL;
session->ws_session = NULL;
}
/*!
* \brief Callback handler for Stasis application messages.
*/
static void app_handler(void *data, const char *app_name,
struct ast_json *message)
{
struct stasis_ws_session_info *session = data;
int res;
res = ast_json_object_set(message, "application",
ast_json_string_create(app_name));
if(res != 0) {
return;
}
ao2_lock(session);
if (session->ws_session) {
websocket_write_json(session->ws_session, message);
}
ao2_unlock(session);
}
/*!
* \brief Register for all of the apps given.
* \param session Session info struct.
* \param app_list Comma seperated list of app names to register.
*/
static int session_register_apps(struct stasis_ws_session_info *session,
const char *app_list)
{
RAII_VAR(char *, to_free, NULL, ast_free);
char *apps, *app_name;
SCOPED_AO2LOCK(lock, session);
ast_assert(session->ws_session != NULL);
ast_assert(session->websocket_apps != NULL);
to_free = apps = ast_strdup(app_list);
if (!apps) {
websocket_write_json(session->ws_session, oom_json);
return -1;
}
while ((app_name = strsep(&apps, ","))) {
RAII_VAR(struct websocket_app *, app, NULL, ao2_cleanup);
app = ao2_alloc(sizeof(*app), app_dtor);
if (!app) {
websocket_write_json(session->ws_session, oom_json);
return -1;
}
app->name = ast_strdup(app_name);
ao2_link(session->websocket_apps, app);
stasis_app_register(app_name, app_handler, session);
}
return 0;
}
static void websocket_callback(struct ast_websocket *ws_session,
struct ast_variable *parameters,
struct ast_variable *headers)
{
RAII_VAR(struct stasis_ws_session_info *, stasis_session, NULL, ao2_cleanup);
struct ast_variable *param = NULL;
int res;
ast_debug(3, "Stasis web socket connection\n");
if (ast_websocket_set_nonblock(ws_session) != 0) {
ast_log(LOG_ERROR,
"Stasis web socket failed to set nonblock; closing\n");
goto end;
}
stasis_session = session_create(ws_session);
if (!stasis_session) {
websocket_write_json(ws_session, oom_json);
goto end;
}
for (param = parameters; param; param = param->next) {
if (strcmp(param->name, "app") == 0) {
int ret = session_register_apps(
stasis_session, param->value);
if (ret != 0) {
goto end;
}
}
}
if (ao2_container_count(stasis_session->websocket_apps) == 0) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
msg = ast_json_pack("{s: s, s: [s]}",
"error", "MissingParams",
"params", "app");
if (msg) {
websocket_write_json(ws_session, msg);
}
goto end;
}
while ((res = ast_wait_for_input(ast_websocket_fd(ws_session), -1)) > 0) {
char *payload;
uint64_t payload_len;
enum ast_websocket_opcode opcode;
int fragmented;
int read = ast_websocket_read(ws_session, &payload, &payload_len,
&opcode, &fragmented);
if (read) {
ast_log(LOG_ERROR,
"Stasis WebSocket read error; closing\n");
break;
}
if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
break;
}
}
end:
session_shutdown(stasis_session);
ast_websocket_unref(ws_session);
}
static int load_module(void)
{
int r = 0;
oom_json = ast_json_pack("{s: s}",
"error", "OutOfMemory");
if (!oom_json) {
/* ironic */
return AST_MODULE_LOAD_FAILURE;
}
r |= ast_websocket_add_protocol(ws_protocol, websocket_callback);
return r;
}
static int unload_module(void)
{
int r = 0;
ast_json_unref(oom_json);
oom_json = NULL;
r |= ast_websocket_remove_protocol(ws_protocol, websocket_callback);
return r;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis HTTP bindings",
.load = load_module,
.unload = unload_module,
.nonoptreq = "app_stasis,res_http_websocket"
);