res_aeap & res_speech_aeap: Add Asterisk External Application Protocol

Add framework to connect to, and read and write protocol based
messages from and to an external application using an Asterisk
External Application Protocol (AEAP). This has been divided into
several abstractions:

 1. transport - base communication layer (currently websocket only)
 2. message - AEAP description and data (currently JSON only)
 3. transaction - links/binds requests and responses
 4. aeap - transport, message, and transaction handler/manager

This patch also adds an AEAP implementation for speech to text.
Existing speech API callbacks for speech to text have been completed
making it possible for Asterisk to connect to a configured external
translator service and provide audio for STT. Results can also be
received from the external translator, and made available as speech
results in Asterisk.

Unit tests have also been created that test the AEAP framework, and
also the speech to text implementation.

ASTERISK-29726 #close

Change-Id: Iaa4b259f84aa63501e5fd2a6fb107f900b4d4ed2
This commit is contained in:
Kevin Harwell
2021-06-18 12:54:10 -05:00
committed by Friendly Automation
parent 53a3af6321
commit 272bac70dd
25 changed files with 4831 additions and 57 deletions

View File

@@ -69,6 +69,7 @@ $(call MOD_ADD_C,res_ari,ari/cli.c ari/config.c ari/ari_websockets.c)
$(call MOD_ADD_C,res_ari_model,ari/ari_model_validators.c)
$(call MOD_ADD_C,res_stasis_recording,stasis_recording/stored.c)
$(call MOD_ADD_C,res_stir_shaken,$(wildcard res_stir_shaken/*.c))
$(call MOD_ADD_C,res_aeap,$(wildcard res_aeap/*.c))
res_parking.o: _ASTCFLAGS+=$(AST_NO_FORMAT_TRUNCATION)
snmp/agent.o: _ASTCFLAGS+=-fPIC

View File

@@ -17,29 +17,37 @@
*/
/*** MODULEINFO
<depend>res_http_websocket</depend>
<support_level>core</support_level>
***/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/sorcery.h"
#include "asterisk/cli.h"
#include "asterisk/format.h"
#include "asterisk/format_cap.h"
#include "asterisk/res_aeap.h"
#include "res_aeap/general.h"
/*** DOCUMENTATION
<configInfo name="res_aeap" language="en_US">
<synopsis>Asterisk External Application Protocol (AEAP) module for Asterisk</synopsis>
<configFile name="aeap.conf">
<configObject name="server">
<synopsis>AEAP server options</synopsis>
<configObject name="client">
<synopsis>AEAP client options</synopsis>
<configOption name="type">
<synopsis>Must be of type 'server'.</synopsis>
<synopsis>Must be of type 'client'.</synopsis>
</configOption>
<configOption name="server_url">
<configOption name="url">
<synopsis>The URL of the server to connect to.</synopsis>
</configOption>
<configOption name="protocol">
<synopsis>The application protocol.</synopsis>
</configOption>
<configOption name="codecs">
<synopsis>Optional media codec(s)</synopsis>
<description><para>
@@ -56,30 +64,36 @@
/* Asterisk External Application Protocol sorcery object */
static struct ast_sorcery *aeap_sorcery;
struct aeap_server
struct ast_sorcery *ast_aeap_sorcery(void) {
return aeap_sorcery;
}
struct ast_aeap_client_config
{
SORCERY_OBJECT(details);
AST_DECLARE_STRING_FIELDS(
/*! The URL of the server to connect to */
AST_STRING_FIELD(server_url);
AST_STRING_FIELD(url);
/*! The application protocol */
AST_STRING_FIELD(protocol);
);
/*! An optional list of codecs that will be used if provided */
struct ast_format_cap *codecs;
};
static void aeap_server_destructor(void *obj)
static void client_config_destructor(void *obj)
{
struct aeap_server *cfg = obj;
struct ast_aeap_client_config *cfg = obj;
ast_string_field_free_memory(cfg);
ao2_cleanup(cfg->codecs);
}
static void *aeap_server_alloc(const char *name)
static void *client_config_alloc(const char *name)
{
struct aeap_server *cfg;
struct ast_aeap_client_config *cfg;
cfg = ast_sorcery_generic_alloc(sizeof(*cfg), aeap_server_destructor);
cfg = ast_sorcery_generic_alloc(sizeof(*cfg), client_config_destructor);
if (!cfg) {
return NULL;
}
@@ -97,32 +111,52 @@ static void *aeap_server_alloc(const char *name)
return cfg;
}
static int aeap_server_apply(const struct ast_sorcery *sorcery, void *obj)
static int client_config_apply(const struct ast_sorcery *sorcery, void *obj)
{
struct aeap_server *cfg = obj;
struct ast_aeap_client_config *cfg = obj;
if (ast_strlen_zero(cfg->server_url)) {
ast_log(LOG_ERROR, "AEAP - Server URL must be present for server '%s'\n", ast_sorcery_object_get_id(cfg));
if (ast_strlen_zero(cfg->url)) {
ast_log(LOG_ERROR, "AEAP - URL must be present for '%s'\n", ast_sorcery_object_get_id(cfg));
return -1;
}
if (!ast_begins_with(cfg->server_url, "ws")) {
ast_log(LOG_ERROR, "AEAP - Server URL must be ws or wss for server '%s'\n", ast_sorcery_object_get_id(cfg));
if (!ast_begins_with(cfg->url, "ws")) {
ast_log(LOG_ERROR, "AEAP - URL must be ws or wss for '%s'\n", ast_sorcery_object_get_id(cfg));
return -1;
}
return 0;
}
static struct aeap_server *aeap_server_get(const char *id)
const struct ast_format_cap *ast_aeap_client_config_codecs(const struct ast_aeap_client_config *cfg)
{
return ast_sorcery_retrieve_by_id(aeap_sorcery, "server", id);
return cfg->codecs;
}
static struct ao2_container *aeap_server_get_all(void)
int ast_aeap_client_config_has_protocol(const struct ast_aeap_client_config *cfg,
const char *protocol)
{
return ast_sorcery_retrieve_by_fields(aeap_sorcery, "server",
AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
return !strcmp(protocol, cfg->protocol);
}
struct ao2_container *ast_aeap_client_configs_get(const char *protocol)
{
struct ao2_container *container;
struct ast_variable *var;
var = protocol ? ast_variable_new("protocol ==", protocol, "") : NULL;
container = ast_sorcery_retrieve_by_fields(aeap_sorcery,
AEAP_CONFIG_CLIENT, AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, var);
ast_variables_destroy(var);
return container;
}
static struct ast_aeap_client_config *client_config_get(const char *id)
{
return ast_sorcery_retrieve_by_id(aeap_sorcery, AEAP_CONFIG_CLIENT, id);
}
static char *aeap_tab_complete_name(const char *word, struct ao2_container *container)
@@ -145,6 +179,8 @@ static char *aeap_tab_complete_name(const char *word, struct ao2_container *cont
}
ao2_iterator_destroy(&it);
ao2_ref(container, -1);
return NULL;
}
@@ -159,8 +195,7 @@ static int aeap_cli_show(void *obj, void *arg, int flags)
return 0;
}
options = ast_variable_list_sort(ast_sorcery_objectset_create2(
aeap_sorcery, obj, AST_HANDLER_ONLY_STRING));
options = ast_variable_list_sort(ast_sorcery_objectset_create(aeap_sorcery, obj));
if (!options) {
return 0;
}
@@ -179,20 +214,20 @@ static int aeap_cli_show(void *obj, void *arg, int flags)
return 0;
}
static char *aeap_server_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char *client_config_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct aeap_server *cfg;
struct ast_aeap_client_config *cfg;
switch(cmd) {
case CLI_INIT:
e->command = "aeap show server";
e->command = "aeap show client";
e->usage =
"Usage: aeap show server <id>\n"
" Show the AEAP settings for a given server\n";
"Usage: aeap show client <id>\n"
" Show the AEAP settings for a given client\n";
return NULL;
case CLI_GENERATE:
if (a->pos == 3) {
return aeap_tab_complete_name(a->word, aeap_server_get_all());
return aeap_tab_complete_name(a->word, ast_aeap_client_configs_get(NULL));
} else {
return NULL;
}
@@ -202,23 +237,23 @@ static char *aeap_server_show(struct ast_cli_entry *e, int cmd, struct ast_cli_a
return CLI_SHOWUSAGE;
}
cfg = aeap_server_get(a->argv[3]);
cfg = client_config_get(a->argv[3]);
aeap_cli_show(cfg, a, 0);
ao2_cleanup(cfg);
return CLI_SUCCESS;
}
static char *aeap_server_show_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char *client_config_show_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct ao2_container *container;
switch(cmd) {
case CLI_INIT:
e->command = "aeap show servers";
e->command = "aeap show clients";
e->usage =
"Usage: aeap show servers\n"
" Show all configured AEAP servers\n";
"Usage: aeap show clients\n"
" Show all configured AEAP clients\n";
return NULL;
case CLI_GENERATE:
return NULL;
@@ -228,9 +263,9 @@ static char *aeap_server_show_all(struct ast_cli_entry *e, int cmd, struct ast_c
return CLI_SHOWUSAGE;
}
container = aeap_server_get_all();
container = ast_aeap_client_configs_get(NULL);
if (!container || ao2_container_count(container) == 0) {
ast_cli(a->fd, "No AEAP servers found\n");
ast_cli(a->fd, "No AEAP clients found\n");
ao2_cleanup(container);
return CLI_SUCCESS;
}
@@ -242,12 +277,75 @@ static char *aeap_server_show_all(struct ast_cli_entry *e, int cmd, struct ast_c
}
static struct ast_cli_entry aeap_cli[] = {
AST_CLI_DEFINE(aeap_server_show, "Show AEAP server configuration by id"),
AST_CLI_DEFINE(aeap_server_show_all, "Show all AEAP server configurations"),
AST_CLI_DEFINE(client_config_show, "Show AEAP client configuration by id"),
AST_CLI_DEFINE(client_config_show_all, "Show all AEAP client configurations"),
};
static struct ast_aeap *aeap_create(const char *id, const struct ast_aeap_params *params,
int connect, int timeout)
{
struct ast_aeap_client_config *cfg;
struct ast_aeap *aeap;
const char *url = NULL;
const char *protocol = NULL;
cfg = client_config_get(id);
if (cfg) {
url = cfg->url;
protocol = cfg->protocol;
}
#ifdef TEST_FRAMEWORK
else if (ast_begins_with(id, "_aeap_test_")) {
url = "ws://127.0.0.1:8088/ws";
protocol = id;
}
#endif
if (!url && !protocol) {
ast_log(LOG_ERROR, "AEAP: unable to get configuration for '%s'\n", id);
return NULL;
}
aeap = connect ? ast_aeap_create_and_connect(url, params, url, protocol, timeout) :
ast_aeap_create(url, params);
ao2_cleanup(cfg);
return aeap;
}
struct ast_aeap *ast_aeap_create_by_id(const char *id, const struct ast_aeap_params *params)
{
return aeap_create(id, params, 0, 0);
}
struct ast_aeap *ast_aeap_create_and_connect_by_id(const char *id,
const struct ast_aeap_params *params, int timeout)
{
return aeap_create(id, params, 1, timeout);
}
struct ast_variable *ast_aeap_custom_fields_get(const char *id)
{
struct ast_aeap_client_config *cfg;
struct ast_variable *vars;
cfg = client_config_get(id);
if (!cfg) {
ast_log(LOG_WARNING, "AEAP: no client configuration '%s' to get fields\n", id);
return NULL;
}
vars = ast_sorcery_objectset_create(aeap_sorcery, cfg);
ao2_ref(cfg, -1);
return vars;
}
static int reload_module(void)
{
ast_sorcery_reload(aeap_sorcery);
return 0;
}
@@ -258,28 +356,35 @@ static int unload_module(void)
ast_cli_unregister_multiple(aeap_cli, ARRAY_LEN(aeap_cli));
aeap_general_finalize();
return 0;
}
static int load_module(void)
{
if (aeap_general_initialize()) {
return AST_MODULE_LOAD_DECLINE;
}
if (!(aeap_sorcery = ast_sorcery_open()))
{
ast_log(LOG_ERROR, "AEAP - failed to open sorcery\n");
return AST_MODULE_LOAD_DECLINE;
}
ast_sorcery_apply_default(aeap_sorcery, "server", "config", "aeap.conf,criteria=type=server");
ast_sorcery_apply_default(aeap_sorcery, AEAP_CONFIG_CLIENT, "config", "aeap.conf,criteria=type=client");
if (ast_sorcery_object_register(aeap_sorcery, "server", aeap_server_alloc,
NULL, aeap_server_apply)) {
ast_log(LOG_ERROR, "AEAP - failed to register server sorcery object\n");
if (ast_sorcery_object_register(aeap_sorcery, "client", client_config_alloc,
NULL, client_config_apply)) {
ast_log(LOG_ERROR, "AEAP - failed to register client sorcery object\n");
return AST_MODULE_LOAD_DECLINE;
}
ast_sorcery_object_field_register(aeap_sorcery, "server", "type", "", OPT_NOOP_T, 0, 0);
ast_sorcery_object_field_register(aeap_sorcery, "server", "server_url", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct aeap_server, server_url));
ast_sorcery_object_field_register(aeap_sorcery, "server", "codecs", "", OPT_CODEC_T, 1, FLDSET(struct aeap_server, codecs));
ast_sorcery_object_field_register(aeap_sorcery, AEAP_CONFIG_CLIENT, "type", "", OPT_NOOP_T, 0, 0);
ast_sorcery_object_field_register(aeap_sorcery, AEAP_CONFIG_CLIENT, "url", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_aeap_client_config, url));
ast_sorcery_object_field_register(aeap_sorcery, AEAP_CONFIG_CLIENT, "protocol", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_aeap_client_config, protocol));
ast_sorcery_object_field_register(aeap_sorcery, AEAP_CONFIG_CLIENT, "codecs", "", OPT_CODEC_T, 1, FLDSET(struct ast_aeap_client_config, codecs));
ast_sorcery_load(aeap_sorcery);
@@ -295,4 +400,5 @@ AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_
.unload = unload_module,
.reload = reload_module,
.load_pri = AST_MODPRI_CHANNEL_DEPEND,
.requires = "res_http_websocket",
);

