diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index 6ebf1ad110..36515f043c 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -2847,6 +2847,16 @@ void ast_channel_internal_swap_endpoint_forward(struct ast_channel *a, struct as */ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b); +/*! + * \brief Swap endpoints between two channels + * \param a First channel + * \param b Second channel + * + * \note + * This is used in masquerade to exchange endpoints + */ +void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b); + /*! * \brief Set uniqueid and linkedid string value only (not time) * \param chan The channel to set the uniqueid to @@ -4348,6 +4358,8 @@ ast_callid ast_channel_callid(const struct ast_channel *chan); struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan); void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot); struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan); +struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan); +void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint); /*! * \pre chan is locked diff --git a/include/asterisk/endpoints.h b/include/asterisk/endpoints.h index 0c5edcec31..0be9d352ba 100644 --- a/include/asterisk/endpoints.h +++ b/include/asterisk/endpoints.h @@ -210,5 +210,18 @@ void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int ast_endpoint_add_channel(struct ast_endpoint *endpoint, struct ast_channel *chan); +/*! + * \brief Removes a channel from the given endpoint. + * \since 23.1.0 + * \since 22.7.0 + * \since 20.17.0 + * + * \param endpoint + * \param chan Channel. + * \retval 0 on success. + * \retval Non-zero on error. + */ + int ast_endpoint_remove_channel(struct ast_endpoint *endpoint, + struct ast_channel *chan); #endif /* _ASTERISK_ENDPOINTS_H */ diff --git a/main/channel.c b/main/channel.c index 80a892a6a3..855dadd854 100644 --- a/main/channel.c +++ b/main/channel.c @@ -949,7 +949,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char if (endpoint) { - ast_endpoint_add_channel(endpoint, tmp); + ast_channel_endpoint_set(tmp, endpoint); } /* @@ -2195,6 +2195,8 @@ static void ast_channel_destructor(void *obj) ast_channel_lock(chan); + ast_channel_endpoint_set(chan, NULL); + /* Get rid of each of the data stores on the channel */ while ((datastore = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) /* Free the data store */ @@ -7038,6 +7040,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann /* The old snapshots need to follow the channels so the snapshot update is correct */ ast_channel_internal_swap_snapshots(clonechan, original); + /* Now we swap the endpoints if present */ + ast_channel_internal_swap_endpoints(clonechan, original); + /* Swap channel names. This uses ast_channel_name_set directly, so we * don't get any spurious rename events. */ diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index e9ba102a87..e3f2a0b95d 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -1432,6 +1432,15 @@ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_chann b->snapshot = snapshot; } +void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b) +{ + struct ast_endpoint *endpoint; + + endpoint = a->endpoint; + a->endpoint = b->endpoint; + b->endpoint = endpoint; +} + void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid) { ast_copy_string(chan->uniqueid.unique_id, uniqueid, sizeof(chan->uniqueid.unique_id)); @@ -1630,3 +1639,22 @@ struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan) { return &chan->snapshot_segment_flags; } + +struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan) +{ + return chan->endpoint; +} + +void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint) +{ + if (chan->endpoint) { + ast_endpoint_remove_channel(chan->endpoint, chan); + ao2_ref(chan->endpoint, -1); + } + + chan->endpoint = ao2_bump(endpoint); + + if (chan->endpoint) { + ast_endpoint_add_channel(chan->endpoint, chan); + } +} diff --git a/main/channel_private.h b/main/channel_private.h index 4db189c28a..fe3ff8e3bc 100644 --- a/main/channel_private.h +++ b/main/channel_private.h @@ -200,6 +200,7 @@ struct ast_channel { struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */ struct ast_flags snapshot_segment_flags; /*!< Flags regarding the segments of the snapshot */ int linked_in_container; /*!< Whether this channel is linked in a storage container */ + struct ast_endpoint *endpoint; /*!< The endpoint associated with this channel */ }; #if defined(__cplusplus) || defined(c_plusplus) diff --git a/main/endpoints.c b/main/endpoints.c index c53e31d49b..e27ae69c72 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -34,7 +34,6 @@ #include "asterisk/stasis.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_endpoints.h" -#include "asterisk/stasis_message_router.h" #include "asterisk/stringfields.h" #include "asterisk/_private.h" @@ -68,8 +67,6 @@ struct ast_endpoint { int max_channels; /*! Topic for this endpoint's messages */ struct stasis_cp_single *topics; - /*! Router for handling this endpoint's messages */ - struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; /*! Forwarding subscription from an endpoint to its tech endpoint */ @@ -146,11 +143,6 @@ static void endpoint_dtor(void *obj) { struct ast_endpoint *endpoint = obj; - /* The router should be shut down already */ - ast_assert(stasis_message_router_is_done(endpoint->router)); - ao2_cleanup(endpoint->router); - endpoint->router = NULL; - stasis_cp_single_unsubscribe(endpoint->topics); endpoint->topics = NULL; @@ -179,43 +171,26 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint, return 0; } -/*! \brief Handler for channel snapshot update */ -static void endpoint_cache_clear(void *data, - struct stasis_subscription *sub, - struct stasis_message *message) +int ast_endpoint_remove_channel(struct ast_endpoint *endpoint, + struct ast_channel *chan) { - struct ast_endpoint *endpoint = data; - struct ast_channel_snapshot_update *update = stasis_message_data(message); - - /* Only when the channel is dead do we remove it */ - if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { - return; - } - + ast_assert(chan != NULL); ast_assert(endpoint != NULL); + ast_assert(!ast_strlen_zero(endpoint->resource)); ao2_lock(endpoint); - ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid); + ast_str_container_remove(endpoint->channel_ids, ast_channel_uniqueid(chan)); ao2_unlock(endpoint); + endpoint_publish_snapshot(endpoint); -} -static void endpoint_subscription_change(void *data, - struct stasis_subscription *sub, - struct stasis_message *message) -{ - struct stasis_endpoint *endpoint = data; - - if (stasis_subscription_final_message(sub, message)) { - ao2_cleanup(endpoint); - } + return 0; } static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource) { RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup); - int r = 0; /* Get/create the technology endpoint */ if (!ast_strlen_zero(resource)) { @@ -272,20 +247,6 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); - endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); - if (!endpoint->router) { - return NULL; - } - r |= stasis_message_router_add(endpoint->router, - ast_channel_snapshot_type(), endpoint_cache_clear, - endpoint); - r |= stasis_message_router_add(endpoint->router, - stasis_subscription_change_type(), endpoint_subscription_change, - endpoint); - if (r) { - return NULL; - } - endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), stasis_cp_single_topic(tech_endpoint->topics)); @@ -366,10 +327,6 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) stasis_publish(ast_endpoint_topic(endpoint), message); } } - - /* Bump refcount to hold on to the router */ - ao2_ref(endpoint->router, +1); - stasis_message_router_unsubscribe(endpoint->router); } const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint) diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c index 4c70e21fc5..cd45196f41 100644 --- a/tests/test_stasis_endpoints.c +++ b/tests/test_stasis_endpoints.c @@ -256,6 +256,8 @@ AST_TEST_DEFINE(channel_messages) actual_snapshot = stasis_message_data(msg); ast_test_validate(test, 1 == actual_snapshot->num_channels); + ast_endpoint_remove_channel(uut, chan); + ast_hangup(chan); chan = NULL;