mirror of
https://github.com/asterisk/asterisk.git
synced 2025-10-12 15:45:18 +00:00
ARI: REST over Websocket
This commit adds the ability to make ARI REST requests over the same websocket used to receive events. For full details on how to use the new capability, visit... https://docs.asterisk.org/Configuration/Interfaces/Asterisk-REST-Interface-ARI/ARI-REST-over-WebSocket/ Changes: * Added utilities to http.c: * ast_get_http_method_from_string(). * ast_http_parse_post_form(). * Added utilities to json.c: * ast_json_nvp_array_to_ast_variables(). * ast_variables_to_json_nvp_array(). * Added definitions for new events to carry REST responses. * Created res/ari/ari_websocket_requests.c to house the new request handlers. * Moved non-event specific code out of res/ari/resource_events.c into res/ari/ari_websockets.c * Refactored res/res_ari.c to move non-http code out of ast_ari_callback() (which is http specific) and into ast_ari_invoke() so it can be shared between both the http and websocket transports. UpgradeNote: This commit adds the ability to make ARI REST requests over the same websocket used to receive events. See https://docs.asterisk.org/Configuration/Interfaces/Asterisk-REST-Interface-ARI/ARI-REST-over-WebSocket/
This commit is contained in:
@@ -18,11 +18,19 @@
|
||||
|
||||
#include "asterisk.h"
|
||||
|
||||
#include "resource_events.h"
|
||||
#include "ari_websockets.h"
|
||||
#include "internal.h"
|
||||
#if defined(AST_DEVMODE)
|
||||
#include "ari_model_validators.h"
|
||||
#endif
|
||||
#include "asterisk/app.h"
|
||||
#include "asterisk/ari.h"
|
||||
#include "asterisk/astobj2.h"
|
||||
#include "asterisk/http_websocket.h"
|
||||
#include "asterisk/module.h"
|
||||
#include "asterisk/stasis_app.h"
|
||||
#include "internal.h"
|
||||
|
||||
|
||||
/*! \file
|
||||
*
|
||||
@@ -30,18 +38,22 @@
|
||||
* \author David M. Lee, II <dlee@digium.com>
|
||||
*/
|
||||
|
||||
struct ast_ari_websocket_session {
|
||||
struct ast_websocket *ws_session;
|
||||
int (*validator)(struct ast_json *);
|
||||
};
|
||||
/*! Number of buckets for the event session registry. Remember to keep it a prime number! */
|
||||
#define ARI_WS_SESSION_NUM_BUCKETS 23
|
||||
|
||||
static void websocket_session_dtor(void *obj)
|
||||
{
|
||||
struct ast_ari_websocket_session *session = obj;
|
||||
/*! Number of buckets for a websocket apps container. Remember to keep it a prime number! */
|
||||
#define APPS_NUM_BUCKETS 7
|
||||
|
||||
ast_websocket_unref(session->ws_session);
|
||||
session->ws_session = NULL;
|
||||
}
|
||||
/*! Initial size of a message queue. */
|
||||
#define MESSAGES_INIT_SIZE 23
|
||||
|
||||
|
||||
/*! \brief Local registry for created \ref event_session objects. */
|
||||
static struct ao2_container *ari_ws_session_registry;
|
||||
|
||||
struct ast_websocket_server *ast_ws_server;
|
||||
|
||||
#define MAX_VALS 128
|
||||
|
||||
/*!
|
||||
* \brief Validator that always succeeds.
|
||||
@@ -51,55 +63,99 @@ static int null_validator(struct ast_json *json)
|
||||
return 1;
|
||||
}
|
||||
|
||||
struct ast_ari_websocket_session *ast_ari_websocket_session_create(
|
||||
struct ast_websocket *ws_session, int (*validator)(struct ast_json *))
|
||||
{
|
||||
RAII_VAR(struct ast_ari_websocket_session *, session, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct ast_ari_conf *, config, ast_ari_config_get(), ao2_cleanup);
|
||||
#define VALIDATION_FAILED \
|
||||
"{" \
|
||||
" \"error\": \"InvalidMessage\"," \
|
||||
" \"message\": \"Message validation failed\"" \
|
||||
"}"
|
||||
|
||||
if (ws_session == NULL) {
|
||||
return NULL;
|
||||
static int ari_ws_session_write(
|
||||
struct ari_ws_session *ari_ws_session,
|
||||
struct ast_json *message)
|
||||
{
|
||||
RAII_VAR(char *, str, NULL, ast_json_free);
|
||||
|
||||
#ifdef AST_DEVMODE
|
||||
if (!ari_ws_session->validator(message)) {
|
||||
ast_log(LOG_ERROR, "Outgoing message failed validation\n");
|
||||
return ast_websocket_write_string(ari_ws_session->ast_ws_session, VALIDATION_FAILED);
|
||||
}
|
||||
#endif
|
||||
|
||||
str = ast_json_dump_string_format(message, ast_ari_json_format());
|
||||
|
||||
if (str == NULL) {
|
||||
ast_log(LOG_ERROR, "Failed to encode JSON object\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ast_websocket_write_string(ari_ws_session->ast_ws_session, str)) {
|
||||
ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
|
||||
ast_sockaddr_stringify(ast_websocket_remote_address(ari_ws_session->ast_ws_session)));
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Updates the websocket session.
|
||||
*
|
||||
* \details If the value of the \c ws_session is not \c NULL and there are messages in the
|
||||
* event session's \c message_queue, the messages are dispatched and removed from
|
||||
* the queue.
|
||||
*
|
||||
* \param ari_ws_session The ARI websocket session
|
||||
* \param ast_ws_session The Asterisk websocket session
|
||||
*/
|
||||
static int ari_ws_session_update(
|
||||
struct ari_ws_session *ari_ws_session,
|
||||
struct ast_websocket *ast_ws_session)
|
||||
{
|
||||
RAII_VAR(struct ast_ari_conf *, config, ast_ari_config_get(), ao2_cleanup);
|
||||
int i;
|
||||
|
||||
if (ast_ws_session == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (config == NULL || config->general == NULL) {
|
||||
return NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (validator == NULL) {
|
||||
validator = null_validator;
|
||||
}
|
||||
|
||||
if (ast_websocket_set_nonblock(ws_session) != 0) {
|
||||
if (ast_websocket_set_nonblock(ast_ws_session) != 0) {
|
||||
ast_log(LOG_ERROR,
|
||||
"ARI web socket failed to set nonblock; closing: %s\n",
|
||||
strerror(errno));
|
||||
return NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ast_websocket_set_timeout(ws_session, config->general->write_timeout)) {
|
||||
if (ast_websocket_set_timeout(ast_ws_session, config->general->write_timeout)) {
|
||||
ast_log(LOG_WARNING, "Failed to set write timeout %d on ARI web socket\n",
|
||||
config->general->write_timeout);
|
||||
}
|
||||
|
||||
session = ao2_alloc(sizeof(*session), websocket_session_dtor);
|
||||
if (!session) {
|
||||
return NULL;
|
||||
ao2_ref(ast_ws_session, +1);
|
||||
ari_ws_session->ast_ws_session = ast_ws_session;
|
||||
ao2_lock(ari_ws_session);
|
||||
for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
|
||||
struct ast_json *msg = AST_VECTOR_GET(&ari_ws_session->message_queue, i);
|
||||
ari_ws_session_write(ari_ws_session, msg);
|
||||
ast_json_unref(msg);
|
||||
}
|
||||
|
||||
ao2_ref(ws_session, +1);
|
||||
session->ws_session = ws_session;
|
||||
session->validator = validator;
|
||||
AST_VECTOR_RESET(&ari_ws_session->message_queue, AST_VECTOR_ELEM_CLEANUP_NOOP);
|
||||
ao2_unlock(ari_ws_session);
|
||||
|
||||
ao2_ref(session, +1);
|
||||
return session;
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct ast_json *ast_ari_websocket_session_read(
|
||||
struct ast_ari_websocket_session *session)
|
||||
static struct ast_json *ari_ws_session_read(
|
||||
struct ari_ws_session *ari_ws_session)
|
||||
{
|
||||
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
|
||||
|
||||
if (ast_websocket_fd(session->ws_session) < 0) {
|
||||
if (ast_websocket_fd(ari_ws_session->ast_ws_session) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -111,7 +167,7 @@ struct ast_json *ast_ari_websocket_session_read(
|
||||
int fragmented;
|
||||
|
||||
res = ast_wait_for_input(
|
||||
ast_websocket_fd(session->ws_session), -1);
|
||||
ast_websocket_fd(ari_ws_session->ast_ws_session), -1);
|
||||
|
||||
if (res <= 0) {
|
||||
ast_log(LOG_WARNING, "WebSocket poll error: %s\n",
|
||||
@@ -119,7 +175,7 @@ struct ast_json *ast_ari_websocket_session_read(
|
||||
return NULL;
|
||||
}
|
||||
|
||||
res = ast_websocket_read(session->ws_session, &payload,
|
||||
res = ast_websocket_read(ari_ws_session->ast_ws_session, &payload,
|
||||
&payload_len, &opcode, &fragmented);
|
||||
|
||||
if (res != 0) {
|
||||
@@ -135,8 +191,21 @@ struct ast_json *ast_ari_websocket_session_read(
|
||||
case AST_WEBSOCKET_OPCODE_TEXT:
|
||||
message = ast_json_load_buf(payload, payload_len, NULL);
|
||||
if (message == NULL) {
|
||||
struct ast_json *error = ast_json_pack(
|
||||
"{s:s, s:s, s:s, s:i, s:s, s:s }",
|
||||
"type", "RESTResponse",
|
||||
"transaction_id", "",
|
||||
"request_id", "",
|
||||
"status_code", 400,
|
||||
"reason_phrase", "Failed to parse request message JSON",
|
||||
"uri", ""
|
||||
);
|
||||
ari_websocket_send_event(ari_ws_session, ari_ws_session->app_name,
|
||||
error, 0);
|
||||
ast_json_unref(error);
|
||||
ast_log(LOG_WARNING,
|
||||
"WebSocket input failed to parse\n");
|
||||
|
||||
}
|
||||
|
||||
break;
|
||||
@@ -149,59 +218,534 @@ struct ast_json *ast_ari_websocket_session_read(
|
||||
return ast_json_ref(message);
|
||||
}
|
||||
|
||||
#define VALIDATION_FAILED \
|
||||
"{" \
|
||||
" \"error\": \"InvalidMessage\"," \
|
||||
" \"message\": \"Message validation failed\"" \
|
||||
"}"
|
||||
|
||||
int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session,
|
||||
struct ast_json *message)
|
||||
{
|
||||
RAII_VAR(char *, str, NULL, ast_json_free);
|
||||
|
||||
#ifdef AST_DEVMODE
|
||||
if (!session->validator(message)) {
|
||||
ast_log(LOG_ERROR, "Outgoing message failed validation\n");
|
||||
return ast_websocket_write_string(session->ws_session, VALIDATION_FAILED);
|
||||
}
|
||||
#endif
|
||||
|
||||
str = ast_json_dump_string_format(message, ast_ari_json_format());
|
||||
|
||||
if (str == NULL) {
|
||||
ast_log(LOG_ERROR, "Failed to encode JSON object\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ast_websocket_write_string(session->ws_session, str)) {
|
||||
ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
|
||||
ast_sockaddr_stringify(ast_ari_websocket_session_get_remote_addr(session)));
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct ast_sockaddr *ast_ari_websocket_session_get_remote_addr(
|
||||
struct ast_ari_websocket_session *session)
|
||||
{
|
||||
return ast_websocket_remote_address(session->ws_session);
|
||||
}
|
||||
|
||||
void ari_handle_websocket(struct ast_websocket_server *ws_server,
|
||||
void ari_handle_websocket(
|
||||
struct ast_tcptls_session_instance *ser, const char *uri,
|
||||
enum ast_http_method method, struct ast_variable *get_params,
|
||||
struct ast_variable *headers)
|
||||
{
|
||||
struct ast_http_uri fake_urih = {
|
||||
.data = ws_server,
|
||||
.data = ast_ws_server,
|
||||
};
|
||||
|
||||
ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
|
||||
headers);
|
||||
}
|
||||
|
||||
const char *ast_ari_websocket_session_id(
|
||||
const struct ast_ari_websocket_session *session)
|
||||
/*!
|
||||
* \brief Callback handler for Stasis application messages.
|
||||
*
|
||||
* \internal
|
||||
*
|
||||
* \param data Void pointer to the event session (\ref event_session).
|
||||
* \param app_name Name of the Stasis application that dispatched the message.
|
||||
* \param message The dispatched message.
|
||||
* \param debug_app Debug flag for the application.
|
||||
*/
|
||||
void ari_websocket_send_event(struct ari_ws_session *ari_ws_session,
|
||||
const char *app_name, struct ast_json *message, int debug_app)
|
||||
{
|
||||
return ast_websocket_session_id(session->ws_session);
|
||||
char *remote_addr = ast_sockaddr_stringify(
|
||||
ast_websocket_remote_address(ari_ws_session->ast_ws_session));
|
||||
const char *msg_type, *msg_application, *msg_timestamp, *msg_ast_id;
|
||||
SCOPE_ENTER(4, "%s: Dispatching message from Stasis app '%s'\n", remote_addr, app_name);
|
||||
|
||||
ast_assert(ari_ws_session != NULL);
|
||||
|
||||
ao2_lock(ari_ws_session);
|
||||
|
||||
msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
|
||||
msg_application = S_OR(
|
||||
ast_json_string_get(ast_json_object_get(message, "application")), "");
|
||||
|
||||
/* If we've been replaced, remove the application from our local
|
||||
websocket_apps container */
|
||||
if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
|
||||
strcmp(msg_application, app_name) == 0) {
|
||||
ao2_find(ari_ws_session->websocket_apps, msg_application,
|
||||
OBJ_UNLINK | OBJ_NODATA);
|
||||
}
|
||||
|
||||
msg_timestamp = S_OR(
|
||||
ast_json_string_get(ast_json_object_get(message, "timestamp")), "");
|
||||
if (ast_strlen_zero(msg_timestamp)) {
|
||||
if (ast_json_object_set(message, "timestamp", ast_json_timeval(ast_tvnow(), NULL))) {
|
||||
ao2_unlock(ari_ws_session);
|
||||
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
|
||||
"%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
|
||||
remote_addr, msg_type, msg_application);
|
||||
}
|
||||
}
|
||||
|
||||
msg_ast_id = S_OR(
|
||||
ast_json_string_get(ast_json_object_get(message, "asterisk_id")), "");
|
||||
if (ast_strlen_zero(msg_ast_id)) {
|
||||
char eid[20];
|
||||
|
||||
if (ast_json_object_set(message, "asterisk_id",
|
||||
ast_json_string_create(ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
|
||||
ao2_unlock(ari_ws_session);
|
||||
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
|
||||
"%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
|
||||
remote_addr, msg_type, msg_application);
|
||||
}
|
||||
}
|
||||
|
||||
/* Now, we need to determine our state to see how we will handle the message */
|
||||
if (ast_json_object_set(message, "application", ast_json_string_create(app_name))) {
|
||||
ao2_unlock(ari_ws_session);
|
||||
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
|
||||
"%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
|
||||
remote_addr, msg_type, msg_application);
|
||||
}
|
||||
|
||||
if (!ari_ws_session) {
|
||||
/* If the websocket is NULL, the message goes to the queue */
|
||||
if (!AST_VECTOR_APPEND(&ari_ws_session->message_queue, message)) {
|
||||
ast_json_ref(message);
|
||||
}
|
||||
ast_log(LOG_WARNING,
|
||||
"%s: Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
|
||||
remote_addr,
|
||||
msg_type,
|
||||
msg_application);
|
||||
} else if (stasis_app_event_allowed(app_name, message)) {
|
||||
|
||||
if (TRACE_ATLEAST(4) || debug_app) {
|
||||
char *str = ast_json_dump_string_format(message, AST_JSON_PRETTY);
|
||||
|
||||
ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
|
||||
remote_addr,
|
||||
str);
|
||||
ast_json_free(str);
|
||||
}
|
||||
|
||||
ari_ws_session_write(ari_ws_session, message);
|
||||
}
|
||||
|
||||
ao2_unlock(ari_ws_session);
|
||||
SCOPE_EXIT("%s: Dispatched '%s' message from Stasis app '%s'\n",
|
||||
remote_addr, msg_type, app_name);
|
||||
}
|
||||
|
||||
static void stasis_app_message_handler(void *data, const char *app_name,
|
||||
struct ast_json *message)
|
||||
{
|
||||
int debug_app = stasis_app_get_debug_by_name(app_name);
|
||||
struct ari_ws_session *ari_ws_session = data;
|
||||
ast_assert(ari_ws_session != NULL);
|
||||
ari_websocket_send_event(ari_ws_session, app_name, message, debug_app);
|
||||
}
|
||||
|
||||
static int parse_app_args(struct ast_variable *get_params,
|
||||
struct ast_ari_response * response,
|
||||
struct ast_ari_events_event_websocket_args *args)
|
||||
{
|
||||
struct ast_variable *i;
|
||||
RAII_VAR(char *, app_parse, NULL, ast_free);
|
||||
|
||||
for (i = get_params; i; i = i->next) {
|
||||
if (strcmp(i->name, "app") == 0) {
|
||||
/* Parse comma separated list */
|
||||
char *vals[MAX_VALS];
|
||||
size_t j;
|
||||
|
||||
app_parse = ast_strdup(i->value);
|
||||
if (!app_parse) {
|
||||
ast_ari_response_alloc_failed(response);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (strlen(app_parse) == 0) {
|
||||
/* ast_app_separate_args can't handle "" */
|
||||
args->app_count = 1;
|
||||
vals[0] = app_parse;
|
||||
} else {
|
||||
args->app_count = ast_app_separate_args(
|
||||
app_parse, ',', vals,
|
||||
ARRAY_LEN(vals));
|
||||
}
|
||||
|
||||
if (args->app_count == 0) {
|
||||
ast_ari_response_alloc_failed(response);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (args->app_count >= MAX_VALS) {
|
||||
ast_ari_response_error(response, 400,
|
||||
"Bad Request",
|
||||
"Too many values for app");
|
||||
return -1;
|
||||
}
|
||||
|
||||
args->app = ast_malloc(sizeof(*args->app) * args->app_count);
|
||||
if (!args->app) {
|
||||
ast_ari_response_alloc_failed(response);
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (j = 0; j < args->app_count; ++j) {
|
||||
args->app[j] = (vals[j]);
|
||||
}
|
||||
} else if (strcmp(i->name, "subscribeAll") == 0) {
|
||||
args->subscribe_all = ast_true(i->value);
|
||||
}
|
||||
}
|
||||
|
||||
args->app_parse = app_parse;
|
||||
app_parse = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Websocket session cleanup is a bit complicated because it can be
|
||||
* in different states, it may or may not be in the registry container,
|
||||
* and stasis may be sending asynchronous events to it and some
|
||||
* stages of cleanup need to lock it.
|
||||
*
|
||||
* That's why there are 3 different cleanup functions.
|
||||
*/
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Reset the ari_ws_session without destroying it.
|
||||
* It can't be reused and will be cleaned up by the caller.
|
||||
*/
|
||||
static void ari_ws_session_reset(struct ari_ws_session *ari_ws_session)
|
||||
{
|
||||
struct ao2_iterator i;
|
||||
char *app;
|
||||
int j;
|
||||
SCOPED_AO2LOCK(lock, ari_ws_session);
|
||||
|
||||
/* Clean up the websocket_apps container */
|
||||
if (ari_ws_session->websocket_apps) {
|
||||
i = ao2_iterator_init(ari_ws_session->websocket_apps, 0);
|
||||
while ((app = ao2_iterator_next(&i))) {
|
||||
stasis_app_unregister(app);
|
||||
ao2_cleanup(app);
|
||||
}
|
||||
ao2_iterator_destroy(&i);
|
||||
ao2_cleanup(ari_ws_session->websocket_apps);
|
||||
ari_ws_session->websocket_apps = NULL;
|
||||
}
|
||||
|
||||
/* Clean up the message_queue container */
|
||||
for (j = 0; j < AST_VECTOR_SIZE(&ari_ws_session->message_queue); j++) {
|
||||
struct ast_json *msg = AST_VECTOR_GET(&ari_ws_session->message_queue, j);
|
||||
ast_json_unref(msg);
|
||||
}
|
||||
AST_VECTOR_FREE(&ari_ws_session->message_queue);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief RAII_VAR and container ari_ws_session cleanup function.
|
||||
* This unlinks the ari_ws_session from the registry and cleans up the
|
||||
* decrements the reference count.
|
||||
*/
|
||||
static void ari_ws_session_cleanup(struct ari_ws_session *ari_ws_session)
|
||||
{
|
||||
if (!ari_ws_session) {
|
||||
return;
|
||||
}
|
||||
|
||||
ari_ws_session_reset(ari_ws_session);
|
||||
if (ari_ws_session_registry) {
|
||||
ao2_unlink(ari_ws_session_registry, ari_ws_session);
|
||||
}
|
||||
ao2_ref(ari_ws_session, -1);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief The ao2 destructor.
|
||||
* This cleans up the reference to the parent ast_websocket.
|
||||
*/
|
||||
static void ari_ws_session_dtor(void *obj)
|
||||
{
|
||||
struct ari_ws_session *ari_ws_session = obj;
|
||||
|
||||
ast_free(ari_ws_session->app_name);
|
||||
if (!ari_ws_session->ast_ws_session) {
|
||||
return;
|
||||
}
|
||||
ast_websocket_unref(ari_ws_session->ast_ws_session);
|
||||
ari_ws_session->ast_ws_session = NULL;
|
||||
}
|
||||
|
||||
static int ari_ws_session_create(
|
||||
int (*validator)(struct ast_json *),
|
||||
struct ast_tcptls_session_instance *ser,
|
||||
struct ast_ari_events_event_websocket_args *args,
|
||||
const char *session_id)
|
||||
{
|
||||
RAII_VAR(struct ari_ws_session *, ari_ws_session, NULL, ao2_cleanup);
|
||||
int (* register_handler)(const char *, stasis_app_cb handler, void *data);
|
||||
size_t size, i;
|
||||
|
||||
if (validator == NULL) {
|
||||
validator = null_validator;
|
||||
}
|
||||
|
||||
size = sizeof(*ari_ws_session) + strlen(session_id) + 1;
|
||||
|
||||
ari_ws_session = ao2_alloc(size, ari_ws_session_dtor);
|
||||
if (!ari_ws_session) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ari_ws_session->app_name = ast_strdup(args->app_parse);
|
||||
if (!ari_ws_session->app_name) {
|
||||
ast_http_error(ser, 500, "Internal Server Error",
|
||||
"Allocation failed");
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(ari_ws_session->session_id, session_id); /* Safe */
|
||||
|
||||
/* Instantiate the hash table for Stasis apps */
|
||||
ari_ws_session->websocket_apps =
|
||||
ast_str_container_alloc(APPS_NUM_BUCKETS);
|
||||
if (!ari_ws_session->websocket_apps) {
|
||||
ast_http_error(ser, 500, "Internal Server Error",
|
||||
"Allocation failed");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Instantiate the message queue */
|
||||
if (AST_VECTOR_INIT(&ari_ws_session->message_queue, MESSAGES_INIT_SIZE)) {
|
||||
ast_http_error(ser, 500, "Internal Server Error",
|
||||
"Allocation failed");
|
||||
ao2_cleanup(ari_ws_session->websocket_apps);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Register the apps with Stasis */
|
||||
if (args->subscribe_all) {
|
||||
register_handler = &stasis_app_register_all;
|
||||
} else {
|
||||
register_handler = &stasis_app_register;
|
||||
}
|
||||
|
||||
for (i = 0; i < args->app_count; ++i) {
|
||||
const char *app = args->app[i];
|
||||
|
||||
if (ast_strlen_zero(app)) {
|
||||
ast_http_error(ser, 400, "Bad Request",
|
||||
"Invalid application provided in param [app].");
|
||||
ari_ws_session_reset(ari_ws_session);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ast_str_container_add(ari_ws_session->websocket_apps, app)) {
|
||||
ast_http_error(ser, 500, "Internal Server Error",
|
||||
"Allocation failed");
|
||||
ari_ws_session_reset(ari_ws_session);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (register_handler(app, stasis_app_message_handler, ari_ws_session)) {
|
||||
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
|
||||
ast_http_error(ser, 500, "Internal Server Error",
|
||||
"Stasis registration failed");
|
||||
ari_ws_session_reset(ari_ws_session);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ari_ws_session->validator = validator;
|
||||
|
||||
/*
|
||||
* Add the event session to the session registry.
|
||||
* When this functions returns, the registry will have
|
||||
* the only reference to the session.
|
||||
*/
|
||||
if (!ao2_link(ari_ws_session_registry, ari_ws_session)) {
|
||||
ast_http_error(ser, 500, "Internal Server Error",
|
||||
"Allocation failed");
|
||||
ari_ws_session_reset(ari_ws_session);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief This function gets called before the upgrade process is completed.
|
||||
* HTTP is still in effect.
|
||||
*/
|
||||
static int websocket_attempted_cb(struct ast_tcptls_session_instance *ser,
|
||||
struct ast_variable *get_params, struct ast_variable *headers,
|
||||
const char *session_id)
|
||||
{
|
||||
struct ast_ari_events_event_websocket_args args = {};
|
||||
int res = 0;
|
||||
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
|
||||
char *remote_addr = ast_sockaddr_stringify(&ser->remote_address);
|
||||
|
||||
response = ast_calloc(1, sizeof(*response));
|
||||
if (!response) {
|
||||
ast_log(LOG_ERROR, "Failed to create response.\n");
|
||||
ast_http_error(ser, 500, "Server Error", "Memory allocation error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
res = parse_app_args(get_params, response, &args);
|
||||
if (res != 0) {
|
||||
/* Param parsing failure */
|
||||
RAII_VAR(char *, msg, NULL, ast_json_free);
|
||||
if (response->message) {
|
||||
msg = ast_json_dump_string(response->message);
|
||||
} else {
|
||||
ast_log(LOG_ERROR, "Missing response message\n");
|
||||
}
|
||||
|
||||
if (msg) {
|
||||
ast_http_error(ser, response->response_code, response->response_text, msg);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (args.app_count == 0) {
|
||||
ast_http_error(ser, 400, "Bad Request",
|
||||
"HTTP request is missing param: [app]");
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if defined(AST_DEVMODE)
|
||||
res = ari_ws_session_create(ast_ari_validate_message_fn(),
|
||||
ser, &args, session_id);
|
||||
#else
|
||||
res = ari_ws_session_create(NULL, ser, &args, session_id);
|
||||
#endif
|
||||
if (res != 0) {
|
||||
ast_log(LOG_ERROR,
|
||||
"%s: Failed to create ARI ari_session\n", remote_addr);
|
||||
}
|
||||
|
||||
ast_free(args.app_parse);
|
||||
ast_free(args.app);
|
||||
return res;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief This function gets called after the upgrade process is completed.
|
||||
* The websocket is now in effect.
|
||||
*/
|
||||
static void websocket_established_cb(struct ast_websocket *ast_ws_session,
|
||||
struct ast_variable *get_params, struct ast_variable *upgrade_headers)
|
||||
{
|
||||
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
|
||||
/*
|
||||
* ast_ws_session is passed in with it's refcount bumped so
|
||||
* we need to unref it when we're done. The refcount will
|
||||
* be bumped again when we add it to the ari_ws_session.
|
||||
*/
|
||||
RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
|
||||
RAII_VAR(struct ari_ws_session *, ari_ws_session, NULL, ari_ws_session_cleanup);
|
||||
struct ast_json *msg;
|
||||
struct ast_variable *v;
|
||||
char *remote_addr = ast_sockaddr_stringify(
|
||||
ast_websocket_remote_address(ast_ws_session));
|
||||
const char *session_id = ast_websocket_session_id(ast_ws_session);
|
||||
|
||||
SCOPE_ENTER(2, "%s: WebSocket established\n", remote_addr);
|
||||
|
||||
if (TRACE_ATLEAST(2)) {
|
||||
ast_trace(2, "%s: Websocket Upgrade Headers:\n", remote_addr);
|
||||
for (v = upgrade_headers; v; v = v->next) {
|
||||
ast_trace(3, "--> %s: %s\n", v->name, v->value);
|
||||
}
|
||||
}
|
||||
|
||||
response = ast_calloc(1, sizeof(*response));
|
||||
if (!response) {
|
||||
SCOPE_EXIT_LOG_RTN(LOG_ERROR,
|
||||
"%s: Failed to create response\n", remote_addr);
|
||||
}
|
||||
|
||||
/* Find the event_session and update its websocket */
|
||||
ari_ws_session = ao2_find(ari_ws_session_registry, session_id, OBJ_SEARCH_KEY);
|
||||
if (ari_ws_session) {
|
||||
ao2_unlink(ari_ws_session_registry, ari_ws_session);
|
||||
ari_ws_session_update(ari_ws_session, ast_ws_session);
|
||||
} else {
|
||||
SCOPE_EXIT_LOG_RTN(LOG_ERROR,
|
||||
"%s: Failed to locate an event session for the websocket session\n",
|
||||
remote_addr);
|
||||
}
|
||||
|
||||
ast_trace(-1, "%s: Waiting for messages\n", remote_addr);
|
||||
while ((msg = ari_ws_session_read(ari_ws_session))) {
|
||||
ari_websocket_process_request(ari_ws_session, remote_addr,
|
||||
upgrade_headers, ari_ws_session->app_name, msg);
|
||||
ast_json_unref(msg);
|
||||
}
|
||||
|
||||
SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
|
||||
}
|
||||
|
||||
static int ari_ws_session_shutdown_cb(void *ari_ws_session, void *arg, int flags)
|
||||
{
|
||||
ari_ws_session_cleanup(ari_ws_session);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void ari_ws_session_registry_dtor(void)
|
||||
{
|
||||
ao2_callback(ari_ws_session_registry, OBJ_MULTIPLE | OBJ_NODATA,
|
||||
ari_ws_session_shutdown_cb, NULL);
|
||||
|
||||
ao2_cleanup(ari_ws_session_registry);
|
||||
ari_ws_session_registry = NULL;
|
||||
}
|
||||
|
||||
int ari_websocket_unload_module(void)
|
||||
{
|
||||
ari_ws_session_registry_dtor();
|
||||
ao2_cleanup(ast_ws_server);
|
||||
ast_ws_server = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
AO2_STRING_FIELD_CMP_FN(ari_ws_session, session_id);
|
||||
AO2_STRING_FIELD_HASH_FN(ari_ws_session, session_id);
|
||||
|
||||
int ari_websocket_load_module(void)
|
||||
{
|
||||
int res = 0;
|
||||
struct ast_websocket_protocol *protocol;
|
||||
|
||||
ari_ws_session_registry = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
|
||||
ARI_WS_SESSION_NUM_BUCKETS, ari_ws_session_hash_fn,
|
||||
NULL, ari_ws_session_cmp_fn);
|
||||
if (!ari_ws_session_registry) {
|
||||
ast_log(LOG_WARNING,
|
||||
"Failed to allocate the local registry for websocket applications\n");
|
||||
return AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
|
||||
ast_ws_server = ast_websocket_server_create();
|
||||
if (!ast_ws_server) {
|
||||
ari_ws_session_registry_dtor();
|
||||
return AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
|
||||
protocol = ast_websocket_sub_protocol_alloc("ari");
|
||||
if (!protocol) {
|
||||
ao2_ref(ast_ws_server, -1);
|
||||
ast_ws_server = NULL;
|
||||
ari_ws_session_registry_dtor();
|
||||
return AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
protocol->session_attempted = websocket_attempted_cb;
|
||||
protocol->session_established = websocket_established_cb;
|
||||
res = ast_websocket_server_add_protocol2(ast_ws_server, protocol);
|
||||
|
||||
return res == 0 ? AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user