7
res/res_aeap.exports.in Normal file
View File

@@ -0,0 +1,7 @@
{
global:
LINKER_SYMBOL_PREFIXaeap_*;
LINKER_SYMBOL_PREFIXast_aeap_*;
local:
*;
};

501
res/res_aeap/aeap.c Normal file
View File

@@ -0,0 +1,501 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include <pthread.h>
#include "asterisk/astobj2.h"
#include "asterisk/strings.h"
#include "asterisk/res_aeap.h"
#include "asterisk/res_aeap_message.h"
#include "logger.h"
#include "transaction.h"
#include "transport.h"
#define AEAP_RECV_SIZE 32768
struct aeap_user_data {
/*! The user data object */
void *obj;
/*! A user data identifier */
char id[0];
};
AO2_STRING_FIELD_HASH_FN(aeap_user_data, id);
AO2_STRING_FIELD_CMP_FN(aeap_user_data, id);
#define USER_DATA_BUCKETS 11
struct ast_aeap {
/*! This object's configuration parameters */
const struct ast_aeap_params *params;
/*! Container for registered user data objects */
struct ao2_container *user_data;
/*! Transactions container */
struct ao2_container *transactions;
/*! Transport layer communicator */
struct aeap_transport *transport;
/*! Id of thread that reads data from the transport */
pthread_t read_thread_id;
};
static int tsx_end(void *obj, void *arg, int flags)
{
aeap_transaction_end(obj, -1);
return 0;
}
static void aeap_destructor(void *obj)
{
struct ast_aeap *aeap = obj;
/* Disconnect things first, which keeps transactions from further executing */
ast_aeap_disconnect(aeap);
aeap_transport_destroy(aeap->transport);
/*
* Each contained transaction holds a pointer back to this transactions container,
* which is removed upon transaction end. Thus by explicitly ending each transaction
* here we can ensure all references to the transactions container are removed.
*/
ao2_callback(aeap->transactions, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE,
tsx_end, NULL);
ao2_cleanup(aeap->transactions);
ao2_cleanup(aeap->user_data);
}
struct ast_aeap *ast_aeap_create(const char *transport_type,
const struct ast_aeap_params *params)
{
struct ast_aeap *aeap;
aeap = ao2_alloc(sizeof(*aeap), aeap_destructor);
if (!aeap) {
ast_log(LOG_ERROR, "AEAP: unable to create");
return NULL;
}
aeap->params = params;
aeap->read_thread_id = AST_PTHREADT_NULL;
aeap->user_data = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, USER_DATA_BUCKETS,
aeap_user_data_hash_fn, NULL, aeap_user_data_cmp_fn);
if (!aeap->user_data) {
aeap_error(aeap, NULL, "unable to create user data container");
ao2_ref(aeap, -1);
return NULL;
}
aeap->transactions = aeap_transactions_create();
if (!aeap->transactions) {
aeap_error(aeap, NULL, "unable to create transactions container");
ao2_ref(aeap, -1);
return NULL;
}
aeap->transport = aeap_transport_create(transport_type);
if (!aeap->transport) {
aeap_error(aeap, NULL, "unable to create transport");
ao2_ref(aeap, -1);
return NULL;
}
return aeap;
}
static struct aeap_user_data *aeap_user_data_create(const char *id, void *obj,
ast_aeap_user_obj_cleanup cleanup)
{
struct aeap_user_data *data;
ast_assert(id != NULL);
data = ao2_t_alloc_options(sizeof(*data) + strlen(id) + 1, cleanup,
AO2_ALLOC_OPT_LOCK_NOLOCK, "");
if (!data) {
if (cleanup) {
cleanup(obj);
}
return NULL;
}
strcpy(data->id, id); /* safe */
data->obj = obj;
return data;
}
int ast_aeap_user_data_register(struct ast_aeap *aeap, const char *id, void *obj,
ast_aeap_user_obj_cleanup cleanup)
{
struct aeap_user_data *data;
data = aeap_user_data_create(id, obj, cleanup);
if (!data) {
return -1;
}
if (!ao2_link(aeap->user_data, data)) {
ao2_ref(data, -1);
return -1;
}
ao2_ref(data, -1);
return 0;
}
void ast_aeap_user_data_unregister(struct ast_aeap *aeap, const char *id)
{
ao2_find(aeap->user_data, id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
}
void *ast_aeap_user_data_object_by_id(struct ast_aeap *aeap, const char *id)
{
struct aeap_user_data *data;
void *obj;
data = ao2_find(aeap->user_data, id, OBJ_SEARCH_KEY);
if (!data) {
return NULL;
}
obj = data->obj;
ao2_ref(data, -1);
/*
* Returned object's lifetime is based on how it was registered.
* See public function docs for more info
*/
return obj;
}
static int raise_msg_handler(struct ast_aeap *aeap, const struct ast_aeap_message_handler *handlers,
size_t size, struct ast_aeap_message *msg, void *data)
{
ast_aeap_on_message on_message = NULL;
size_t i;
if (!aeap->params->emit_error) {
const char *error_msg = ast_aeap_message_error_msg(msg);
if (error_msg) {
aeap_error(aeap, NULL, "%s", error_msg);
return -1;
}
/* If no error_msg then it's assumed this is not an error message */
}
for (i = 0; i < size; ++i) {
if (ast_strlen_zero(handlers[i].name)) {
/* A default handler is specified. Use it if no other match is found */
on_message = handlers[i].on_message;
continue;
}
if (ast_aeap_message_is_named(msg, handlers[i].name)) {
on_message = handlers[i].on_message;
break;
}
}
if (on_message) {
return on_message(aeap, msg, data);
}
/* Respond with un-handled error */
ast_aeap_send_msg(aeap, ast_aeap_message_create_error(aeap->params->msg_type,
ast_aeap_message_name(msg), ast_aeap_message_id(msg),
"Unsupported and/or un-handled message"));
return 0;
}
static void raise_msg(struct ast_aeap *aeap, const void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE serial_type)
{
struct ast_aeap_message *msg;
struct aeap_transaction *tsx;
int res = 0;
if (!aeap->params || !aeap->params->msg_type ||
ast_aeap_message_serial_type(aeap->params->msg_type) != serial_type ||
!(msg = ast_aeap_message_deserialize(aeap->params->msg_type, buf, size))) {
return;
}
/* See if this msg is involved in a transaction */
tsx = aeap_transaction_get(aeap->transactions, ast_aeap_message_id(msg));
/* If so go ahead and cancel the timeout timer */
aeap_transaction_cancel_timer(tsx);
if (aeap->params->request_handlers && ast_aeap_message_is_request(msg)) {
res = raise_msg_handler(aeap, aeap->params->request_handlers, aeap->params->request_handlers_size,
msg, tsx ? aeap_transaction_user_obj(tsx) : NULL);
} else if (aeap->params->response_handlers && ast_aeap_message_is_response(msg)) {
res = raise_msg_handler(aeap, aeap->params->response_handlers, aeap->params->response_handlers_size,
msg, tsx ? aeap_transaction_user_obj(tsx) : NULL);
}
/* Complete transaction (Note, removes tsx ref) */
aeap_transaction_end(tsx, res);
ao2_ref(msg, -1);
}
static void *aeap_receive(void *data)
{
struct ast_aeap *aeap = data;
void *buf;
buf = ast_calloc(1, AEAP_RECV_SIZE);
if (!buf) {
aeap_error(aeap, NULL, "unable to create read buffer");
goto aeap_receive_error;
}
while (aeap_transport_is_connected(aeap->transport)) {
enum AST_AEAP_DATA_TYPE rtype;
intmax_t size;
size = aeap_transport_read(aeap->transport, buf, AEAP_RECV_SIZE, &rtype);
if (size < 0) {
goto aeap_receive_error;
}
if (!size) {
continue;
}
switch (rtype) {
case AST_AEAP_DATA_TYPE_BINARY:
if (aeap->params && aeap->params->on_binary) {
aeap->params->on_binary(aeap, buf, size);
}
break;
case AST_AEAP_DATA_TYPE_STRING:
ast_debug(3, "AEAP: received message: %s\n", (char *)buf);
if (aeap->params && aeap->params->on_string) {
aeap->params->on_string(aeap, (const char *)buf, size - 1);
}
break;
default:
break;
}
raise_msg(aeap, buf, size, rtype);
};
ast_free(buf);
return NULL;
aeap_receive_error:
/*
* An unrecoverable error occurred so ensure the aeap and transport reset
* to a disconnected state. We don't want this thread to "join" itself so set
* its id to NULL prior to disconnecting.
*/
aeap_error(aeap, NULL, "unrecoverable read error, disconnecting");
ao2_lock(aeap);
aeap->read_thread_id = AST_PTHREADT_NULL;
ao2_unlock(aeap);
ast_aeap_disconnect(aeap);
ast_free(buf);
if (aeap->params && aeap->params->on_error) {
aeap->params->on_error(aeap);
}
return NULL;
}
int ast_aeap_connect(struct ast_aeap *aeap, const char *url, const char *protocol, int timeout)
{
SCOPED_AO2LOCK(lock, aeap);
if (aeap_transport_is_connected(aeap->transport)) {
/* Should already be connected, so nothing to do */
return 0;
}
if (aeap_transport_connect(aeap->transport, url, protocol, timeout)) {
aeap_error(aeap, NULL, "unable to connect transport");
return -1;
}
if (ast_pthread_create_background(&aeap->read_thread_id, NULL,
aeap_receive, aeap)) {
aeap_error(aeap, NULL, "unable to start read thread: %s",
strerror(errno));
ast_aeap_disconnect(aeap);
return -1;
}
return 0;
}
struct ast_aeap *ast_aeap_create_and_connect(const char *type,
const struct ast_aeap_params *params, const char *url, const char *protocol, int timeout)
{
struct ast_aeap *aeap;
aeap = ast_aeap_create(type, params);
if (!aeap) {
return NULL;
}
if (ast_aeap_connect(aeap, url, protocol, timeout)) {
ao2_ref(aeap, -1);
return NULL;
}
return aeap;
}
int ast_aeap_disconnect(struct ast_aeap *aeap)
{
ao2_lock(aeap);
aeap_transport_disconnect(aeap->transport);
if (aeap->read_thread_id != AST_PTHREADT_NULL) {
/*
* The read thread calls disconnect if an error occurs, so
* unlock the aeap before "joining" to avoid a deadlock.
*/
ao2_unlock(aeap);
pthread_join(aeap->read_thread_id, NULL);
ao2_lock(aeap);
aeap->read_thread_id = AST_PTHREADT_NULL;
}
ao2_unlock(aeap);
return 0;
}
static int aeap_send(struct ast_aeap *aeap, const void *buf, uintmax_t size,
enum AST_AEAP_DATA_TYPE type)
{
intmax_t num;
num = aeap_transport_write(aeap->transport, buf, size, type);
if (num == 0) {
/* Nothing written, could be disconnected */
return 0;
}
if (num < 0) {
aeap_error(aeap, NULL, "error sending data");
return -1;
}
if (num < size) {
aeap_error(aeap, NULL, "not all data sent");
return -1;
}
if (num > size) {
aeap_error(aeap, NULL, "sent data truncated");
return -1;
}
return 0;
}
int ast_aeap_send_binary(struct ast_aeap *aeap, const void *buf, uintmax_t size)
{
return aeap_send(aeap, buf, size, AST_AEAP_DATA_TYPE_BINARY);
}
int ast_aeap_send_msg(struct ast_aeap *aeap, struct ast_aeap_message *msg)
{
void *buf;
intmax_t size;
int res;
if (!msg) {
aeap_error(aeap, NULL, "no message to send");
return -1;
}
if (ast_aeap_message_serialize(msg, &buf, &size)) {
aeap_error(aeap, NULL, "unable to serialize outgoing message");
ao2_ref(msg, -1);
return -1;
}
res = aeap_send(aeap, buf, size, msg->type->serial_type);
ast_free(buf);
ao2_ref(msg, -1);
return res;
}
int ast_aeap_send_msg_tsx(struct ast_aeap *aeap, struct ast_aeap_tsx_params *params)
{
struct aeap_transaction *tsx = NULL;
int res = 0;
if (!params) {
return -1;
}
if (!params->msg) {
aeap_transaction_params_cleanup(params);
aeap_error(aeap, NULL, "no message to send");
return -1;
}
/* The transaction will take over params cleanup, which includes the msg reference */
tsx = aeap_transaction_create_and_add(aeap->transactions,
ast_aeap_message_id(params->msg), params, aeap);
if (!tsx) {
return -1;
}
if (ast_aeap_send_msg(aeap, ao2_bump(params->msg))) {
aeap_transaction_end(tsx, -1); /* Removes container, and tsx ref */
return -1;
}
if (aeap_transaction_start(tsx)) {
aeap_transaction_end(tsx, -1); /* Removes container, and tsx ref */
return -1;
}
res = aeap_transaction_result(tsx);
ao2_ref(tsx, -1);
return res;
}

58
res/res_aeap/general.c Normal file
View File

@@ -0,0 +1,58 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/sched.h"
#include "general.h"
/*! \brief Scheduler for transaction timeouts */
static struct ast_sched_context *sched = NULL;
struct ast_sched_context *aeap_sched_context(void)
{
return sched;
}
void aeap_general_finalize(void)
{
if (sched) {
ast_sched_context_destroy(sched);
sched = NULL;
}
}
int aeap_general_initialize(void)
{
sched = ast_sched_context_create();
if (!sched) {
ast_log(LOG_ERROR, "AEAP scheduler: unable to create context");
return -1;
}
if (ast_sched_start_thread(sched)) {
ast_log(LOG_ERROR, "AEAP scheduler: unable to start thread");
aeap_general_finalize();
return -1;
}
return 0;
}

41
res/res_aeap/general.h Normal file
View File

@@ -0,0 +1,41 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef RES_AEAP_GENERAL_H
#define RES_AEAP_GENERAL_H
/*!
* \brief Retrieve the scheduling context
*
* \returns The scheduling context
*/
struct ast_sched_context *aeap_sched_context(void);
/*!
* \brief Initialize general/common AEAP facilities
*
* \returns 0 on success, -1 on error
*/
int aeap_general_initialize(void);
/*!
* \brief Finalize/cleanup general AEAP facilities
*/
void aeap_general_finalize(void);
#endif /* RES_AEAP_GENERAL_H */

60
res/res_aeap/logger.h Normal file
View File

@@ -0,0 +1,60 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef RES_AEAP_LOGGER_H
#define RES_AEAP_LOGGER_H
#include "asterisk.h"
#include "asterisk/logger.h"
#include "asterisk/strings.h"
/*!
* \brief Log an Asterisk external application message
*
* \param level The logging level
* \param obj The object being logged
* \param name Optional subsystem name
* \param fmt Format string
* \param ... Parameters for the format string
*/
#define aeap_log(level, obj, name, fmt, ...) \
ast_log(level, "AEAP%s%s (%p): " fmt "\n", ast_strlen_zero(name) ? "" : " ", \
ast_strlen_zero(name) ? "" : name, obj, ##__VA_ARGS__)
/*!
* \brief Log an Asterisk external application error
*
* \param obj The object being logged
* \param name Optional subsystem name
* \param fmt Format string
* \param ... Parameters for the format string
*/
#define aeap_error(obj, name, fmt, ...) aeap_log(LOG_ERROR, obj, name, fmt, ##__VA_ARGS__)
/*!
* \brief Log an Asterisk external application warning
*
* \param obj The object being logged
* \param name Optional subsystem name
* \param fmt Format string
* \param ... Parameters for the format string
*/
#define aeap_warn(obj, name, fmt, ...) aeap_log(LOG_WARNING, obj, name, fmt, ##__VA_ARGS__)
#endif /* RES_AEAP_LOGGER_H */

270
res/res_aeap/message.c Normal file
View File

@@ -0,0 +1,270 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/res_aeap.h"
#include "asterisk/res_aeap_message.h"
enum AST_AEAP_DATA_TYPE ast_aeap_message_serial_type(const struct ast_aeap_message_type *type)
{
ast_assert(type != NULL);
return type->serial_type;
}
static void message_destructor(void *obj)
{
struct ast_aeap_message *msg = obj;
if (msg->type->destruct) {
msg->type->destruct(msg);
}
}
static struct ast_aeap_message *message_create(const struct ast_aeap_message_type *type)
{
struct ast_aeap_message *msg;
msg = ao2_t_alloc_options(type->type_size, message_destructor,
AO2_ALLOC_OPT_LOCK_NOLOCK, type->type_name);
if (!msg) {
ast_log(LOG_ERROR, "AEAP message %s: unable to create\n", type->type_name);
return NULL;
}
msg->type = type;
return msg;
}
struct ast_aeap_message *ast_aeap_message_create1(const struct ast_aeap_message_type *type,
const void *params)
{
struct ast_aeap_message *msg;
ast_assert(type != NULL);
ast_assert(type->construct1 != NULL);
msg = message_create(type);
if (!msg) {
return NULL;
}
if (type->construct1(msg, params)) {
ast_log(LOG_ERROR, "AEAP message %s: unable to construct1\n", type->type_name);
ao2_ref(msg, -1);
return NULL;
}
return msg;
}
struct ast_aeap_message *ast_aeap_message_create2(const struct ast_aeap_message_type *type,
const char *msg_type, const char *name, const char *id, const void *params)
{
struct ast_aeap_message *msg;
ast_assert(type != NULL);
ast_assert(type->construct2 != NULL);
ast_assert(msg_type != NULL);
ast_assert(name != NULL);
msg = message_create(type);
if (!msg) {
return NULL;
}
if (type->construct2(msg, msg_type, name, id, params)) {
ast_log(LOG_ERROR, "AEAP message %s: unable to construct2\n", type->type_name);
ao2_ref(msg, -1);
return NULL;
}
return msg;
}
struct ast_aeap_message *ast_aeap_message_create_request(const struct ast_aeap_message_type *type,
const char *name, const char *id, const void *params)
{
struct ast_aeap_message *msg;
msg = ast_aeap_message_create2(type, "request", name, id, params);
if (!msg) {
return NULL;
}
if (!id && !ast_aeap_message_id_generate(msg)) {
ao2_ref(msg, -1);
return NULL;
}
return msg;
}
struct ast_aeap_message *ast_aeap_message_create_response(const struct ast_aeap_message_type *type,
const char *name, const char *id, const void *params)
{
return ast_aeap_message_create2(type, "response", name, id, params);
}
struct ast_aeap_message *ast_aeap_message_create_error(const struct ast_aeap_message_type *type,
const char *name, const char *id, const char *error_msg)
{
struct ast_aeap_message *msg;
msg = ast_aeap_message_create_response(type, name, id, NULL);
if (!msg) {
return NULL;
}
if (ast_aeap_message_error_msg_set(msg, error_msg)) {
ao2_ref(msg, -1);
return NULL;
}
return msg;
}
struct ast_aeap_message *ast_aeap_message_deserialize(const struct ast_aeap_message_type *type,
const void *buf, intmax_t size)
{
struct ast_aeap_message *msg;
ast_assert(type != NULL);
ast_assert(type->deserialize != NULL);
msg = ast_aeap_message_create1(type, NULL);
if (!msg) {
return NULL;
}
if (type->deserialize(msg, buf, size)) {
ao2_ref(msg, -1);
return NULL;
}
return msg;
}
int ast_aeap_message_serialize(const struct ast_aeap_message *message,
void **buf, intmax_t *size)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->serialize ? message->type->serialize(message, buf, size) : 0;
}
const char *ast_aeap_message_id(const struct ast_aeap_message *message)
{
const char *id = NULL;
ast_assert(message != NULL);
ast_assert(message->type != NULL);
if (message->type->id) {
id = message->type->id(message);
}
return id ? id : "";
}
int ast_aeap_message_id_set(struct ast_aeap_message *message, const char *id)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->id_set ? message->type->id_set(message, id) : 0;
}
const char *ast_aeap_message_id_generate(struct ast_aeap_message *message)
{
char uuid_str[AST_UUID_STR_LEN];
ast_uuid_generate_str(uuid_str, sizeof(uuid_str));
if (strlen(uuid_str) != (AST_UUID_STR_LEN - 1)) {
ast_log(LOG_ERROR, "AEAP message %s failed to generate UUID for message '%s'",
message->type->type_name, ast_aeap_message_name(message));
return NULL;
}
return ast_aeap_message_id_set(message, uuid_str) ? NULL : ast_aeap_message_id(message);
}
const char *ast_aeap_message_name(const struct ast_aeap_message *message)
{
const char *name = NULL;
ast_assert(message != NULL);
ast_assert(message->type != NULL);
if (message->type->name) {
name = message->type->name(message);
}
return name ? name : "";
}
int ast_aeap_message_is_named(const struct ast_aeap_message *message, const char *name)
{
return name ? !strcasecmp(ast_aeap_message_name(message), name) : 0;
}
void *ast_aeap_message_data(struct ast_aeap_message *message)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->data ? message->type->data(message) : NULL;
}
int ast_aeap_message_is_request(const struct ast_aeap_message *message)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->is_request ? message->type->is_request(message) : 0;
}
int ast_aeap_message_is_response(const struct ast_aeap_message *message)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->is_response ? message->type->is_response(message) : 0;
}
const char *ast_aeap_message_error_msg(const struct ast_aeap_message *message)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->error_msg ? message->type->error_msg(message) : NULL;
}
int ast_aeap_message_error_msg_set(struct ast_aeap_message *message, const char *error_msg)
{
ast_assert(message != NULL);
ast_assert(message->type != NULL);
return message->type->error_msg_set ? message->type->error_msg_set(message, error_msg) : 0;
}

