ARI: WebSocket event cleanup

Stasis events (which get distributed over the ARI WebSocket) are created
by subscribing to the channel_all_cached and bridge_all_cached topics,
filtering out events for channels/bridges currently subscribed to.

There are two issues with that. First was a race condition, where
messages in-flight to the master subscribe-to-all-things topic would get
sent out, even though the events happened before the channel was put
into Stasis. Secondly, as the number of channels and bridges grow in the
system, the work spent filtering messages becomes excessive.

Since r395954, individual channels and bridges have caching topics, and
can be subscribed to individually. This patch takes advantage, so that
channels and bridges are subscribed to on demand, instead of filtering
the global topics.

The one case where filtering is still required is handling BridgeMerge
messages, which are published directly to the bridge_all topic.

Other than the change to how subscriptions work, this patch mostly just
moves code around. Most of the work generating JSON objects from
messages was moved to .to_json handlers on the message types. The
callback functions handling app subscriptions were moved from res_stasis
(b/c they were global to the model) to stasis/app.c (b/c they are local
to the app now).

(closes issue ASTERISK-21969)
Reported by: Matt Jordan
Review: https://reviewboard.asterisk.org/r/2754/
........

Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
David M. Lee
2013-08-27 19:19:36 +00:00
parent 3540c7ac6e
commit 451993f4f5
9 changed files with 785 additions and 722 deletions

View File

