mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-26 14:27:14 +00:00 
			
		
		
		
	endpoints: Remove need for stasis subscription.
When an endpoint is created in the core of Asterisk a subscription was previously created alongside it to monitor any channels being destroyed that were related to it. This was done by receiving all channel snapshot updates for every channel and only reacting when it was indicated that the channel was dead. This change removes this logic and instead provides an API call for directly removing a channel from an endpoint. This is called when channels are destroyed. This operation is fast, so blocking the calling thread for a short period of time doesn't have any noticeable impact.
This commit is contained in:
		| @@ -949,7 +949,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char | ||||
|  | ||||
|  | ||||
| 	if (endpoint) { | ||||
| 		ast_endpoint_add_channel(endpoint, tmp); | ||||
| 		ast_channel_endpoint_set(tmp, endpoint); | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| @@ -2195,6 +2195,8 @@ static void ast_channel_destructor(void *obj) | ||||
|  | ||||
| 	ast_channel_lock(chan); | ||||
|  | ||||
| 	ast_channel_endpoint_set(chan, NULL); | ||||
|  | ||||
| 	/* Get rid of each of the data stores on the channel */ | ||||
| 	while ((datastore = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) | ||||
| 		/* Free the data store */ | ||||
| @@ -7038,6 +7040,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann | ||||
| 	/* The old snapshots need to follow the channels so the snapshot update is correct */ | ||||
| 	ast_channel_internal_swap_snapshots(clonechan, original); | ||||
|  | ||||
| 	/* Now we swap the endpoints if present */ | ||||
| 	ast_channel_internal_swap_endpoints(clonechan, original); | ||||
|  | ||||
| 	/* Swap channel names. This uses ast_channel_name_set directly, so we | ||||
| 	 * don't get any spurious rename events. | ||||
| 	 */ | ||||
|   | ||||
| @@ -1432,6 +1432,15 @@ void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_chann | ||||
| 	b->snapshot = snapshot; | ||||
| } | ||||
|  | ||||
| void ast_channel_internal_swap_endpoints(struct ast_channel *a, struct ast_channel *b) | ||||
| { | ||||
| 	struct ast_endpoint *endpoint; | ||||
|  | ||||
| 	endpoint = a->endpoint; | ||||
| 	a->endpoint = b->endpoint; | ||||
| 	b->endpoint = endpoint; | ||||
| } | ||||
|  | ||||
| void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid) | ||||
| { | ||||
| 	ast_copy_string(chan->uniqueid.unique_id, uniqueid, sizeof(chan->uniqueid.unique_id)); | ||||
| @@ -1630,3 +1639,22 @@ struct ast_flags *ast_channel_snapshot_segment_flags(struct ast_channel *chan) | ||||
| { | ||||
| 	return &chan->snapshot_segment_flags; | ||||
| } | ||||
|  | ||||
| struct ast_endpoint *ast_channel_endpoint(const struct ast_channel *chan) | ||||
| { | ||||
| 	return chan->endpoint; | ||||
| } | ||||
|  | ||||
| void ast_channel_endpoint_set(struct ast_channel *chan, struct ast_endpoint *endpoint) | ||||
| { | ||||
| 	if (chan->endpoint) { | ||||
| 		ast_endpoint_remove_channel(chan->endpoint, chan); | ||||
| 		ao2_ref(chan->endpoint, -1); | ||||
| 	} | ||||
|  | ||||
| 	chan->endpoint = ao2_bump(endpoint); | ||||
|  | ||||
| 	if (chan->endpoint) { | ||||
| 		ast_endpoint_add_channel(chan->endpoint, chan); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -200,6 +200,7 @@ struct ast_channel { | ||||
| 	struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */ | ||||
| 	struct ast_flags snapshot_segment_flags; /*!< Flags regarding the segments of the snapshot */ | ||||
| 	int linked_in_container; /*!< Whether this channel is linked in a storage container */ | ||||
| 	struct ast_endpoint *endpoint; /*!< The endpoint associated with this channel */ | ||||
| }; | ||||
|  | ||||
| #if defined(__cplusplus) || defined(c_plusplus) | ||||
|   | ||||
| @@ -34,7 +34,6 @@ | ||||
| #include "asterisk/stasis.h" | ||||
| #include "asterisk/stasis_channels.h" | ||||
| #include "asterisk/stasis_endpoints.h" | ||||
| #include "asterisk/stasis_message_router.h" | ||||
| #include "asterisk/stringfields.h" | ||||
| #include "asterisk/_private.h" | ||||
|  | ||||
| @@ -68,8 +67,6 @@ struct ast_endpoint { | ||||
| 	int max_channels; | ||||
| 	/*! Topic for this endpoint's messages */ | ||||
| 	struct stasis_cp_single *topics; | ||||
| 	/*! Router for handling this endpoint's messages */ | ||||
| 	struct stasis_message_router *router; | ||||
| 	/*! ast_str_container of channels associated with this endpoint */ | ||||
| 	struct ao2_container *channel_ids; | ||||
| 	/*! Forwarding subscription from an endpoint to its tech endpoint */ | ||||
| @@ -146,11 +143,6 @@ static void endpoint_dtor(void *obj) | ||||
| { | ||||
| 	struct ast_endpoint *endpoint = obj; | ||||
|  | ||||
| 	/* The router should be shut down already */ | ||||
| 	ast_assert(stasis_message_router_is_done(endpoint->router)); | ||||
| 	ao2_cleanup(endpoint->router); | ||||
| 	endpoint->router = NULL; | ||||
|  | ||||
| 	stasis_cp_single_unsubscribe(endpoint->topics); | ||||
| 	endpoint->topics = NULL; | ||||
|  | ||||
| @@ -179,43 +171,26 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint, | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
| /*! \brief Handler for channel snapshot update */ | ||||
| static void endpoint_cache_clear(void *data, | ||||
| 	struct stasis_subscription *sub, | ||||
| 	struct stasis_message *message) | ||||
| int ast_endpoint_remove_channel(struct ast_endpoint *endpoint, | ||||
| 	struct ast_channel *chan) | ||||
| { | ||||
| 	struct ast_endpoint *endpoint = data; | ||||
| 	struct ast_channel_snapshot_update *update = stasis_message_data(message); | ||||
|  | ||||
| 	/* Only when the channel is dead do we remove it */ | ||||
| 	if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	ast_assert(chan != NULL); | ||||
| 	ast_assert(endpoint != NULL); | ||||
| 	ast_assert(!ast_strlen_zero(endpoint->resource)); | ||||
|  | ||||
| 	ao2_lock(endpoint); | ||||
| 	ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid); | ||||
| 	ast_str_container_remove(endpoint->channel_ids, ast_channel_uniqueid(chan)); | ||||
| 	ao2_unlock(endpoint); | ||||
|  | ||||
| 	endpoint_publish_snapshot(endpoint); | ||||
| } | ||||
|  | ||||
| static void endpoint_subscription_change(void *data, | ||||
| 	struct stasis_subscription *sub, | ||||
| 	struct stasis_message *message) | ||||
| { | ||||
| 	struct stasis_endpoint *endpoint = data; | ||||
|  | ||||
| 	if (stasis_subscription_final_message(sub, message)) { | ||||
| 		ao2_cleanup(endpoint); | ||||
| 	} | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
| static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource) | ||||
| { | ||||
| 	RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); | ||||
| 	RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup); | ||||
| 	int r = 0; | ||||
|  | ||||
| 	/* Get/create the technology endpoint */ | ||||
| 	if (!ast_strlen_zero(resource)) { | ||||
| @@ -272,20 +247,6 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha | ||||
| 		stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); | ||||
| 		stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); | ||||
|  | ||||
| 		endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); | ||||
| 		if (!endpoint->router) { | ||||
| 			return NULL; | ||||
| 		} | ||||
| 		r |= stasis_message_router_add(endpoint->router, | ||||
| 			ast_channel_snapshot_type(), endpoint_cache_clear, | ||||
| 			endpoint); | ||||
| 		r |= stasis_message_router_add(endpoint->router, | ||||
| 			stasis_subscription_change_type(), endpoint_subscription_change, | ||||
| 			endpoint); | ||||
| 		if (r) { | ||||
| 			return NULL; | ||||
| 		} | ||||
|  | ||||
| 		endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), | ||||
| 			stasis_cp_single_topic(tech_endpoint->topics)); | ||||
|  | ||||
| @@ -366,10 +327,6 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) | ||||
| 			stasis_publish(ast_endpoint_topic(endpoint), message); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	/* Bump refcount to hold on to the router */ | ||||
| 	ao2_ref(endpoint->router, +1); | ||||
| 	stasis_message_router_unsubscribe(endpoint->router); | ||||
| } | ||||
|  | ||||
| const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user