191
res/res_aeap/message_json.c Normal file
View File

@@ -0,0 +1,191 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/json.h"
#include "asterisk/res_aeap_message.h"
#define JSON_MSG(lvalue, rvalue) struct message_json *lvalue = \
((struct message_json *)rvalue)
/*!
* \brief Asterisk external application JSON message type
*/
struct message_json {
/*! The base message type (must be first) */
struct ast_aeap_message base;
/*! Underlying JSON data structure */
struct ast_json *json;
};
static int message_json_construct1(struct ast_aeap_message *self, const void *params)
{
JSON_MSG(msg, self);
msg->json = ast_json_ref((struct ast_json *)params) ?: ast_json_object_create();
return msg->json ? 0 : -1;
}
static int message_json_construct2(struct ast_aeap_message *self, const char *msg_type,
const char *name, const char *id, const void *params)
{
struct ast_json *msg_data;
int res;
msg_data = ast_json_pack("{s:s,s:s*}", msg_type, name, "id", id);
if (!msg_data) {
ast_log(LOG_ERROR, "AEAP message json: failed to create data for '%s: %s'", msg_type, name);
return -1;
}
if (params && ast_json_object_update(msg_data, (struct ast_json *)params)) {
ast_log(LOG_ERROR, "AEAP message json: failed to update data for '%s: %s'", msg_type, name);
ast_json_unref(msg_data);
return -1;
}
res = message_json_construct1(self, msg_data);
ast_json_unref(msg_data);
return res;
}
static void message_json_destruct(struct ast_aeap_message *self)
{
JSON_MSG(msg, self);
ast_json_unref(msg->json);
}
static int message_json_deserialize(struct ast_aeap_message *self, const void *buf, intmax_t size)
{
JSON_MSG(msg, self);
msg->json = ast_json_load_buf(buf, size, NULL);
return msg->json ? 0 : -1;
}
static int message_json_serialize(const struct ast_aeap_message *self, void **buf, intmax_t *size)
{
const JSON_MSG(msg, self);
*buf = ast_json_dump_string(msg->json);
if (!*buf) {
*size = 0;
return -1;
}
*size = strlen(*buf);
return 0;
}
static const char *message_json_id(const struct ast_aeap_message *self)
{
const JSON_MSG(msg, self);
return ast_json_object_string_get(msg->json, "id");
}
static int message_json_id_set(struct ast_aeap_message *self, const char *id)
{
JSON_MSG(msg, self);
if (ast_json_object_set(msg->json, "id", ast_json_string_create(id))) {
return -1;
}
return 0;
}
static const char *message_json_name(const struct ast_aeap_message *self)
{
const JSON_MSG(msg, self);
struct ast_json_iter *iter;
iter = ast_json_object_iter_at(msg->json, "response");
if (!iter) {
iter = ast_json_object_iter_at(msg->json, "request");
}
return iter ? ast_json_string_get(ast_json_object_iter_value(iter)) : "";
}
static void *message_json_data(struct ast_aeap_message *self)
{
JSON_MSG(msg, self);
return msg->json;
}
static int message_json_is_request(const struct ast_aeap_message *self)
{
const JSON_MSG(msg, self);
return ast_json_object_iter_at(msg->json, "request") != NULL;
}
static int message_json_is_response(const struct ast_aeap_message *self)
{
const JSON_MSG(msg, self);
return ast_json_object_iter_at(msg->json, "response") != NULL;
}
static const char *message_json_error_msg(const struct ast_aeap_message *self)
{
const JSON_MSG(msg, self);
return ast_json_object_string_get(msg->json, "error_msg");
}
static int message_json_error_msg_set(struct ast_aeap_message *self, const char *error_msg)
{
JSON_MSG(msg, self);
if (ast_json_object_set(msg->json, "error_msg", ast_json_string_create(error_msg))) {
return -1;
}
return 0;
}
static const struct ast_aeap_message_type message_type_json = {
.type_size = sizeof(struct message_json),
.type_name = "json",
.serial_type = AST_AEAP_DATA_TYPE_STRING,
.construct1 = message_json_construct1,
.construct2 = message_json_construct2,
.destruct = message_json_destruct,
.deserialize = message_json_deserialize,
.serialize = message_json_serialize,
.id = message_json_id,
.id_set = message_json_id_set,
.name = message_json_name,
.data = message_json_data,
.is_request = message_json_is_request,
.is_response = message_json_is_response,
.error_msg = message_json_error_msg,
.error_msg_set = message_json_error_msg_set,
};
const struct ast_aeap_message_type *ast_aeap_message_type_json = &message_type_json;

