Merge "stasis / manager / ari: Better filter messages."

This commit is contained in:
Joshua C. Colp
2019-01-22 18:58:48 -06:00
committed by Gerrit Code Review
5 changed files with 68 additions and 16 deletions

View File

@@ -228,7 +228,7 @@ void stasis_message_router_remove_cache_update(
* \brief Sets the default route of a router. * \brief Sets the default route of a router.
* *
* \param router Router to set the default route of. * \param router Router to set the default route of.
* \param callback Callback to forard messages which otherwise have no home. * \param callback Callback to forward messages which otherwise have no home.
* \param data Data pointer to pass to \a callback. * \param data Data pointer to pass to \a callback.
* *
* \retval 0 on success * \retval 0 on success
@@ -244,6 +244,27 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback, stasis_subscription_cb callback,
void *data); void *data);
/*!
* \brief Sets the default route of a router with formatters.
*
* \param router Router to set the default route of.
* \param callback Callback to forward messages which otherwise have no home.
* \param data Data pointer to pass to \a callback.
* \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
*
* \since 13.26.0
* \since 16.3.0
*
* \note If formatters are specified then the message router will remain in a selective
* filtering state. Any explicit routes will receive messages of their message type and
* the default callback will only receive messages that have one of the given formatters.
* Explicit routes will not be filtered according to the given formatters.
*/
void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data,
enum stasis_subscription_message_formatters formatters);
/*! /*!
* \brief Indicate to a message router that we are interested in messages with one or more formatters. * \brief Indicate to a message router that we are interested in messages with one or more formatters.
* *

View File

@@ -8899,8 +8899,8 @@ static int manager_subscriptions_init(void)
stasis_message_router_set_congestion_limits(stasis_router, -1, stasis_message_router_set_congestion_limits(stasis_router, -1,
6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
res |= stasis_message_router_set_default(stasis_router, stasis_message_router_set_formatters_default(stasis_router,
manager_default_msg_cb, NULL); manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI);
res |= stasis_message_router_add(stasis_router, res |= stasis_message_router_add(stasis_router,
ast_manager_get_generic_type(), manager_generic_msg_cb, NULL); ast_manager_get_generic_type(), manager_generic_msg_cb, NULL);

View File

@@ -1255,6 +1255,10 @@ int manager_channels_init(void)
ast_register_cleanup(manager_channels_shutdown); ast_register_cleanup(manager_channels_shutdown);
/* The snapshot type has a special handler as it can result in multiple
* manager events being queued due to aspects of the snapshot itself
* changing.
*/
ret |= stasis_message_router_add(message_router, ret |= stasis_message_router_add(message_router,
ast_channel_snapshot_type(), channel_snapshot_update, NULL); ast_channel_snapshot_type(), channel_snapshot_update, NULL);

View File

@@ -387,19 +387,34 @@ void stasis_message_router_remove_cache_update(
int stasis_message_router_set_default(struct stasis_message_router *router, int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback, stasis_subscription_cb callback,
void *data) void *data)
{
stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
/* While this implementation can never fail, it used to be able to */
return 0;
}
void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data,
enum stasis_subscription_message_formatters formatters)
{ {
ast_assert(router != NULL); ast_assert(router != NULL);
ast_assert(callback != NULL); ast_assert(callback != NULL);
stasis_subscription_accept_formatters(router->subscription, formatters);
ao2_lock(router); ao2_lock(router);
router->default_route.callback = callback; router->default_route.callback = callback;
router->default_route.data = data; router->default_route.data = data;
ao2_unlock(router); ao2_unlock(router);
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
/* Formatters govern what messages the default callback get, so it is only if none is
/* While this implementation can never fail, it used to be able to */ * specified that we accept all messages regardless.
return 0; */
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
}
} }
void stasis_message_router_accept_formatters(struct stasis_message_router *router, void stasis_message_router_accept_formatters(struct stasis_message_router *router,

View File

@@ -317,16 +317,25 @@ static void call_forwarded_handler(struct stasis_app *app, struct stasis_message
ast_channel_unref(chan); ast_channel_unref(chan);
} }
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(app);
}
}
static void sub_default_handler(void *data, struct stasis_subscription *sub, static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
struct stasis_app *app = data; struct stasis_app *app = data;
struct ast_json *json; struct ast_json *json;
if (stasis_subscription_final_message(sub, message)) { /* The dial type can be converted to JSON so it will always be passed
ao2_cleanup(app); * here.
} */
if (stasis_message_type(message) == ast_channel_dial_type()) { if (stasis_message_type(message) == ast_channel_dial_type()) {
call_forwarded_handler(app, message); call_forwarded_handler(app, message);
} }
@@ -803,7 +812,7 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript
} }
} }
static void bridge_default_handler(void *data, struct stasis_subscription *sub, static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
struct stasis_app *app = data; struct stasis_app *app = data;
@@ -930,8 +939,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add(app->bridge_router, res |= stasis_message_router_add(app->bridge_router,
ast_attended_transfer_type(), bridge_attended_transfer_handler, app); ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
res |= stasis_message_router_set_default(app->bridge_router, res |= stasis_message_router_add(app->bridge_router,
bridge_default_handler, app); stasis_subscription_change_type(), bridge_subscription_change_handler, app);
if (res != 0) { if (res != 0) {
return NULL; return NULL;
@@ -953,8 +962,11 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add_cache_update(app->router, res |= stasis_message_router_add_cache_update(app->router,
ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app); ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
res |= stasis_message_router_set_default(app->router, res |= stasis_message_router_add(app->router,
sub_default_handler, app); stasis_subscription_change_type(), sub_subscription_change_handler, app);
stasis_message_router_set_formatters_default(app->router,
sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
if (res != 0) { if (res != 0) {
return NULL; return NULL;