@@ -86,6 +86,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
*/
#define CONTROLS_NUM_BUCKETS 127
/*!
* \brief Number of buckets for the Stasis bridges hash table. Remember to
* keep it a prime number!
*/
#define BRIDGES_NUM_BUCKETS 127
/*!
* \brief Stasis application container.
*/
@@ -97,12 +103,6 @@ struct ao2_container *app_bridges;
struct ao2_container *app_bridges_moh;
/*! \brief Message router for the channel caching topic */
struct stasis_message_router *channel_router;
/*! \brief Message router for the bridge caching topic */
struct stasis_message_router *bridge_router;
/*! AO2 hash function for \ref app */
static int app_hash(const void *obj, const int flags)
{
@@ -153,6 +153,30 @@ static int control_compare(void *lhs, void *rhs, int flags)
}
}
static int cleanup_cb(void *obj, void *arg, int flags)
{
struct app *app = obj;
if (!app_is_finished(app)) {
return 0;
}
ast_verb(1, "Shutting down application '%s'\n", app_name(app));
app_shutdown(app);
return CMP_MATCH;
}
/*!
* \brief Clean up any old apps that we don't need any more.
*/
static void cleanup(void)
{
ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
cleanup_cb, NULL);
}
struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
{
return control_create(chan);
@@ -435,229 +459,6 @@ struct ast_bridge *stasis_app_bridge_find_by_id(
return ao2_find(app_bridges, bridge_id, OBJ_KEY);
}
/*! \brief Typedef for blob handler callbacks */
typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
/*! \brief Callback to check whether an app is watching a given channel */
static int app_watching_channel_cb(void *obj, void *arg, int flags)
{
struct app *app = obj;
char *uniqueid = arg;
return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
}
/*! \brief Get a container full of apps that are interested in the specified channel */
static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
{
struct ao2_container *watching_apps;
char *uniqueid_dup;
RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
ast_assert(uniqueid != NULL);
uniqueid_dup = ast_strdupa(uniqueid);
watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
watching_apps = watching_apps_iter->c;
if (!ao2_container_count(watching_apps)) {
return NULL;
}
ao2_ref(watching_apps, +1);
return watching_apps_iter->c;
}
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
typedef struct ast_json *(*channel_snapshot_monitor)(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot,
const struct timeval *tv);
static struct ast_json *simple_channel_event(
const char *type,
struct ast_channel_snapshot *snapshot,
const struct timeval *tv)
{
return ast_json_pack("{s: s, s: o, s: o}",
"type", type,
"timestamp", ast_json_timeval(*tv, NULL),
"channel", ast_channel_snapshot_to_json(snapshot));
}
static struct ast_json *channel_created_event(
struct ast_channel_snapshot *snapshot,
const struct timeval *tv)
{
return simple_channel_event("ChannelCreated", snapshot, tv);
}
static struct ast_json *channel_destroyed_event(
struct ast_channel_snapshot *snapshot,
const struct timeval *tv)
{
return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
"type", "ChannelDestroyed",
"timestamp", ast_json_timeval(*tv, NULL),
"cause", snapshot->hangupcause,
"cause_txt", ast_cause2str(snapshot->hangupcause),
"channel", ast_channel_snapshot_to_json(snapshot));
}
static struct ast_json *channel_state_change_event(
struct ast_channel_snapshot *snapshot,
const struct timeval *tv)
{
return simple_channel_event("ChannelStateChange", snapshot, tv);
}
/*! \brief Handle channel state changes */
static struct ast_json *channel_state(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot,
const struct timeval *tv)
{
struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
if (!old_snapshot) {
return channel_created_event(snapshot, tv);
} else if (!new_snapshot) {
return channel_destroyed_event(snapshot, tv);
} else if (old_snapshot->state != new_snapshot->state) {
return channel_state_change_event(snapshot, tv);
}
return NULL;
}
static struct ast_json *channel_dialplan(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot,
const struct timeval *tv)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
/* No Newexten event on cache clear */
if (!new_snapshot) {
return NULL;
}
/* Empty application is not valid for a Newexten event */
if (ast_strlen_zero(new_snapshot->appl)) {
return NULL;
}
if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
return NULL;
}
return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
"type", "ChannelDialplan",
"timestamp", ast_json_timeval(*tv, NULL),
"dialplan_app", new_snapshot->appl,
"dialplan_app_data", new_snapshot->data,
"channel", ast_channel_snapshot_to_json(new_snapshot));
}
static struct ast_json *channel_callerid(
struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot,
const struct timeval *tv)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
/* No NewCallerid event on cache clear or first event */
if (!old_snapshot || !new_snapshot) {
return NULL;
}
if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
return NULL;
}
return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
"type", "ChannelCallerId",
"timestamp", ast_json_timeval(*tv, NULL),
"caller_presentation", new_snapshot->caller_pres,
"caller_presentation_txt", ast_describe_caller_presentation(
new_snapshot->caller_pres),
"channel", ast_channel_snapshot_to_json(new_snapshot));
}
channel_snapshot_monitor channel_monitors[] = {
channel_state,
channel_dialplan,
channel_callerid
};
static int app_send_cb(void *obj, void *arg, int flags)
{
struct app *app = obj;
struct ast_json *msg = arg;
app_send(app, msg);
return 0;
}
static void sub_channel_snapshot_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
/* Pull timestamp from the new snapshot, or from the update message
* when there isn't one. */
const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
int i;
watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
if (!watching_apps) {
return;
}
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
if (msg) {
ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
}
}
static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
{
ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
}
static void sub_channel_blob_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
struct ast_channel_blob *obj = stasis_message_data(message);
if (!obj->snapshot) {
return;
}
msg = stasis_message_to_json(message);
if (!msg) {
return;
}
watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
if (!watching_apps) {
return;
}
distribute_message(watching_apps, msg);
}
/*!
* \brief In addition to running ao2_cleanup(), this function also removes the
@@ -709,7 +510,7 @@ void stasis_app_bridge_destroy(const char *bridge_id)
ast_bridge_destroy(bridge, 0);
}
int app_send_start_msg(struct app *app, struct ast_channel *chan,
static int send_start_msg(struct app *app, struct ast_channel *chan,
int argc, char *argv[])
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -726,8 +527,9 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
return -1;
}
msg = ast_json_pack("{s: s, s: [], s: o}",
msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
"type", "StasisStart",
"timestamp", ast_json_timeval(ast_tvnow(), NULL),
"args",
"channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
@@ -750,7 +552,7 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
return 0;
}
int app_send_end_msg(struct app *app, struct ast_channel *chan)
static int send_end_msg(struct app *app, struct ast_channel *chan)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -763,8 +565,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan)
return -1;
}
msg = ast_json_pack("{s: s, s: o}",
msg = ast_json_pack("{s: s, s: o, s: o}",
"type", "StasisEnd",
"timestamp", ast_json_timeval(ast_tvnow(), NULL),
"channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
return -1;
@@ -815,15 +618,17 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
ao2_link(app_controls, control);
res = app_send_start_msg(app, chan, argc, argv);
res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending start message to '%s'\n", app_name);
return res;
return -1;
}
if (app_add_channel(app, chan)) {
ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
res = app_subscribe_channel(app, chan);
if (res != 0) {
ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
app_name, ast_channel_name(chan));
return -1;
}
@@ -831,13 +636,23 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
int r;
int command_count;
struct ast_bridge *last_bridge = NULL;
struct ast_bridge *bridge = NULL;
/* Check to see if a bridge absorbed our hangup frame */
if (ast_check_hangup_locked(chan)) {
break;
}
if (stasis_app_get_bridge(control)) {
last_bridge = bridge;
bridge = stasis_app_get_bridge(control);
if (bridge != last_bridge) {
app_unsubscribe_bridge(app, last_bridge);
app_subscribe_bridge(app, bridge);
}
if (bridge) {
/* Bridge is handling channel frames */
control_wait(control);
control_dispatch_all(control, chan);
@@ -882,14 +697,21 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
}
app_remove_channel(app, chan);
res = app_send_end_msg(app, chan);
app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
app_unsubscribe_channel(app, chan);
res = send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending end message to %s\n", app_name);
return res;
}
/* There's an off chance that app is ready for cleanup. Go ahead
* and clean up, just in case
*/
cleanup();
return res;
}
@@ -912,29 +734,6 @@ int stasis_app_send(const char *app_name, struct ast_json *message)
return 0;
}
static int cleanup_cb(void *obj, void *arg, int flags)
{
struct app *app = obj;
if (!app_is_finished(app)) {
return 0;
}
ast_verb(1, "Cleaning up application '%s'\n", app_name(app));
return CMP_MATCH;
}
/*!
* \brief Clean up any old apps that we don't need any more.
*/
static void cleanup(void)
{
ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
cleanup_cb, NULL);
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -994,249 +793,22 @@ void stasis_app_unref(void)
ast_module_unref(ast_module_info->self);
}
/*! \brief Callback to check whether an app is watching a given bridge */
static int app_watching_bridge_cb(void *obj, void *arg, int flags)
{
struct app *app = obj;
char *uniqueid = arg;
return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
}
/*! \brief Get a container full of apps that are interested in the specified bridge */
static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
{
struct ao2_container *watching_apps;
char *uniqueid_dup;
RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
ast_assert(uniqueid != NULL);
uniqueid_dup = ast_strdupa(uniqueid);
watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
watching_apps = watching_apps_iter->c;
if (!ao2_container_count(watching_apps)) {
return NULL;
}
ao2_ref(watching_apps, +1);
return watching_apps_iter->c;
}
/*! Callback used to remove an app's interest in a bridge */
static int remove_bridge_cb(void *obj, void *arg, int flags)
{
app_remove_bridge(obj, arg);
return 0;
}
static struct ast_json *simple_bridge_event(
const char *type,
struct ast_bridge_snapshot *snapshot,
const struct timeval *tv)
{
return ast_json_pack("{s: s, s: o, s: o}",
"type", type,
"timestamp", ast_json_timeval(*tv, NULL),
"bridge", ast_bridge_snapshot_to_json(snapshot));
}
static struct ast_json *simple_bridge_channel_event(
const char *type,
struct ast_bridge_snapshot *bridge_snapshot,
struct ast_channel_snapshot *channel_snapshot,
const struct timeval *tv)
{
return ast_json_pack("{s: s, s: o, s: o, s: o}",
"type", type,
"timestamp", ast_json_timeval(*tv, NULL),
"bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
"channel", ast_channel_snapshot_to_json(channel_snapshot));
}
static void sub_bridge_snapshot_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
if (!watching_apps || !ao2_container_count(watching_apps)) {
return;
}
if (!new_snapshot) {
RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
/* The bridge has gone away. Create the message, make sure no apps are
* watching this bridge anymore, and destroy the bridge's control
* structure */
msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
stasis_app_bridge_destroy(old_snapshot->uniqueid);
} else if (!old_snapshot) {
msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
}
if (!msg) {
return;
}
distribute_message(watching_apps, msg);
}
/*! \brief Callback used to merge two containers of applications */
static int list_merge_cb(void *obj, void *arg, int flags)
{
/* remove any current entries for this app */
ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
/* relink as the only entry */
ao2_link(arg, obj);
return 0;
}
/*! \brief Merge container src into container dst without modifying src */
static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
{
ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
}
/*! \brief Callback for adding to an app's bridges of interest */
static int app_add_bridge_cb(void *obj, void *arg, int flags)
{
app_add_bridge(obj, arg);
return 0;
}
/*! \brief Add interest in the given bridge to all apps in the container */
static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
{
RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
}
static void sub_bridge_merge_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
struct ast_bridge_merge_message *merge = stasis_message_data(message);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
const struct timeval *tv = stasis_message_timestamp(message);
watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
if (watching_apps_to) {
update_apps_list(watching_apps_all, watching_apps_to);
}
watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
if (watching_apps_from) {
update_bridge_interest(watching_apps_from, merge->to->uniqueid);
update_apps_list(watching_apps_all, watching_apps_from);
}
if (!ao2_container_count(watching_apps_all)) {
return;
}
msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
"type", "BridgeMerged",
"timestamp", ast_json_timeval(*tv, NULL),
"bridge", ast_bridge_snapshot_to_json(merge->to),
"bridge_from", ast_bridge_snapshot_to_json(merge->from));
if (!msg) {
return;
}
distribute_message(watching_apps_all, msg);
}
static void sub_bridge_enter_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
struct ast_bridge_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
if (watching_apps_bridge) {
update_apps_list(watching_apps_all, watching_apps_bridge);
}
watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
if (watching_apps_channel) {
update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
update_apps_list(watching_apps_all, watching_apps_channel);
}
if (!ao2_container_count(watching_apps_all)) {
return;
}
msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
obj->channel, stasis_message_timestamp(message));
distribute_message(watching_apps_all, msg);
}
static void sub_bridge_leave_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
struct ast_bridge_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
if (!watching_apps_bridge) {
return;
}
msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
obj->channel, stasis_message_timestamp(message));
distribute_message(watching_apps_bridge, msg);
}
static int load_module(void)
{
int r = 0;
apps_registry =
ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
app_compare);
if (apps_registry == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
control_hash, control_compare);
app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
control_compare);
if (app_controls == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
bridges_hash, bridges_compare);
if (app_bridges == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
bridges_compare);
app_bridges_moh = ao2_container_alloc_hash(
AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
@@ -1246,52 +818,11 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
if (!channel_router) {
return AST_MODULE_LOAD_FAILURE;
}
r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
/* TODO: This could be handled a lot better. Instead of subscribing to
* the one caching topic and filtering out messages by channel id, we
* should have individual caching topics per-channel, with a shared
* back-end cache. That would simplify a lot of what's going on right
* here.
*/
r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
if (r) {
return AST_MODULE_LOAD_FAILURE;
}
bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
if (!bridge_router) {
return AST_MODULE_LOAD_FAILURE;
}
r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
if (r) {
return AST_MODULE_LOAD_FAILURE;
}
return AST_MODULE_LOAD_SUCCESS;
}
static int unload_module(void)
{
int r = 0;
stasis_message_router_unsubscribe_and_join(channel_router);
channel_router = NULL;
stasis_message_router_unsubscribe_and_join(bridge_router);
bridge_router = NULL;
ao2_cleanup(apps_registry);
apps_registry = NULL;
@@ -1304,7 +835,7 @@ static int unload_module(void)
ao2_cleanup(app_bridges_moh);
app_bridges_moh = NULL;
return r;
return 0;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",