284
res/res_aeap/transaction.c Normal file
View File

@@ -0,0 +1,284 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/sched.h"
#include "asterisk/utils.h"
#include "asterisk/res_aeap.h"
#include "asterisk/res_aeap_message.h"
#include "general.h"
#include "logger.h"
#include "transaction.h"
struct aeap_transaction {
/*! Pointer back to owner object */
struct ast_aeap *aeap;
/*! The container this transaction is in */
struct ao2_container *container;
/*! Scheduler ID message timeout */
int sched_id;
/*! Whether or not the handler has been executed */
int handled;
/*! Used to sync matching received messages */
ast_cond_t handled_cond;
/*! The result of this transaction */
int result;
/*! The timeout data */
struct ast_aeap_tsx_params params;
/*! The transaction identifier */
char id[0];
};
/*! \brief Number of transaction buckets */
#define AEAP_TRANSACTION_BUCKETS 11
AO2_STRING_FIELD_HASH_FN(aeap_transaction, id);
AO2_STRING_FIELD_CMP_FN(aeap_transaction, id);
int aeap_transaction_cancel_timer(struct aeap_transaction *tsx)
{
if (tsx && tsx->sched_id != -1) {
AST_SCHED_DEL_UNREF(aeap_sched_context(), tsx->sched_id, ao2_ref(tsx, -1));
return tsx->sched_id != -1;
}
return 0;
}
void aeap_transaction_params_cleanup(struct ast_aeap_tsx_params *params)
{
ao2_cleanup(params->msg);
if (params->obj_cleanup) {
params->obj_cleanup(params->obj);
}
}
static void transaction_destructor(void *obj)
{
struct aeap_transaction *tsx = obj;
/* Ensure timer is canceled */
aeap_transaction_cancel_timer(tsx);
aeap_transaction_params_cleanup(&tsx->params);
ast_cond_destroy(&tsx->handled_cond);
}
static struct aeap_transaction *transaction_create(const char *id,
struct ast_aeap_tsx_params *params, struct ast_aeap *aeap)
{
struct aeap_transaction *tsx;
if (!id) {
aeap_error(aeap, "transaction", "missing transaction id");
aeap_transaction_params_cleanup(params);
return NULL;
}
tsx = ao2_alloc(sizeof(*tsx) + strlen(id) + 1, transaction_destructor);
if (!tsx) {
aeap_error(aeap, "transaction", "unable to create for '%s'", id);
aeap_transaction_params_cleanup(params);
return NULL;
}
strcpy(tsx->id, id); /* safe */
tsx->sched_id = -1;
ast_cond_init(&tsx->handled_cond, NULL);
/*
* Currently, transactions, and their lifetimes are fully managed by the given 'aeap'
* object, so do not bump its reference here as we want the 'aeap' object to stop
* transactions and not transactions potentially stopping the 'aeap' object.
*/
tsx->aeap = aeap;
tsx->params = *params;
return tsx;
}
static void transaction_end(struct aeap_transaction *tsx, int timed_out, int result)
{
if (!tsx) {
return;
}
ao2_lock(tsx);
tsx->result = result;
if (tsx->container) {
ao2_unlink(tsx->container, tsx);
tsx->container = NULL;
}
if (!timed_out) {
aeap_transaction_cancel_timer(tsx);
} else if (tsx->sched_id != -1) {
tsx->sched_id = -1;
}
if (!tsx->handled) {
if (timed_out) {
if (tsx->params.on_timeout) {
tsx->params.on_timeout(tsx->aeap, tsx->params.msg, tsx->params.obj);
} else {
aeap_error(tsx->aeap, "transaction", "message '%s' timed out",
ast_aeap_message_name(tsx->params.msg));
}
}
tsx->handled = 1;
ast_cond_signal(&tsx->handled_cond);
}
ao2_unlock(tsx);
ao2_ref(tsx, -1);
}
static int transaction_raise_timeout(const void *data)
{
/* Ref added added at timer creation removed in end call */
transaction_end((struct aeap_transaction *)data, 1, -1);
return 0;
}
static int transaction_sched_timer(struct aeap_transaction *tsx)
{
if (tsx->params.timeout <= 0 || tsx->sched_id != -1) {
return 0;
}
tsx->sched_id = ast_sched_add(aeap_sched_context(), tsx->params.timeout,
transaction_raise_timeout, ao2_bump(tsx));
if (tsx->sched_id == -1) {
aeap_error(tsx->aeap, "transaction", "unable to schedule timeout for '%s'", tsx->id);
ao2_ref(tsx, -1);
return -1;
}
return 0;
}
static void transaction_wait(struct aeap_transaction *tsx)
{
ao2_lock(tsx);
while (!tsx->handled) {
ast_cond_wait(&tsx->handled_cond, ao2_object_get_lockaddr(tsx));
}
ao2_unlock(tsx);
}
int aeap_transaction_start(struct aeap_transaction *tsx)
{
if (transaction_sched_timer(tsx)) {
return -1;
}
if (tsx->params.wait) {
/* Wait until transaction completes, or times out */
transaction_wait(tsx);
}
return 0;
}
struct aeap_transaction *aeap_transaction_get(struct ao2_container *transactions, const char *id)
{
return ao2_find(transactions, id, OBJ_SEARCH_KEY);
}
void aeap_transaction_end(struct aeap_transaction *tsx, int result)
{
transaction_end(tsx, 0, result);
}
int aeap_transaction_result(struct aeap_transaction *tsx)
{
return tsx->result;
}
void *aeap_transaction_user_obj(struct aeap_transaction *tsx)
{
return tsx->params.obj;
}
struct aeap_transaction *aeap_transaction_create_and_add(struct ao2_container *transactions,
const char *id, struct ast_aeap_tsx_params *params, struct ast_aeap *aeap)
{
struct aeap_transaction *tsx;
tsx = transaction_create(id, params, aeap);
if (!tsx) {
return NULL;
}
if (!ao2_link(transactions, tsx)) {
aeap_error(tsx->aeap, "transaction", "unable to add '%s' to container", id);
ao2_ref(tsx, -1);
return NULL;
}
/*
* Yes, this creates a circular reference. This reference is removed though
* upon transaction end. It's assumed here that the given transactions container
* takes "ownership", and ultimate responsibility of its contained transactions.
* Thus when the given container needs to be unref'ed/freed it must call
* aeap_transaction_end for each transaction prior to doing so.
*/
/* tsx->container = ao2_bump(transactions); */
/*
* The transaction needs to know what container manages it, so it can remove
* itself from the given container under certain conditions (e.g. transaction
* timeout).
*
* It's expected that the given container will out live any contained transaction
* (i.e. the container will not itself be destroyed before ensuring all contained
* transactions are ended, and removed). Thus there is no reason to bump the given
* container's reference here.
*/
tsx->container = transactions;
return tsx;
}
struct ao2_container *aeap_transactions_create(void)
{
struct ao2_container *transactions;
transactions = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, AEAP_TRANSACTION_BUCKETS,
aeap_transaction_hash_fn, NULL, aeap_transaction_cmp_fn);
if (!transactions) {
ast_log(LOG_ERROR, "AEAP transaction: unable to create container\n");
return NULL;
}
return transactions;
}

