mirror of
https://github.com/asterisk/asterisk.git
synced 2025-10-05 20:55:26 +00:00
stasis: Add internal filtering of messages.
This change adds the ability for subscriptions to indicate which message types they are interested in accepting. By doing so the filtering is done before being dispatched to the subscriber, reducing the amount of work that has to be done. This is optional and if a subscriber does not add message types they wish to accept and set the subscription to selective filtering the previous behavior is preserved and they receive all messages. There is also the ability to explicitly force the reception of all messages for cases such as AMI or ARI where a large number of messages are expected that are then generically converted into a different format. ASTERISK-28103 Change-Id: I99bee23895baa0a117985d51683f7963b77aa190
This commit is contained in:
115
main/stasis.c
115
main/stasis.c
@@ -370,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
|
||||
return topic->name;
|
||||
}
|
||||
|
||||
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
|
||||
{
|
||||
return AST_VECTOR_SIZE(&topic->subscribers);
|
||||
}
|
||||
|
||||
/*! \internal */
|
||||
struct stasis_subscription {
|
||||
/*! Unique ID for this subscription */
|
||||
@@ -391,6 +396,11 @@ struct stasis_subscription {
|
||||
/*! Flag set when final message for sub has been processed.
|
||||
* Be sure join_lock is held before reading/setting. */
|
||||
int final_message_processed;
|
||||
|
||||
/*! The message types this subscription is accepting */
|
||||
AST_VECTOR(, char) accepted_message_types;
|
||||
/*! The message filter currently in use */
|
||||
enum stasis_subscription_message_filter filter;
|
||||
};
|
||||
|
||||
static void subscription_dtor(void *obj)
|
||||
@@ -409,6 +419,8 @@ static void subscription_dtor(void *obj)
|
||||
ast_taskprocessor_unreference(sub->mailbox);
|
||||
sub->mailbox = NULL;
|
||||
ast_cond_destroy(&sub->join_cond);
|
||||
|
||||
AST_VECTOR_FREE(&sub->accepted_message_types);
|
||||
}
|
||||
|
||||
/*!
|
||||
@@ -420,19 +432,25 @@ static void subscription_dtor(void *obj)
|
||||
static void subscription_invoke(struct stasis_subscription *sub,
|
||||
struct stasis_message *message)
|
||||
{
|
||||
unsigned int final = stasis_subscription_final_message(sub, message);
|
||||
int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
|
||||
|
||||
/* Notify that the final message has been received */
|
||||
if (stasis_subscription_final_message(sub, message)) {
|
||||
if (final) {
|
||||
ao2_lock(sub);
|
||||
sub->final_message_rxed = 1;
|
||||
ast_cond_signal(&sub->join_cond);
|
||||
ao2_unlock(sub);
|
||||
}
|
||||
|
||||
/* Since sub is mostly immutable, no need to lock sub */
|
||||
sub->callback(sub->data, sub, message);
|
||||
if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
|
||||
(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
|
||||
/* Since sub is mostly immutable, no need to lock sub */
|
||||
sub->callback(sub->data, sub, message);
|
||||
}
|
||||
|
||||
/* Notify that the final message has been processed */
|
||||
if (stasis_subscription_final_message(sub, message)) {
|
||||
if (final) {
|
||||
ao2_lock(sub);
|
||||
sub->final_message_processed = 1;
|
||||
ast_cond_signal(&sub->join_cond);
|
||||
@@ -500,6 +518,8 @@ struct stasis_subscription *internal_stasis_subscribe(
|
||||
sub->callback = callback;
|
||||
sub->data = data;
|
||||
ast_cond_init(&sub->join_cond, NULL);
|
||||
sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
|
||||
AST_VECTOR_INIT(&sub->accepted_message_types, 0);
|
||||
|
||||
if (topic_add_subscription(topic, sub) != 0) {
|
||||
ao2_ref(sub, -1);
|
||||
@@ -586,6 +606,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
|
||||
return res;
|
||||
}
|
||||
|
||||
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
|
||||
const struct stasis_message_type *type)
|
||||
{
|
||||
if (!subscription) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ast_assert(type != NULL);
|
||||
ast_assert(stasis_message_type_name(type) != NULL);
|
||||
|
||||
if (!type || !stasis_message_type_name(type)) {
|
||||
/* Filtering is unreliable as this message type is not yet initialized
|
||||
* so force all messages through.
|
||||
*/
|
||||
subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
ao2_lock(subscription->topic);
|
||||
if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
|
||||
/* We do this for the same reason as above. The subscription can still operate, so allow
|
||||
* it to do so by forcing all messages through.
|
||||
*/
|
||||
subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
|
||||
}
|
||||
ao2_unlock(subscription->topic);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
|
||||
const struct stasis_message_type *type)
|
||||
{
|
||||
if (!subscription) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ast_assert(type != NULL);
|
||||
ast_assert(stasis_message_type_name(type) != NULL);
|
||||
|
||||
if (!type || !stasis_message_type_name(type)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ao2_lock(subscription->topic);
|
||||
if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
|
||||
/* The memory is already allocated so this can't fail */
|
||||
AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
|
||||
}
|
||||
ao2_unlock(subscription->topic);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stasis_subscription_set_filter(struct stasis_subscription *subscription,
|
||||
enum stasis_subscription_message_filter filter)
|
||||
{
|
||||
if (!subscription) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ao2_lock(subscription->topic);
|
||||
if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
|
||||
subscription->filter = filter;
|
||||
}
|
||||
ao2_unlock(subscription->topic);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void stasis_subscription_join(struct stasis_subscription *subscription)
|
||||
{
|
||||
if (subscription) {
|
||||
@@ -781,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub,
|
||||
struct stasis_message *message,
|
||||
int synchronous)
|
||||
{
|
||||
/* Determine if this subscription is interested in this message. Note that final
|
||||
* messages are special and are always invoked on the subscription.
|
||||
*/
|
||||
if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
|
||||
int message_type_id = stasis_message_type_id(stasis_message_type(message));
|
||||
if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
|
||||
!AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
|
||||
!stasis_subscription_final_message(sub, message)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (!sub->mailbox) {
|
||||
/* Dispatch directly */
|
||||
subscription_invoke(sub, message);
|
||||
@@ -840,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic,
|
||||
ast_assert(topic != NULL);
|
||||
ast_assert(message != NULL);
|
||||
|
||||
/* If there are no subscribers don't bother */
|
||||
if (!stasis_topic_subscribers(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* The topic may be unref'ed by the subscription invocation.
|
||||
* Make sure we hold onto a reference while dispatching.
|
||||
|
Reference in New Issue
Block a user