Stasis: Fix StasisStart/End order and missing events

This corrects several bugs that currently exist in the stasis
application code.

* After a masquerade, the resulting channels have channel topics that
  do not match their uniqueids
** Masquerades now swap channel topics appropriately
* StasisStart and StasisEnd messages are leaked to observer
  applications due to being published on channel topics
** StasisStart and StasisEnd publishing is now properly restricted
   to controlling apps via app topics
* Race conditions exist where StasisStart and StasisEnd messages due to
  a masquerade may be received out of order due to being published on
  different topics
** These messages are now published directly on the app topic so this
   is now a non-issue
* StasisEnds are sometimes missing when sent due to masquerades and
  bridge swaps into and out of Stasis()
** This was due to StasisEnd processing adjusting message-sent flags
   after Stasis() had already exited and Stasis() had been re-entered
** This was corrected by adjusting these flags prior to sending the
   message while the initial Stasis() application was still shutting
   down

Review: https://reviewboard.asterisk.org/r/4213/
ASTERISK-24537 #close
Reported by: Matt DiMeo
........

Merged revisions 429061 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/13@429062 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Kinsey Moore
2014-12-08 15:43:14 +00:00
parent 7fd1227202
commit 55c9a46abd
7 changed files with 47 additions and 95 deletions

View File

@@ -125,7 +125,7 @@ static struct ast_json *stasis_end_to_json(struct stasis_message *message,
"channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
}
STASIS_MESSAGE_TYPE_DEFN(app_end_message_type,
STASIS_MESSAGE_TYPE_DEFN_LOCAL(end_message_type,
.to_json = stasis_end_to_json);
struct start_message_blob {
@@ -919,6 +919,12 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
struct stasis_message *msg;
int i;
if (app_subscribe_channel(app, chan)) {
ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
app_name(app), ast_channel_name(chan));
return -1;
}
payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
if (!payload) {
ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
@@ -928,7 +934,8 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
payload->channel = ao2_bump(snapshot);
payload->replace_channel = ao2_bump(replace_channel_snapshot);
json_blob = ast_json_pack("{s: o, s: []}",
json_blob = ast_json_pack("{s: s, s: o, s: []}",
"app", app_name(app),
"timestamp", ast_json_timeval(ast_tvnow(), NULL),
"args");
if (!json_blob) {
@@ -956,7 +963,10 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
return -1;
}
stasis_publish(ast_channel_topic(chan), msg);
if (replace_channel_snapshot) {
app_unsubscribe_channel_id(app, replace_channel_snapshot->uniqueid);
}
stasis_publish(ast_app_get_topic(app), msg);
ao2_ref(msg, -1);
return 0;
}
@@ -988,6 +998,7 @@ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
{
struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
struct ast_json *blob;
struct stasis_message *msg;
if (sanitize && sanitize->channel
&& sanitize->channel(chan)) {
@@ -1000,10 +1011,13 @@ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
return -1;
}
stasis_app_channel_set_stasis_end_published(chan);
remove_masquerade_store(chan);
ast_channel_publish_blob(chan, app_end_message_type(), blob);
app_unsubscribe_channel(app, chan);
msg = ast_channel_blob_create(chan, end_message_type(), blob);
if (msg) {
stasis_publish(ast_app_get_topic(app), msg);
}
ao2_cleanup(msg);
ast_json_unref(blob);
return 0;
@@ -1034,6 +1048,7 @@ static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct a
}
/* send the StasisEnd message to the app */
stasis_app_channel_set_stasis_end_published(new_chan);
app_send_end_msg(control_app(control), new_chan);
/* remove the datastore */
@@ -1083,11 +1098,6 @@ static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct
/* send the StasisEnd message to the app */
app_send_end_msg(control_app(control), old_chan);
/* fixup channel topic forwards */
if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) {
ast_log(LOG_ERROR, "Failed to fixup channel topic forwards for %s(%s) owned by %s\n",
old_snapshot->name, old_snapshot->uniqueid, app_name(control_app(control)));
}
ao2_cleanup(control);
}
@@ -1251,14 +1261,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
return -1;
}
res = app_subscribe_channel(app, chan);
if (res != 0) {
ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
app_name, ast_channel_name(chan));
remove_masquerade_store(chan);
return -1;
}
res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
@@ -1894,7 +1896,7 @@ static int unload_module(void)
ao2_cleanup(app_bridges_playback);
app_bridges_playback = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(app_end_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(end_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
return 0;
@@ -1938,28 +1940,6 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
return &app_sanitizer;
}
void app_end_message_handler(struct stasis_message *message)
{
struct ast_channel_blob *payload;
struct ast_channel_snapshot *snapshot;
const char *app_name;
char *channel_uri;
size_t alloc_size;
const char *channels[1];
payload = stasis_message_data(message);
snapshot = payload->snapshot;
app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app"));
/* +8 is for the length of "channel:" */
alloc_size = AST_MAX_UNIQUEID + 8;
channel_uri = ast_alloca(alloc_size);
snprintf(channel_uri, alloc_size, "channel:%s", snapshot->uniqueid);
channels[0] = channel_uri;
stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL);
}
static const struct ast_datastore_info stasis_internal_channel_info = {
.type = "stasis-internal-channel",
};
@@ -2033,7 +2013,7 @@ static int load_module(void)
if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
return AST_MODULE_LOAD_DECLINE;
}
if (STASIS_MESSAGE_TYPE_INIT(app_end_message_type) != 0) {
if (STASIS_MESSAGE_TYPE_INIT(end_message_type) != 0) {
return AST_MODULE_LOAD_DECLINE;
}
apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);