123
res/res_aeap/transaction.h Normal file
View File

@@ -0,0 +1,123 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef RES_AEAP_TRANSACTION_H
#define RES_AEAP_TRANSACTION_H
#include "asterisk/res_aeap.h"
struct ao2_container;
struct ast_aeap_tsx_params;
struct aeap_transaction;
/*!
* \brief Create an Asterisk external application transactions container
*
* \returns A transaction object, or NULL on error
*/
struct ao2_container *aeap_transactions_create(void);
/*!
* \brief Create a transaction object, and add it to the given container
*
* \param transactions A transactions container
* \param id An id to use for the transaction
* \param params Transaction parameters
* \param aeap The aeap object that "owns" this transaction
*
* \returns 0 if successfully create and added, -1 on error
*/
struct aeap_transaction *aeap_transaction_create_and_add(struct ao2_container *transactions,
const char *id, struct ast_aeap_tsx_params *params, struct ast_aeap *aeap);
/*!
* \brief Clean up parameter references, and possibly call optional user object cleanup
*
* \param params Transaction parameters
*/
void aeap_transaction_params_cleanup(struct ast_aeap_tsx_params *params);
/*!
* \brief Retrieve a transaction for the id from the container
*
* \param transactions A transactions container
* \param id A transaction id
*
* \returns an AEAP transaction object, NULL if no transaction is found
*/
struct aeap_transaction *aeap_transaction_get(struct ao2_container *transactions,
const char *id);
/*!
* \brief Start the transaction
*
* \param tsx The transaction to initiate
*
* \returns 0 if successfully raised, and handled. Otherwise non zero.
*/
int aeap_transaction_start(struct aeap_transaction *tsx);
/*!
* \brief End a transaction, and remove it from the given container
*
* The "result" parameter is a value representing the state (success/failure,
* perhaps even something else) of transactional processing upon ending.
*
* \param tsx A transaction to end
* \param result A result to give to the transaction
*/
void aeap_transaction_end(struct aeap_transaction *tsx, int result);
/*!
* \brief Get a transaction's result
*
* A transaction's result is a value that represents the relative success (0), or
* failure (-1) of a transaction. For example, a timeout is considered a failure
* and will elicit a -1.
*
* This value though is also dependent upon the result of the message handler
* associated with the transaction. Meaning if an associated message is handled,
* then its result is stored as the transaction result and returned here.
*
* \param tsx A transaction object
*
* \returns The transaction result
*/
int aeap_transaction_result(struct aeap_transaction *tsx);
/*!
* \brief Cancel the transaction timer
*
* Stops the transaction timer, but does not end/stop the transaction itself
*
* \param transaction A transaction to cancel the timer on
*
* \returns 0 if canceled, non zero otherwise
*/
int aeap_transaction_cancel_timer(struct aeap_transaction *tsx);
/*!
* \brief Retrieve the user object associated with the transaction
*
* \param transaction A transaction object
*
* \returns A user object, or NULL if non associated
*/
void *aeap_transaction_user_obj(struct aeap_transaction *tsx);
#endif /* RES_AEAP_TRANSACTION_H */

156
res/res_aeap/transport.c Normal file
View File

@@ -0,0 +1,156 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/utils.h"
#include "logger.h"
#include "transport.h"
#include "transport_websocket.h"
struct aeap_transport *aeap_transport_create(const char *type)
{
struct aeap_transport *transport = NULL;
if (!strncasecmp(type, "ws", 2)) {
transport = (struct aeap_transport *)aeap_transport_websocket_create();
}
if (!transport) {
ast_log(LOG_ERROR, "AEAP transport: failed to create for type '%s'\n", type);
return NULL;
}
ast_mutex_init(&transport->read_lock);
ast_mutex_init(&transport->write_lock);
transport->connected = 0;
return transport;
}
int aeap_transport_connect(struct aeap_transport *transport, const char *url,
const char *protocol, int timeout)
{
int res;
SCOPED_MUTEX(rlock, &transport->read_lock);
SCOPED_MUTEX(wlock, &transport->write_lock);
if (aeap_transport_is_connected(transport)) {
return 0;
}
res = transport->vtable->connect(transport, url, protocol, timeout);
if (!res) {
transport->connected = 1;
}
return res;
}
struct aeap_transport *aeap_transport_create_and_connect(const char *type,
const char *url, const char *protocol, int timeout)
{
struct aeap_transport *transport = aeap_transport_create(type);
if (!transport) {
return NULL;
}
if (aeap_transport_connect(transport, url, protocol, timeout)) {
aeap_transport_destroy(transport);
return NULL;
}
return transport;
}
int aeap_transport_is_connected(struct aeap_transport *transport)
{
/*
* Avoid using a lock to 'read' the 'connected' variable in order to
* keep things slightly more efficient.
*/
return ast_atomic_fetch_add(&transport->connected, 0, __ATOMIC_RELAXED);
}
int aeap_transport_disconnect(struct aeap_transport *transport)
{
int res;
SCOPED_MUTEX(rlock, &transport->read_lock);
SCOPED_MUTEX(wlock, &transport->write_lock);
if (!aeap_transport_is_connected(transport)) {
return 0;
}
res = transport->vtable->disconnect(transport);
/*
* Even though the transport is locked here use atomics to set the value of
* 'connected' since it's possible the variable is being 'read' by another
* thread via the 'is_connected' call.
*/
ast_atomic_fetch_sub(&transport->connected, 1, __ATOMIC_RELAXED);
return res;
}
void aeap_transport_destroy(struct aeap_transport *transport)
{
if (!transport) {
return;
}
/* Ensure an orderly disconnect occurs before final destruction */
aeap_transport_disconnect(transport);
transport->vtable->destroy(transport);
ast_mutex_destroy(&transport->read_lock);
ast_mutex_destroy(&transport->write_lock);
ast_free(transport);
}
intmax_t aeap_transport_read(struct aeap_transport *transport, void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE *rtype)
{
SCOPED_MUTEX(lock, &transport->read_lock);
if (!aeap_transport_is_connected(transport)) {
return 0;
}
return transport->vtable->read(transport, buf, size, rtype);
}
intmax_t aeap_transport_write(struct aeap_transport *transport, const void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE wtype)
{
SCOPED_MUTEX(lock, &transport->write_lock);
if (!aeap_transport_is_connected(transport)) {
return 0;
}
return transport->vtable->write(transport, buf, size, wtype);
}

