Stasis/messaging: tech subscriptions conflict with endpoint subscriptions.

When both a tech subscription and an endpoint subscription exist for a given
endpoint, TextMessageReceived events are dispatched to the tech subscription
only.

ASTERISK-29229

Change-Id: I9eac4cba5f9e27285a282509395347abc58fc2b8
This commit is contained in:
Jean Aunis
2020-12-30 14:56:47 +01:00
committed by George Joseph
parent 1c05667cfc
commit c559667868

View File

@@ -289,18 +289,42 @@ static struct ast_json *msg_to_json(struct ast_msg *msg)
return json_obj; return json_obj;
} }
static void dispatch_message(struct message_subscription *sub, const char *endpoint_name, struct ast_json *json_msg)
{
int i;
ast_debug(3, "Dispatching message to subscription %s for endpoint %s\n",
sub->token,
endpoint_name);
for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
tuple->callback(endpoint_name, json_msg, tuple->pvt);
}
}
static int handle_msg_cb(struct ast_msg *msg) static int handle_msg_cb(struct ast_msg *msg)
{ {
/* We have at most 3 subscriptions: TECH_WILDCARD, tech itself, and endpoint. */
struct message_subscription *matching_subscriptions[3];
struct message_subscription *sub; struct message_subscription *sub;
int i; int i, j;
int result;
char buf[256]; char buf[256];
const char *endpoint_name; const char *endpoint_name;
struct ast_json *json_msg; struct ast_json *json_msg;
msg_to_endpoint(msg, buf, sizeof(buf)); msg_to_endpoint(msg, buf, sizeof(buf));
endpoint_name = buf;
json_msg = msg_to_json(msg);
if (!json_msg) {
return -1;
}
result = -1;
/* Find subscriptions to TECH_WILDCARD and to the endpoint's technology. */
ast_rwlock_rdlock(&tech_subscriptions_lock); ast_rwlock_rdlock(&tech_subscriptions_lock);
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { for (i = 0, j = 0; i < AST_VECTOR_SIZE(&tech_subscriptions) && j < 2; i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i); sub = AST_VECTOR_GET(&tech_subscriptions, i);
if (!sub) { if (!sub) {
@@ -309,40 +333,30 @@ static int handle_msg_cb(struct ast_msg *msg)
if (!strcmp(sub->token, TECH_WILDCARD) if (!strcmp(sub->token, TECH_WILDCARD)
|| !strncasecmp(sub->token, buf, strlen(sub->token))) { || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock); ao2_ref(sub, +1);
ao2_bump(sub); matching_subscriptions[j++] = sub;
endpoint_name = buf;
goto match;
} }
} }
ast_rwlock_unlock(&tech_subscriptions_lock); ast_rwlock_unlock(&tech_subscriptions_lock);
/* Find the subscription to this particular endpoint. */
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) { if (sub) {
endpoint_name = buf; matching_subscriptions[j++] = sub;
goto match;
} }
return -1; /* Dispatch the message to all matching subscriptions. */
for (i = 0; i < j; i++) {
sub = matching_subscriptions[i];
match: dispatch_message(sub, endpoint_name, json_msg);
ast_debug(3, "Dispatching message for %s\n", endpoint_name);
json_msg = msg_to_json(msg);
if (!json_msg) {
ao2_ref(sub, -1); ao2_ref(sub, -1);
return -1; result = 0;
}
for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
tuple->callback(endpoint_name, json_msg, tuple->pvt);
} }
ast_json_unref(json_msg); ast_json_unref(json_msg);
ao2_ref(sub, -1); return result;
return 0;
} }
struct ast_msg_handler ari_msg_handler = { struct ast_msg_handler ari_msg_handler = {