209
res/res_aeap/transport.h Normal file
View File

@@ -0,0 +1,209 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef RES_AEAP_TRANSPORT_H
#define RES_AEAP_TRANSPORT_H
#include <stdint.h>
#include "asterisk/res_aeap.h"
struct aeap_transport;
/*!
* \brief Asterisk external application transport virtual table
*
* Callbacks to be implemented by "derived" transports
*/
struct aeap_transport_vtable {
/*!
* \brief Connect a transport
*
* \param self The transport object
* \param url The URL to connect to
* \param protocol The connection protocol to use if applicable
* \param timeout How long (in milliseconds) to attempt to connect (-1 equals infinite)
*
* \returns 0 on success, or -1 on error
*/
int (*connect)(struct aeap_transport *self, const char *url, const char *protocol, int timeout);
/*!
* \brief Disconnect a transport
*
* \param self The transport object
*
* \returns 0 on success, or -1 on error
*/
int (*disconnect)(struct aeap_transport *self);
/*!
* \brief Destroy a transport
*
* \param self The transport object
*/
void (*destroy)(struct aeap_transport *self);
/*!
* \brief Read data from a transport
*
* \param self The transport object
* \param buf The buffer data is read read into
* \param size The size of the given data buffer
* \param rtype [out] The type of data read
*
* \returns Total number of bytes read, or less than zero on error
*/
intmax_t (*read)(struct aeap_transport *self, void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE *rtype);
/*!
* \brief Write data to a transport
*
* \param self The transport object
* \param buf The data to write
* \param size The size of data to write
* \param wtype The type of data to write
*
* \returns Total number of bytes written, or less than zero on error
*/
intmax_t (*write)(struct aeap_transport *self, const void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE wtype);
};
/*!
* \brief Asterisk external application transport structure to be
* "derived" by specific transport implementation types
*
* Transports are assumed to support simultaneous reading and writing,
* thus separate read and write locks. A transport type not supporting
* such can simply apply the opposing lock during a read or write, i.e.
* lock the write lock during a read and vice versa.
*/
struct aeap_transport {
/*! Transport virtual table */
struct aeap_transport_vtable *vtable;
/*! Whether or not the transport is connected */
unsigned int connected;
/*! Lock used when reading */
ast_mutex_t read_lock;
/*! Lock used when writing */
ast_mutex_t write_lock;
};
/*!
* \brief Create an Asterisk external application transport
*
* \param type The type of transport to create
*
* \returns An Asterisk external application transport, or NULL on error
*/
struct aeap_transport *aeap_transport_create(const char *type);
/*!
* \brief Connect a transport
*
* \param transport The transport to connect
* \param url The URL to connect to
* \param protocol The connection protocol to use if applicable
* \param timeout How long (in milliseconds) to attempt to connect (-1 equals infinite)
*
* \returns 0 on success, or -1 on error
*/
int aeap_transport_connect(struct aeap_transport *transport, const char *url,
const char *protocol, int timeout);
/*!
* \brief Create an Asterisk external application transport, and connect it
*
* \param type The type of transport to create
* \param url The URL to connect to
* \param protocol The connection protocol to use if applicable
* \param timeout How long (in milliseconds) to attempt to connect (-1 equals infinite)
*
* \returns An Asterisk external application transport, or NULL on error
*/
struct aeap_transport *aeap_transport_create_and_connect(const char* type,
const char *url, const char *protocol, int timeout);
/*!
* \brief Disconnect a transport
*
* \note Locks both the transport's read and write locks before calling transport
* instance's disconnect, and unlocks both before returning.
*
* \param transport The transport to disconnect
*
* \returns 0 on success, or -1 on error
*/
int aeap_transport_disconnect(struct aeap_transport *transport);
/*!
* \brief Whether or not the transport is in a connected state
*
* \param transport The transport object
*
* \returns True if connected, false otherwise
*/
int aeap_transport_is_connected(struct aeap_transport *transport);
/*!
* \brief Destroy a transport
*
* \param transport The transport to destroy
*
* \returns 0 on success, or -1 on error
*/
void aeap_transport_destroy(struct aeap_transport *transport);
/*!
* \brief Read data from the transport
*
* This is a blocking read, and will not return until the transport
* implementation returns.
*
* \note Locks transport's read lock before calling transport instance's
* read, and unlocks it before returning.
*
* \param transport The transport to read from
* \param buf The buffer data is read into
* \param size The size of data given data buffer
* \param rtype [out] The type of data read
*
* \returns Total number of bytes read, or less than zero on error
*/
intmax_t aeap_transport_read(struct aeap_transport *transport, void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE *rtype);
/*!
* \brief Write data to the transport
*
* \note Locks transport's write lock before calling transport instance's
* write, and unlocks it before returning.
*
* \param transport The transport to write to
* \param buf The data to write
* \param size The size of data to write
* \param wtype The type of data to write
*
* \returns Total number of bytes written, or less than zero on error
*/
intmax_t aeap_transport_write(struct aeap_transport *transport, const void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE wtype);
#endif /* RES_AEAP_TRANSPORT_H */

View File

@@ -0,0 +1,249 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/http_websocket.h"
#include "asterisk/utils.h"
#include "logger.h"
#include "transport.h"
#include "transport_websocket.h"
#define log_error(obj, fmt, ...) aeap_error(obj, "websocket", fmt, ##__VA_ARGS__)
struct aeap_transport_websocket {
/*! Derive from base transport (must be first attribute) */
struct aeap_transport base;
/*! The underlying websocket */
struct ast_websocket *ws;
};
static int websocket_connect(struct aeap_transport *self, const char *url,
const char *protocol, int timeout)
{
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
enum ast_websocket_result ws_result;
struct ast_websocket_client_options ws_options = {
.uri = url,
.protocols = protocol,
.timeout = timeout,
.tls_cfg = NULL,
};
transport->ws = ast_websocket_client_create_with_options(&ws_options, &ws_result);
if (ws_result != WS_OK) {
log_error(self, "connect failure (%d)", (int)ws_result);
return -1;
}
return 0;
}
static int websocket_disconnect(struct aeap_transport *self)
{
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
if (transport->ws) {
ast_websocket_unref(transport->ws);
transport->ws = NULL;
}
return 0;
}
static void websocket_destroy(struct aeap_transport *self)
{
/*
* Disconnect takes care of cleaning up the websocket. Note, disconnect
* was called by the base/dispatch interface prior to calling this
* function so nothing to do here.
*/
}
static intmax_t websocket_read(struct aeap_transport *self, void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE *rtype)
{
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
char *payload;
uint64_t bytes_read = 0;
uint64_t total_bytes_read = 0;
enum ast_websocket_opcode opcode;
int fragmented = 0;
*rtype = AST_AEAP_DATA_TYPE_NONE;
if (ast_websocket_fd(transport->ws) < 0) {
log_error(self, "unavailable for reading");
/* Ensure this transport is in a disconnected state */
aeap_transport_disconnect(self);
return -1;
}
/*
* This function is called with the read_lock locked. However, the lock needs to be
* unlocked while waiting for input otherwise a deadlock can occur during disconnect
* (disconnect attempts to grab the lock but can't because read holds it here). So
* unlock it prior to waiting.
*/
ast_mutex_unlock(&transport->base.read_lock);
if (ast_websocket_wait_for_input(transport->ws, -1) <= 0) {
ast_mutex_lock(&transport->base.read_lock);
log_error(self, "poll failure: %s", strerror(errno));
/* Ensure this transport is in a disconnected state */
aeap_transport_disconnect(self);
return -1;
}
ast_mutex_lock(&transport->base.read_lock);
if (!transport->ws) {
/*
* It's possible the transport was told to disconnect while waiting for input.
* If so then the websocket will be NULL, so we don't want to continue.
*/
return 0;
}
do {
if (ast_websocket_read(transport->ws, &payload, &bytes_read, &opcode,
&fragmented) != 0) {
log_error(self, "read failure (%d): %s", opcode, strerror(errno));
return -1;
}
if (!bytes_read) {
continue;
}
if (total_bytes_read + bytes_read > size) {
log_error(self, "attempted to read too many bytes into (%jd) sized buffer", size);
return -1;
}
memcpy(buf + total_bytes_read, payload, bytes_read);
total_bytes_read += bytes_read;
} while (opcode == AST_WEBSOCKET_OPCODE_CONTINUATION);
switch (opcode) {
case AST_WEBSOCKET_OPCODE_CLOSE:
log_error(self, "closed");
return -1;
case AST_WEBSOCKET_OPCODE_BINARY:
*rtype = AST_AEAP_DATA_TYPE_BINARY;
break;
case AST_WEBSOCKET_OPCODE_TEXT:
*rtype = AST_AEAP_DATA_TYPE_STRING;
/* Append terminator, but check for overflow first */
if (total_bytes_read == size) {
log_error(self, "unable to write string terminator");
return -1;
}
*((char *)(buf + total_bytes_read)) = '\0';
break;
default:
/* Ignore all other message types */
return 0;
}
return total_bytes_read;
}
static intmax_t websocket_write(struct aeap_transport *self, const void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE wtype)
{
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
intmax_t res = 0;
switch (wtype) {
case AST_AEAP_DATA_TYPE_BINARY:
res = ast_websocket_write(transport->ws, AST_WEBSOCKET_OPCODE_BINARY,
(char *)buf, size);
break;
case AST_AEAP_DATA_TYPE_STRING:
res = ast_websocket_write(transport->ws, AST_WEBSOCKET_OPCODE_TEXT,
(char *)buf, size);
break;
default:
break;
}
if (res < 0) {
log_error(self, "problem writing to websocket (closed)");
/*
* If the underlying socket is closed then ensure the
* transport is in a disconnected state as well.
*/
aeap_transport_disconnect(self);
return res;
}
return size;
}
static struct aeap_transport_vtable *transport_websocket_vtable(void)
{
static struct aeap_transport_vtable websocket_vtable = {
.connect = websocket_connect,
.disconnect = websocket_disconnect,
.destroy = websocket_destroy,
.read = websocket_read,
.write = websocket_write,
};
return &websocket_vtable;
}
/*!
* \brief Initialize a transport websocket object, and set its virtual table
*
* \param transport The transport to initialize
*
* \returns 0 on success, -1 on error
*/
static int transport_websocket_init(struct aeap_transport_websocket *transport)
{
transport->ws = NULL;
((struct aeap_transport *)transport)->vtable = transport_websocket_vtable();
return 0;
}
struct aeap_transport_websocket *aeap_transport_websocket_create(void)
{
struct aeap_transport_websocket *transport;
transport = ast_calloc(1, sizeof(*transport));
if (!transport) {
ast_log(LOG_ERROR, "AEAP websocket: unable to create transport websocket");
return NULL;
}
if (transport_websocket_init(transport)) {
ast_free(transport);
return NULL;
}
return transport;
}

View File

@@ -0,0 +1,34 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef RES_AEAP_TRANSPORT_WEBSOCKET_H
#define RES_AEAP_TRANSPORT_WEBSOCKET_H
/*!
* \brief Asterisk external application protocol websocket transport
*/
struct aeap_transport_websocket;
/*!
* \brief Creates (heap allocated), and initializes a transport websocket
*
* \returns A transport websocket object, or NULL on error
*/
struct aeap_transport_websocket *aeap_transport_websocket_create(void);
#endif /* RES_AEAP_TRANSPORT_WEBSOCKET_H */

View File

@@ -42,7 +42,7 @@ static AST_RWLIST_HEAD_STATIC(engines, ast_speech_engine);
static struct ast_speech_engine *default_engine = NULL;
/*! \brief Find a speech recognition engine of specified name, if NULL then use the default one */
static struct ast_speech_engine *find_engine(const char *engine_name)
struct ast_speech_engine *ast_speech_find_engine(const char *engine_name)
{
struct ast_speech_engine *engine = NULL;
@@ -185,7 +185,7 @@ struct ast_speech *ast_speech_new(const char *engine_name, const struct ast_form
RAII_VAR(struct ast_format *, best, NULL, ao2_cleanup);
/* Try to find the speech recognition engine that was requested */
if (!(engine = find_engine(engine_name)))
if (!(engine = ast_speech_find_engine(engine_name)))
return NULL;
joint = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
@@ -313,7 +313,7 @@ int ast_speech_register(struct ast_speech_engine *engine)
}
/* If an engine is already loaded with this name, error out */
if (find_engine(engine->name)) {
if (ast_speech_find_engine(engine->name)) {
ast_log(LOG_WARNING, "Speech recognition engine '%s' already exists.\n", engine->name);
return -1;
}
@@ -366,6 +366,36 @@ struct ast_speech_engine *ast_speech_unregister2(const char *engine_name)
return engine;
}
void ast_speech_unregister_engines(
int (*should_unregister)(const struct ast_speech_engine *engine, void *data), void *data,
void (*on_unregistered)(void *obj))
{
struct ast_speech_engine *engine = NULL;
if (!should_unregister) {
return;
}
AST_RWLIST_WRLOCK(&engines);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(&engines, engine, list) {
if (should_unregister(engine, data)) {
/* We have our engine... removed it */
AST_RWLIST_REMOVE_CURRENT(list);
/* If this was the default engine, we need to pick a new one */
if (engine == default_engine) {
default_engine = AST_RWLIST_FIRST(&engines);
}
ast_verb(2, "Unregistered speech recognition engine '%s'\n", engine->name);
/* All went well */
if (on_unregistered) {
on_unregistered(engine);
}
}
}
AST_RWLIST_TRAVERSE_SAFE_END;
AST_RWLIST_UNLOCK(&engines);
}
static int unload_module(void)
{
/* We can not be unloaded */

731
res/res_speech_aeap.c Normal file
View File

@@ -0,0 +1,731 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Asterisk External Application Speech Engine
*
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/config.h"
#include "asterisk/format.h"
#include "asterisk/format_cap.h"
#include "asterisk/json.h"
#include "asterisk/module.h"
#include "asterisk/speech.h"
#include "asterisk/sorcery.h"
#include "asterisk/res_aeap.h"
#include "asterisk/res_aeap_message.h"
#define SPEECH_AEAP_VERSION "0.1.0"
#define SPEECH_PROTOCOL "speech_to_text"
#define CONNECTION_TIMEOUT 2000
#define log_error(obj, fmt, ...) \
ast_log(LOG_ERROR, "AEAP speech (%p): " fmt "\n", obj, ##__VA_ARGS__)
static struct ast_json *custom_fields_to_params(const struct ast_variable *variables)
{
const struct ast_variable *i;
struct ast_json *obj;
if (!variables) {
return NULL;
}
obj = ast_json_object_create();
if (!obj) {
return NULL;
}
for (i = variables; i; i = i->next) {
if (i->name[0] == '@' && i->name[1]) {
ast_json_object_set(obj, i->name + 1, ast_json_string_create(i->value));
}
}
return obj;
}
/*!
* \internal
* \brief Create, and send a request to the external application
*
* Create, then sends a request to an Asterisk external application, and then blocks
* until a response is received or a time out occurs. Since this method waits until
* receiving a response the returned result is guaranteed to be pass/fail based upon
* a response handler's result.
*
* \param name The name of the request to send
* \param json The core json request data
* \param data Optional user data to associate with request/response
*
* \returns 0 on success, -1 on error
*/
static int speech_aeap_send_request(struct ast_aeap *aeap, const char *name,
struct ast_json *json, void *obj)
{
/*
* Wait for a response. Also since we're blocking,
* data is expected to be on the stack so no cleanup required.
*/
struct ast_aeap_tsx_params tsx_params = {
.timeout = 1000,
.wait = 1,
.obj = obj,
};
/* "steals" the json ref */
tsx_params.msg = ast_aeap_message_create_request(
ast_aeap_message_type_json, name, NULL, json);
if (!tsx_params.msg) {
return -1;
}
/* Send "steals" the json msg ref */
return ast_aeap_send_msg_tsx(aeap, &tsx_params);
}
/*!
* \internal
* \brief Create, and send a "get" request to an external application
*
* Basic structure of the JSON message to send:
*
* { param: [<param>, ...] }
*
* \param speech The speech engine
* \param param The name of the parameter to retrieve
* \param data User data passed to the response handler
*
* \returns 0 on success, -1 on error
*/
static int speech_aeap_get(struct ast_speech *speech, const char *param, void *data)
{
if (!param) {
return -1;
}
/* send_request handles json ref */
return speech_aeap_send_request(speech->data,
"get", ast_json_pack("{s:[s]}", "params", param), data);
}
struct speech_param {
const char *name;
const char *value;
};
/*!
* \internal
* \brief Create, and send a "set" request to an external application
*
* Basic structure of the JSON message to send:
*
* { params: { <name> : <value> } }
*
* \param speech The speech engine
* \param name The name of the parameter to set
* \param value The value of the parameter to set
*
* \returns 0 on success, -1 on error
*/
static int speech_aeap_set(struct ast_speech *speech, const char *name, const char *value)
{
if (!name) {
return -1;
}
/* send_request handles json ref */
return speech_aeap_send_request(speech->data,
"set", ast_json_pack("{s:{s:s}}", "params", name, value), NULL);
}
static int handle_response_set(struct ast_aeap *aeap, struct ast_aeap_message *message, void *data)
{
return 0;
}
struct speech_setting {
const char *param;
size_t len;
char *buf;
};
static int handle_setting(struct ast_aeap *aeap, struct ast_json_iter *iter,
struct speech_setting *setting)
{
const char *value;
if (strcmp(ast_json_object_iter_key(iter), setting->param)) {
log_error(aeap, "Unable to 'get' speech setting for '%s'", setting->param);
return -1;
}
value = ast_json_string_get(ast_json_object_iter_value(iter));
if (!value) {
log_error(aeap, "No value for speech setting '%s'", setting->param);
return -1;
}
ast_copy_string(setting->buf, value, setting->len);
return 0;
}
static int handle_results(struct ast_aeap *aeap, struct ast_json_iter *iter,
struct ast_speech_result **speech_results)
{
struct ast_speech_result *result = NULL;
struct ast_json *json_results;
struct ast_json *json_result;
size_t i;
json_results = ast_json_object_iter_value(iter);
if (!json_results || !speech_results) {
log_error(aeap, "Unable to 'get' speech results");
return -1;
}
for (i = 0; i < ast_json_array_size(json_results); ++i) {
if (!(result = ast_calloc(1, sizeof(*result)))) {
continue;
}
json_result = ast_json_array_get(json_results, i);
result->text = ast_strdup(ast_json_object_string_get(json_result, "text"));
result->score = ast_json_object_integer_get(json_result, "score");
result->grammar = ast_strdup(ast_json_object_string_get(json_result, "grammar"));
result->nbest_num = ast_json_object_integer_get(json_result, "best");
if (*speech_results) {
AST_LIST_NEXT(result, list) = *speech_results;
*speech_results = result;
} else {
*speech_results = result;
}
}
return 0;
}
/*!
* \internal
* \brief Handle a "get" response from an external application
*
* Basic structure of the expected JSON message to received:
*
* {
* response: "get"
* "params" : { <name>: <value> | [ <results> ] }
* }
*
* \param speech The speech engine
* \param param The name of the parameter to retrieve
* \param data User data passed to the response handler
*
* \returns 0 on success, -1 on error
*/
static int handle_response_get(struct ast_aeap *aeap, struct ast_aeap_message *message, void *data)
{
struct ast_json_iter *iter;
iter = ast_json_object_iter(ast_json_object_get(ast_aeap_message_data(message), "params"));
if (!iter) {
log_error(aeap, "no 'get' parameters returned");
return -1;
}
if (!strcmp(ast_json_object_iter_key(iter), "results")) {
return handle_results(aeap, iter, data);
}
return handle_setting(aeap, iter, data);
}
static int handle_response_setup(struct ast_aeap *aeap, struct ast_aeap_message *message, void *data)
{
struct ast_format *format = data;
struct ast_json *json = ast_aeap_message_data(message);
const char *codec_name;
if (!json) {
log_error(aeap, "no 'setup' object returned");
return -1;
}
json = ast_json_object_get(json, "codecs");
if (!json || ast_json_array_size(json) == 0) {
log_error(aeap, "no 'setup' codecs available");
return -1;
}
codec_name = ast_json_object_string_get(ast_json_array_get(json, 0), "name");
if (!codec_name || strcmp(codec_name, ast_format_get_codec_name(format))) {
log_error(aeap, "setup codec '%s' unsupported", ast_format_get_codec_name(format));
return -1;
}
return 0;
}
static const struct ast_aeap_message_handler response_handlers[] = {
{ "setup", handle_response_setup },
{ "get", handle_response_get },
{ "set", handle_response_set },
};
static int handle_request_set(struct ast_aeap *aeap, struct ast_aeap_message *message, void *data)
{
struct ast_json_iter *iter;
const char *error_msg = NULL;
iter = ast_json_object_iter(ast_json_object_get(ast_aeap_message_data(message), "params"));
if (!iter) {
error_msg = "no parameter(s) requested";
} else if (!strcmp(ast_json_object_iter_key(iter), "results")) {
struct ast_speech *speech = ast_aeap_user_data_object_by_id(aeap, "speech");
if (!speech) {
error_msg = "no associated speech object";
} else if (handle_results(aeap, iter, &speech->results)) {
error_msg = "unable to handle results";
} else {
ast_speech_change_state(speech, AST_SPEECH_STATE_DONE);
}
} else {
error_msg = "can only set 'results'";
}
if (error_msg) {
log_error(aeap, "set - %s", error_msg);
message = ast_aeap_message_create_error(ast_aeap_message_type_json,
ast_aeap_message_name(message), ast_aeap_message_id(message), error_msg);
} else {
message = ast_aeap_message_create_response(ast_aeap_message_type_json,
ast_aeap_message_name(message), ast_aeap_message_id(message), NULL);
}
ast_aeap_send_msg(aeap, message);
return 0;
}
static const struct ast_aeap_message_handler request_handlers[] = {
{ "set", handle_request_set },
};
static struct ast_aeap_params speech_aeap_params = {
.response_handlers = response_handlers,
.response_handlers_size = ARRAY_LEN(response_handlers),
.request_handlers = request_handlers,
.request_handlers_size = ARRAY_LEN(request_handlers),
};
/*!
* \internal
* \brief Create, and connect to an external application and send initial setup
*
* Basic structure of the JSON message to send:
*
* {
* "request": "setup"
* "codecs": [
* {
* "name": <name>,
* "attributes": { <name>: <value>, ..., }
* },
* ...,
* ],
* "params": { <name>: <value>, ..., }
* }
*
* \param speech The speech engine
* \param format The format codec to use
*
* \returns 0 on success, -1 on error
*/
static int speech_aeap_engine_create(struct ast_speech *speech, struct ast_format *format)
{
struct ast_aeap *aeap;
struct ast_variable *vars;
struct ast_json *json;
aeap = ast_aeap_create_and_connect_by_id(
speech->engine->name, &speech_aeap_params, CONNECTION_TIMEOUT);
if (!aeap) {
return -1;
}
speech->data = aeap;
/* Don't allow unloading of this module while an external application is in use */
ast_module_ref(ast_module_info->self);
vars = ast_aeap_custom_fields_get(speech->engine->name);
/* While the protocol allows sending of codec attributes, for now don't */
json = ast_json_pack("{s:s,s:[{s:s}],s:o*}", "version", SPEECH_AEAP_VERSION, "codecs",
"name", ast_format_get_codec_name(format), "params", custom_fields_to_params(vars));
ast_variables_destroy(vars);
if (ast_aeap_user_data_register(aeap, "speech", speech, NULL)) {
ast_module_unref(ast_module_info->self);
return -1;
}
/* send_request handles json ref */
if (speech_aeap_send_request(speech->data, "setup", json, format)) {
ast_module_unref(ast_module_info->self);
return -1;
}
/*
* Add a reference to the engine here, so if it happens to get unregistered
* while executing it won't disappear.
*/
ao2_ref(speech->engine, 1);
return 0;
}
static int speech_aeap_engine_destroy(struct ast_speech *speech)
{
ao2_ref(speech->engine, -1);
ao2_cleanup(speech->data);
ast_module_unref(ast_module_info->self);
return 0;
}
static int speech_aeap_engine_write(struct ast_speech *speech, void *data, int len)
{
return ast_aeap_send_binary(speech->data, data, len);
}
static int speech_aeap_engine_dtmf(struct ast_speech *speech, const char *dtmf)
{
return speech_aeap_set(speech, "dtmf", dtmf);
}
static int speech_aeap_engine_start(struct ast_speech *speech)
{
ast_speech_change_state(speech, AST_SPEECH_STATE_READY);
return 0;
}
static int speech_aeap_engine_change(struct ast_speech *speech, const char *name, const char *value)
{
return speech_aeap_set(speech, name, value);
}
static int speech_aeap_engine_get_setting(struct ast_speech *speech, const char *name,
char *buf, size_t len)
{
struct speech_setting setting = {
.param = name,
.len = len,
.buf = buf,
};
return speech_aeap_get(speech, name, &setting);
}
static int speech_aeap_engine_change_results_type(struct ast_speech *speech,
enum ast_speech_results_type results_type)
{
return speech_aeap_set(speech, "results_type",
ast_speech_results_type_to_string(results_type));
}
static struct ast_speech_result *speech_aeap_engine_get(struct ast_speech *speech)
{
struct ast_speech_result *results = NULL;
if (speech->results) {
return speech->results;
}
if (speech_aeap_get(speech, "results", &results)) {
return NULL;
}
return results;
}
static void speech_engine_destroy(void *obj)
{
struct ast_speech_engine *engine = obj;
ao2_cleanup(engine->formats);
ast_free(engine->name);
}
static struct ast_speech_engine *speech_engine_alloc(const char *name)
{
struct ast_speech_engine *engine;
engine = ao2_t_alloc_options(sizeof(*engine), speech_engine_destroy,
AO2_ALLOC_OPT_LOCK_NOLOCK, name);
if (!engine) {
ast_log(LOG_ERROR, "AEAP speech: unable create engine '%s'\n", name);
return NULL;
}
engine->name = ast_strdup(name);
if (!engine->name) {
ao2_ref(engine, -1);
return NULL;
}
engine->create = speech_aeap_engine_create;
engine->destroy = speech_aeap_engine_destroy;
engine->write = speech_aeap_engine_write;
engine->dtmf = speech_aeap_engine_dtmf;
engine->start = speech_aeap_engine_start;
engine->change = speech_aeap_engine_change;
engine->get_setting = speech_aeap_engine_get_setting;
engine->change_results_type = speech_aeap_engine_change_results_type;
engine->get = speech_aeap_engine_get;
engine->formats = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
return engine;
}
static void speech_engine_alloc_and_register(const char *name, const struct ast_format_cap *formats)
{
struct ast_speech_engine *engine;
engine = speech_engine_alloc(name);
if (!engine) {
return;
}
if (formats && ast_format_cap_append_from_cap(engine->formats,
formats, AST_MEDIA_TYPE_AUDIO)) {
ast_log(LOG_WARNING, "AEAP speech: Unable to add engine '%s' formats\n", name);
ao2_ref(engine, -1);
return;
}
if (ast_speech_register(engine)) {
ast_log(LOG_WARNING, "AEAP speech: Unable to register engine '%s'\n", name);
ao2_ref(engine, -1);
}
}
#ifdef TEST_FRAMEWORK
static void speech_engine_alloc_and_register2(const char *name, const char *codec_names)
{
struct ast_speech_engine *engine;
engine = speech_engine_alloc(name);
if (!engine) {
return;
}
if (codec_names && ast_format_cap_update_by_allow_disallow(engine->formats, codec_names, 1)) {
ast_log(LOG_WARNING, "AEAP speech: Unable to add engine '%s' codecs\n", name);
ao2_ref(engine, -1);
return;
}
if (ast_speech_register(engine)) {
ast_log(LOG_WARNING, "AEAP speech: Unable to register engine '%s'\n", name);
ao2_ref(engine, -1);
}
}
#endif
static int unload_engine(void *obj, void *arg, int flags)
{
if (ast_aeap_client_config_has_protocol(obj, SPEECH_PROTOCOL)) {
ao2_cleanup(ast_speech_unregister2(ast_sorcery_object_get_id(obj)));
}
return 0;
}
static int load_engine(void *obj, void *arg, int flags)
{
const char *id;
const struct ast_format_cap *formats;
const struct ast_speech_engine *engine;
if (!ast_aeap_client_config_has_protocol(obj, SPEECH_PROTOCOL)) {
return 0;
}
id = ast_sorcery_object_get_id(obj);
formats = ast_aeap_client_config_codecs(obj);
if (!formats) {
formats = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
if (!formats) {
ast_log(LOG_ERROR, "AEAP speech: unable to allocate default engine format for '%s'\n", id);
return 0;
}
}
engine = ast_speech_find_engine(id);
if (!engine) {
speech_engine_alloc_and_register(id, formats);
return 0;
}
if (ast_format_cap_identical(formats, engine->formats)) {
/* Same name, same formats then nothing changed */
return 0;
}
ao2_ref(ast_speech_unregister2(engine->name), -1);
speech_engine_alloc_and_register(id, formats);
return 0;
}
static int matches_engine(void *obj, void *arg, int flags)
{
const struct ast_speech_engine *engine = arg;
return strcmp(ast_sorcery_object_get_id(obj), engine->name) ? 0 : CMP_MATCH;
}
static int should_unregister(const struct ast_speech_engine *engine, void *data)
{
void *obj;
if (engine->create != speech_aeap_engine_create) {
/* Only want to potentially unregister AEAP speech engines */
return 0;
}
#ifdef TEST_FRAMEWORK
if (!strcmp("_aeap_test_speech_", engine->name)) {
/* Don't remove the test engine */
return 0;
}
#endif
obj = ao2_callback(data, 0, matches_engine, (void*)engine);
if (obj) {
ao2_ref(obj, -1);
return 0;
}
/* If no match in given container then unregister engine */
return 1;
}
static void speech_observer_loaded(const char *object_type)
{
struct ao2_container *container;
if (strcmp(object_type, AEAP_CONFIG_CLIENT)) {
return;
}
container = ast_aeap_client_configs_get(SPEECH_PROTOCOL);
if (!container) {
return;
}
/*
* An AEAP module reload has occurred. First
* remove all engines that no longer exist.
*/
ast_speech_unregister_engines(should_unregister, container, __ao2_cleanup);
/* Now add or update engines */
ao2_callback(container, 0, load_engine, NULL);
ao2_ref(container, -1);
}
/*! \brief Observer for AEAP reloads */
static const struct ast_sorcery_observer speech_observer = {
.loaded = speech_observer_loaded,
};
static int unload_module(void)
{
struct ao2_container *container;
#ifdef TEST_FRAMEWORK
ao2_cleanup(ast_speech_unregister2("_aeap_test_speech_"));
#endif
ast_sorcery_observer_remove(ast_aeap_sorcery(), AEAP_CONFIG_CLIENT, &speech_observer);
container = ast_aeap_client_configs_get(SPEECH_PROTOCOL);
if (container) {
ao2_callback(container, 0, unload_engine, NULL);
ao2_ref(container, -1);
}
return 0;
}
static int load_module(void)
{
struct ao2_container *container;
speech_aeap_params.msg_type = ast_aeap_message_type_json;
container = ast_aeap_client_configs_get(SPEECH_PROTOCOL);
if (container) {
ao2_callback(container, 0, load_engine, NULL);
ao2_ref(container, -1);
}
/*
* Add an observer since a named speech server must be created,
* registered, and eventually removed for all AEAP client
* configuration matching the "speech_to_text" protocol.
*/
if (ast_sorcery_observer_add(ast_aeap_sorcery(), AEAP_CONFIG_CLIENT, &speech_observer)) {
return AST_MODULE_LOAD_DECLINE;
}
#ifdef TEST_FRAMEWORK
speech_engine_alloc_and_register2("_aeap_test_speech_", "ulaw");
#endif
return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Asterisk External Application Speech Engine",
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,
.load_pri = AST_MODPRI_CHANNEL_DEPEND,
.requires = "res_speech,res_aeap",
);