mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-26 22:30:28 +00:00 
			
		
		
		
	GCC 12 caught an issue in state_id_by_topic where we were checking a pointer for NULL instead of the contents of the pointer for '\0'. ASTERISK-30044 Change-Id: Ia0b04d4fff45c92acb7f07132a33622fa341148e
		
			
				
	
	
		
			772 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			772 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * Asterisk -- An open source telephony toolkit.
 | |
|  *
 | |
|  * Copyright (C) 2019, Sangoma Technologies Corporation
 | |
|  *
 | |
|  * Kevin Harwell <kharwell@digium.com>
 | |
|  *
 | |
|  * See http://www.asterisk.org for more information about
 | |
|  * the Asterisk project. Please do not directly contact
 | |
|  * any of the maintainers of this project for assistance;
 | |
|  * the project provides a web site, mailing lists and IRC
 | |
|  * channels for your use.
 | |
|  *
 | |
|  * This program is free software, distributed under the terms of
 | |
|  * the GNU General Public License Version 2. See the LICENSE file
 | |
|  * at the top of the source tree.
 | |
|  */
 | |
| 
 | |
| /*** MODULEINFO
 | |
| 	<support_level>core</support_level>
 | |
|  ***/
 | |
| 
 | |
| #include "asterisk.h"
 | |
| 
 | |
| #include "asterisk/stasis_state.h"
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Used to link a stasis_state to it's manager
 | |
|  */
 | |
| struct stasis_state_proxy {
 | |
| 	AO2_WEAKPROXY();
 | |
| 	/*! The manager that owns and handles this state */
 | |
| 	struct stasis_state_manager *manager;
 | |
| 	/*! A unique id for this state object. */
 | |
| 	char id[0];
 | |
| };
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Associates a stasis topic to its last known published message
 | |
|  *
 | |
|  * This object's lifetime is tracked by the number of publishers and subscribers to it.
 | |
|  * Once all publishers and subscribers have been removed this object is removed from the
 | |
|  * manager's collection and destroyed. While a single object type (namely this one) could
 | |
|  * be utilized for both publishers, and subscribers this implementation purposely keeps
 | |
|  * them separated. This was done to maintain readability, make debugging easier, and allow
 | |
|  * for better logging and future enhancements.
 | |
|  */
 | |
| struct stasis_state {
 | |
| 	/*! The number of state subscribers */
 | |
| 	unsigned int num_subscribers;
 | |
| 	/*!
 | |
| 	 * \brief The manager that owns and handles this state
 | |
| 	 * \note This reference is owned by stasis_state_proxy
 | |
| 	 */
 | |
| 	struct stasis_state_manager *manager;
 | |
| 	/*! Forwarding information, i.e. this topic to manager's topic */
 | |
| 	struct stasis_forward *forward;
 | |
| 	/*! The managed topic */
 | |
| 	struct stasis_topic *topic;
 | |
| 	/*! The actual state data */
 | |
| 	struct stasis_message *msg;
 | |
| 	/*!
 | |
| 	 * A container of eids. It's assumed that there is only a single publisher per
 | |
| 	 * eid per topic. Thus the publisher is tracked by the system's eid.
 | |
| 	 */
 | |
| 	AST_VECTOR(, struct ast_eid) eids;
 | |
| 	/*! A unique id for this state object. */
 | |
| 	char *id;
 | |
| };
 | |
| 
 | |
| AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id);
 | |
| AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id);
 | |
| 
 | |
| /*! The number of buckets to use for managed states */
 | |
| #define STATE_BUCKETS 57
 | |
| 
 | |
| struct stasis_state_manager {
 | |
| 	/*! Holds all state objects handled by this manager */
 | |
| 	struct ao2_container *states;
 | |
| 	/*! The manager's topic. All state topics are forward to this one */
 | |
| 	struct stasis_topic *all_topic;
 | |
| 	/*! A collection of manager event handlers */
 | |
| 	AST_VECTOR_RW(, struct stasis_state_observer *) observers;
 | |
| };
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Retrieve a state's topic name without the manager topic.
 | |
|  *
 | |
|  * State topics have names that consist of the manager's topic name
 | |
|  * combined with a unique id separated by a slash. For instance:
 | |
|  *
 | |
|  *    manager topic's name/unique id
 | |
|  *
 | |
|  * This method retrieves the unique id part from the state's topic name.
 | |
|  *
 | |
|  * \param manager_topic The manager's topic
 | |
|  * \param state_topic A state topic
 | |
|  *
 | |
|  * \return The state's topic unique id
 | |
|  */
 | |
| static const char *state_id_by_topic(struct stasis_topic *manager_topic,
 | |
| 	const struct stasis_topic *state_topic)
 | |
| {
 | |
| 	const char *id;
 | |
| 
 | |
| 	/* This topic should always belong to the manager */
 | |
| 	ast_assert(ast_begins_with(stasis_topic_name(manager_topic),
 | |
| 		stasis_topic_name(state_topic)));
 | |
| 
 | |
| 	id = strchr(stasis_topic_name(state_topic), '/');
 | |
| 
 | |
| 	/* The state's unique id should always exist */
 | |
| 	ast_assert(id != NULL && *(id + 1) != '\0');
 | |
| 
 | |
| 	return (id + 1);
 | |
| }
 | |
| 
 | |
| static void state_dtor(void *obj)
 | |
| {
 | |
| 	struct stasis_state *state = obj;
 | |
| 
 | |
| 	state->forward = stasis_forward_cancel(state->forward);
 | |
| 	ao2_cleanup(state->topic);
 | |
| 	state->topic = NULL;
 | |
| 	ao2_cleanup(state->msg);
 | |
| 	state->msg = NULL;
 | |
| 
 | |
| 	/* All eids should have been removed */
 | |
| 	ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
 | |
| 	AST_VECTOR_FREE(&state->eids);
 | |
| }
 | |
| 
 | |
| static void state_proxy_dtor(void *obj) {
 | |
| 	struct stasis_state_proxy *proxy = obj;
 | |
| 
 | |
| 	ao2_cleanup(proxy->manager);
 | |
| }
 | |
| 
 | |
| static void state_proxy_sub_cb(void *obj, void *data)
 | |
| {
 | |
| 	struct stasis_state_proxy *proxy = obj;
 | |
| 
 | |
| 	ao2_unlink(proxy->manager->states, proxy);
 | |
| }
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Allocate a stasis state object and add it to the manager.
 | |
|  *
 | |
|  * Create and initialize a state structure. It's required that either a state
 | |
|  * topic, or an id is specified. If a state topic is not given then one will be
 | |
|  * created using the given id.
 | |
|  *
 | |
|  * \param manager The owning manager
 | |
|  * \param state_topic A state topic to be managed
 | |
|  * \param id The unique id for the state
 | |
|  * \param file, line, func
 | |
|  *
 | |
|  * \return A stasis_state object or NULL
 | |
|  * \retval NULL on error
 | |
|  *
 | |
|  * \pre manager->states must be locked.
 | |
|  * \pre manager->states does not contain an object matching key \a id.
 | |
|  */
 | |
| static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
 | |
| 	struct stasis_topic *state_topic, const char *id,
 | |
| 	const char *file, int line, const char *func)
 | |
| {
 | |
| 	struct stasis_state_proxy *proxy = NULL;
 | |
| 	struct stasis_state *state = NULL;
 | |
| 
 | |
| 	if (!id) {
 | |
| 		/* If not given an id, then a state topic is required */
 | |
| 		ast_assert(state_topic != NULL);
 | |
| 
 | |
| 		/* Get the id we'll key off of from the state topic */
 | |
| 		id = state_id_by_topic(manager->all_topic, state_topic);
 | |
| 	}
 | |
| 
 | |
| 	state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
 | |
| 	if (!state) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	if (!state_topic) {
 | |
| 		char *name;
 | |
| 
 | |
| 		/*
 | |
| 		 * To provide further detail and to ensure that the topic is unique within the
 | |
| 		 * scope of the system we prefix it with the manager's topic name, which should
 | |
| 		 * itself already be unique.
 | |
| 		 */
 | |
| 		if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
 | |
| 			goto error_return;
 | |
| 		}
 | |
| 
 | |
| 		state->topic = stasis_topic_create(name);
 | |
| 
 | |
| 		ast_free(name);
 | |
| 		if (!state->topic) {
 | |
| 			goto error_return;
 | |
| 		}
 | |
| 	} else {
 | |
| 		/*
 | |
| 		 * Since the state topic was passed in, go ahead and bump its reference.
 | |
| 		 * By doing this here first, it allows us to consistently decrease the reference on
 | |
| 		 * state allocation error.
 | |
| 		 */
 | |
| 		ao2_ref(state_topic, +1);
 | |
| 		state->topic = state_topic;
 | |
| 	}
 | |
| 
 | |
| 	proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
 | |
| 	if (!proxy) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	strcpy(proxy->id, id); /* Safe */
 | |
| 
 | |
| 	state->id = proxy->id;
 | |
| 	proxy->manager = ao2_bump(manager);
 | |
| 	state->manager = proxy->manager; /* state->manager is owned by the proxy */
 | |
| 
 | |
| 	state->forward = stasis_forward_all(state->topic, manager->all_topic);
 | |
| 	if (!state->forward) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	if (AST_VECTOR_INIT(&state->eids, 2)) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	if (ao2_weakproxy_subscribe(proxy, state_proxy_sub_cb, NULL, OBJ_NOLOCK)) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
 | |
| 		goto error_return;
 | |
| 	}
 | |
| 
 | |
| 	ao2_ref(proxy, -1);
 | |
| 
 | |
| 	return state;
 | |
| 
 | |
| error_return:
 | |
| 	ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
 | |
| 			id, stasis_topic_name(manager->all_topic));
 | |
| 	ao2_cleanup(state);
 | |
| 	ao2_cleanup(proxy);
 | |
| 	return NULL;
 | |
| }
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Find a state by id, or create one if not found and add it to the manager.
 | |
|  *
 | |
|  * \param manager The manager to be added to
 | |
|  * \param state_topic A state topic to be managed (if NULL id is required)
 | |
|  * \param id The unique id for the state (if NULL state_topic is required)
 | |
|  *
 | |
|  * \return The added state object
 | |
|  * \retval NULL on error
 | |
|  */
 | |
| #define state_find_or_add(manager, state_topic, id) __state_find_or_add(manager, state_topic, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 | |
| static struct stasis_state *__state_find_or_add(struct stasis_state_manager *manager,
 | |
| 	struct stasis_topic *state_topic, const char *id,
 | |
| 	const char *file, int line, const char *func)
 | |
| {
 | |
| 	struct stasis_state *state;
 | |
| 
 | |
| 	ao2_lock(manager->states);
 | |
| 	if (ast_strlen_zero(id)) {
 | |
| 		id = state_id_by_topic(manager->all_topic, state_topic);
 | |
| 	}
 | |
| 
 | |
| 	state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
 | |
| 	if (!state) {
 | |
| 		state = state_alloc(manager, state_topic, id, file, line, func);
 | |
| 	}
 | |
| 
 | |
| 	ao2_unlock(manager->states);
 | |
| 
 | |
| 	return state;
 | |
| }
 | |
| 
 | |
| static void state_manager_dtor(void *obj)
 | |
| {
 | |
| 	struct stasis_state_manager *manager = obj;
 | |
| 
 | |
| #ifdef AO2_DEBUG
 | |
| 	{
 | |
| 		char *container_name =
 | |
| 			ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
 | |
| 		sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
 | |
| 		ao2_container_unregister(container_name);
 | |
| 	}
 | |
| #endif
 | |
| 
 | |
| 	ao2_cleanup(manager->states);
 | |
| 	manager->states = NULL;
 | |
| 	ao2_cleanup(manager->all_topic);
 | |
| 	manager->all_topic = NULL;
 | |
| 	AST_VECTOR_RW_FREE(&manager->observers);
 | |
| }
 | |
| 
 | |
| #ifdef AO2_DEBUG
 | |
| static void state_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
 | |
| {
 | |
| 	struct stasis_state *state = v_obj;
 | |
| 
 | |
| 	if (!state) {
 | |
| 		return;
 | |
| 	}
 | |
| 	prnt(where, "%s", stasis_topic_name(state->topic));
 | |
| }
 | |
| #endif
 | |
| 
 | |
| struct stasis_state_manager *stasis_state_manager_create(const char *topic_name)
 | |
| {
 | |
| 	struct stasis_state_manager *manager;
 | |
| 
 | |
| 	manager = ao2_alloc_options(sizeof(*manager), state_manager_dtor,
 | |
| 		AO2_ALLOC_OPT_LOCK_NOLOCK);
 | |
| 	if (!manager) {
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	manager->states = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
 | |
| 		STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
 | |
| 	if (!manager->states) {
 | |
| 		ao2_ref(manager, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	manager->all_topic = stasis_topic_create(topic_name);
 | |
| 	if (!manager->all_topic) {
 | |
| 		ao2_ref(manager, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	if (AST_VECTOR_RW_INIT(&manager->observers, 2) != 0) {
 | |
| 		ao2_ref(manager, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| #ifdef AO2_DEBUG
 | |
| 	{
 | |
| 		char *container_name =
 | |
| 			ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
 | |
| 		sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
 | |
| 		ao2_container_register(container_name, manager->states, state_prnt_obj);
 | |
| 	}
 | |
| #endif
 | |
| 
 | |
| 	return manager;
 | |
| }
 | |
| 
 | |
| struct stasis_topic *stasis_state_all_topic(struct stasis_state_manager *manager)
 | |
| {
 | |
| 	return manager->all_topic;
 | |
| }
 | |
| 
 | |
| struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, const char *id)
 | |
| {
 | |
| 	struct stasis_topic *topic;
 | |
| 	struct stasis_state *state;
 | |
| 
 | |
| 	state = state_find_or_add(manager, NULL, id);
 | |
| 	if (!state) {
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	topic = state->topic;
 | |
| 	ao2_ref(state, -1);
 | |
| 	return topic;
 | |
| }
 | |
| 
 | |
| struct stasis_state_subscriber {
 | |
| 	/*! The stasis state subscribed to */
 | |
| 	struct stasis_state *state;
 | |
| 	/*! The stasis subscription. */
 | |
| 	struct stasis_subscription *stasis_sub;
 | |
| };
 | |
| 
 | |
| static void subscriber_dtor(void *obj)
 | |
| {
 | |
| 	size_t i;
 | |
| 	struct stasis_state_subscriber *sub = obj;
 | |
| 	struct stasis_state_manager *manager = sub->state->manager;
 | |
| 
 | |
| 	AST_VECTOR_RW_RDLOCK(&manager->observers);
 | |
| 	for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
 | |
| 		if (AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe) {
 | |
| 			AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe(sub->state->id, sub);
 | |
| 		}
 | |
| 	}
 | |
| 	AST_VECTOR_RW_UNLOCK(&manager->observers);
 | |
| 
 | |
| 	ao2_lock(sub->state);
 | |
| 	--sub->state->num_subscribers;
 | |
| 	ao2_unlock(sub->state);
 | |
| 
 | |
| 	ao2_ref(sub->state, -1);
 | |
| }
 | |
| 
 | |
| struct stasis_state_subscriber *stasis_state_add_subscriber(
 | |
| 	struct stasis_state_manager *manager, const char *id)
 | |
| {
 | |
| 	size_t i;
 | |
| 	struct stasis_state_subscriber *sub = ao2_alloc_options(
 | |
| 		sizeof(*sub), subscriber_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
 | |
| 
 | |
| 	if (!sub) {
 | |
| 		ast_log(LOG_ERROR, "Unable to create subscriber to %s/%s\n",
 | |
| 			stasis_topic_name(manager->all_topic), id);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	sub->state = state_find_or_add(manager, NULL, id);
 | |
| 	if (!sub->state) {
 | |
| 		ao2_ref(sub, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	ao2_lock(sub->state);
 | |
| 	++sub->state->num_subscribers;
 | |
| 	ao2_unlock(sub->state);
 | |
| 
 | |
| 	AST_VECTOR_RW_RDLOCK(&manager->observers);
 | |
| 	for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
 | |
| 		if (AST_VECTOR_GET(&manager->observers, i)->on_subscribe) {
 | |
| 			AST_VECTOR_GET(&manager->observers, i)->on_subscribe(id, sub);
 | |
| 		}
 | |
| 	}
 | |
| 	AST_VECTOR_RW_UNLOCK(&manager->observers);
 | |
| 
 | |
| 	return sub;
 | |
| }
 | |
| 
 | |
| struct stasis_state_subscriber *stasis_state_subscribe_pool(struct stasis_state_manager *manager,
 | |
| 	const char *id, stasis_subscription_cb callback, void *data)
 | |
| {
 | |
| 	struct stasis_topic *topic;
 | |
| 	struct stasis_state_subscriber *sub = stasis_state_add_subscriber(manager, id);
 | |
| 
 | |
| 	if (!sub) {
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	topic = sub->state->topic;
 | |
| 	ast_debug(3, "Creating stasis state subscription to id '%s'. Topic: '%s':%p %d\n",
 | |
| 		id, stasis_topic_name(topic), topic, (int)ao2_ref(topic, 0));
 | |
| 
 | |
| 	sub->stasis_sub = stasis_subscribe_pool(topic, callback, data);
 | |
| 
 | |
| 	if (!sub->stasis_sub) {
 | |
| 		ao2_ref(sub, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	return sub;
 | |
| }
 | |
| 
 | |
| void *stasis_state_unsubscribe(struct stasis_state_subscriber *sub)
 | |
| {
 | |
| 	sub->stasis_sub = stasis_unsubscribe(sub->stasis_sub);
 | |
| 	ao2_ref(sub, -1);
 | |
| 	return NULL;
 | |
| }
 | |
| 
 | |
| void *stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
 | |
| {
 | |
| 	if (sub) {
 | |
| 		sub->stasis_sub = stasis_unsubscribe_and_join(sub->stasis_sub);
 | |
| 		ao2_ref(sub, -1);
 | |
| 	}
 | |
| 
 | |
| 	return NULL;
 | |
| }
 | |
| 
 | |
| const char *stasis_state_subscriber_id(const struct stasis_state_subscriber *sub)
 | |
| {
 | |
| 	return sub->state->id;
 | |
| }
 | |
| 
 | |
| struct stasis_topic *stasis_state_subscriber_topic(struct stasis_state_subscriber *sub)
 | |
| {
 | |
| 	return sub->state->topic;
 | |
| }
 | |
| 
 | |
| void *stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
 | |
| {
 | |
| 	void *res;
 | |
| 
 | |
| 	/*
 | |
| 	 * The data's reference needs to be bumped before returning so it doesn't disappear
 | |
| 	 * for the caller. Lock state, so the underlying message data is not replaced while
 | |
| 	 * retrieving.
 | |
| 	 */
 | |
| 	ao2_lock(sub->state);
 | |
| 	res = ao2_bump(stasis_message_data(sub->state->msg));
 | |
| 	ao2_unlock(sub->state);
 | |
| 
 | |
| 	return res;
 | |
| }
 | |
| 
 | |
| struct stasis_subscription *stasis_state_subscriber_subscription(
 | |
| 	struct stasis_state_subscriber *sub)
 | |
| {
 | |
| 	return sub->stasis_sub;
 | |
| }
 | |
| 
 | |
| struct stasis_state_publisher {
 | |
| 	/*! The stasis state to publish to */
 | |
| 	struct stasis_state *state;
 | |
| };
 | |
| 
 | |
| static void publisher_dtor(void *obj)
 | |
| {
 | |
| 	struct stasis_state_publisher *pub = obj;
 | |
| 
 | |
| 	ao2_ref(pub->state, -1);
 | |
| }
 | |
| 
 | |
| struct stasis_state_publisher *stasis_state_add_publisher(
 | |
| 	struct stasis_state_manager *manager, const char *id)
 | |
| {
 | |
| 	struct stasis_state_publisher *pub = ao2_alloc_options(
 | |
| 		sizeof(*pub), publisher_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
 | |
| 
 | |
| 	if (!pub) {
 | |
| 		ast_log(LOG_ERROR, "Unable to create publisher to %s/%s\n",
 | |
| 			stasis_topic_name(manager->all_topic), id);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	pub->state = state_find_or_add(manager, NULL, id);
 | |
| 	if (!pub->state) {
 | |
| 		ao2_ref(pub, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	return pub;
 | |
| }
 | |
| 
 | |
| const char *stasis_state_publisher_id(const struct stasis_state_publisher *pub)
 | |
| {
 | |
| 	return pub->state->id;
 | |
| }
 | |
| 
 | |
| struct stasis_topic *stasis_state_publisher_topic(struct stasis_state_publisher *pub)
 | |
| {
 | |
| 	return pub->state->topic;
 | |
| }
 | |
| 
 | |
| void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
 | |
| {
 | |
| 	ao2_lock(pub->state);
 | |
| 	ao2_replace(pub->state->msg, msg);
 | |
| 	ao2_unlock(pub->state);
 | |
| 
 | |
| 	stasis_publish(pub->state->topic, msg);
 | |
| }
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Find, or add the given eid to the state object
 | |
|  *
 | |
|  * Publishers can be tracked implicitly using eids. This allows us to add, and subsequently
 | |
|  * remove state objects from the managed states container in a deterministic way. Using the
 | |
|  * eids in this way is possible because it's guaranteed that there will only ever be a single
 | |
|  * publisher for a uniquely named topic (topics tracked by this module) on a system.
 | |
|  *
 | |
|  * \note The vector does not use locking. Instead we use the state object for that, so it
 | |
|  * needs to be locked prior to calling this method.
 | |
|  *
 | |
|  * \param state The state object
 | |
|  * \param eid The system id to add to the state object
 | |
|  */
 | |
| static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
 | |
| {
 | |
| 	size_t i;
 | |
| 
 | |
| 	if (!eid) {
 | |
| 		eid = &ast_eid_default;
 | |
| 	}
 | |
| 
 | |
| 	for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
 | |
| 		if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
 | |
| 			break;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (i == AST_VECTOR_SIZE(&state->eids)) {
 | |
| 		if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
 | |
| 			/* This ensures state cannot be freed if it has any eids */
 | |
| 			ao2_ref(state, +1);
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*!
 | |
|  * \internal
 | |
|  * \brief Find, and remove the given eid from the state object
 | |
|  *
 | |
|  * Used to remove an eid from an implicit publisher.
 | |
|  *
 | |
|  * \note The vector does not use locking. Instead we use the state object for that, so it
 | |
|  * needs to be locked prior to calling this method.
 | |
|  *
 | |
|  * \param state The state object
 | |
|  * \param eid The system id to remove from the state object
 | |
|  */
 | |
| static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
 | |
| {
 | |
| 	size_t i;
 | |
| 
 | |
| 	if (!eid) {
 | |
| 		eid = &ast_eid_default;
 | |
| 	}
 | |
| 
 | |
| 	for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
 | |
| 		if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
 | |
| 			AST_VECTOR_REMOVE_UNORDERED(&state->eids, i);
 | |
| 			/* Balance the reference from state_find_or_add_eid */
 | |
| 			ao2_ref(state, -1);
 | |
| 			return;
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id,
 | |
| 	const struct ast_eid *eid, struct stasis_message *msg)
 | |
| {
 | |
| 	struct stasis_state *state;
 | |
| 
 | |
| 	state = state_find_or_add(manager, NULL, id);
 | |
| 	if (!state) {
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	ao2_lock(state);
 | |
| 	state_find_or_add_eid(state, eid);
 | |
| 	ao2_replace(state->msg, msg);
 | |
| 	ao2_unlock(state);
 | |
| 
 | |
| 	stasis_publish(state->topic, msg);
 | |
| 
 | |
| 	ao2_ref(state, -1);
 | |
| }
 | |
| 
 | |
| void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
 | |
| 	const char *id, const struct ast_eid *eid, struct stasis_message *msg)
 | |
| {
 | |
| 	struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, "");
 | |
| 
 | |
| 	if (!state) {
 | |
| 		/*
 | |
| 		 * In most circumstances state should already exist here. However, if there is no
 | |
| 		 * state then it can mean one of a few things:
 | |
| 		 *
 | |
| 		 * 1. This function was called prior to an implicit publish for the same given
 | |
| 		 *    manager, and id.
 | |
| 		 * 2. This function was called more than once for the same manager, and id.
 | |
| 		 * 3. There is ref count problem with the explicit subscribers, and publishers.
 | |
| 		 */
 | |
| 		ast_debug(5, "Attempted to remove state for id '%s', but state not found\n", id);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	if (msg) {
 | |
| 		stasis_publish(state->topic, msg);
 | |
| 	}
 | |
| 
 | |
| 	ao2_lock(state);
 | |
| 	state_find_and_remove_eid(state, eid);
 | |
| 	ao2_unlock(state);
 | |
| 
 | |
| 	ao2_ref(state, -1);
 | |
| }
 | |
| 
 | |
| int stasis_state_add_observer(struct stasis_state_manager *manager,
 | |
| 	struct stasis_state_observer *observer)
 | |
| {
 | |
| 	int res;
 | |
| 
 | |
| 	AST_VECTOR_RW_WRLOCK(&manager->observers);
 | |
| 	res = AST_VECTOR_APPEND(&manager->observers, observer);
 | |
| 	AST_VECTOR_RW_UNLOCK(&manager->observers);
 | |
| 
 | |
| 	return res;
 | |
| }
 | |
| 
 | |
| void stasis_state_remove_observer(struct stasis_state_manager *manager,
 | |
| 	struct stasis_state_observer *observer)
 | |
| {
 | |
| 	AST_VECTOR_RW_WRLOCK(&manager->observers);
 | |
| 	AST_VECTOR_REMOVE_ELEM_UNORDERED(&manager->observers, observer, AST_VECTOR_ELEM_CLEANUP_NOOP);
 | |
| 	AST_VECTOR_RW_UNLOCK(&manager->observers);
 | |
| }
 | |
| 
 | |
| static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
 | |
| {
 | |
| 	struct stasis_message *msg;
 | |
| 	int res;
 | |
| 
 | |
| 	/*
 | |
| 	 * State needs to be locked here while we retrieve and bump the reference on its message
 | |
| 	 * object. Doing so guarantees the message object will live throughout its handling.
 | |
| 	 */
 | |
| 	ao2_lock(state);
 | |
| 	msg = ao2_bump(state->msg);
 | |
| 	ao2_unlock(state);
 | |
| 
 | |
| 	res = handler(state->id, msg, data);
 | |
| 	ao2_cleanup(msg);
 | |
| 	return res;
 | |
| }
 | |
| 
 | |
| static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
 | |
| {
 | |
| 	struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
 | |
| 
 | |
| 	if (state) {
 | |
| 		int res;
 | |
| 		res = handle_stasis_state(state, arg, data);
 | |
| 		ao2_ref(state, -1);
 | |
| 		return res;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler,
 | |
| 	void *data)
 | |
| {
 | |
| 	ast_assert(handler != NULL);
 | |
| 
 | |
| 	ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA,
 | |
| 		handle_stasis_state_proxy, handler, data);
 | |
| }
 | |
| 
 | |
| static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
 | |
| {
 | |
| 	struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
 | |
| 	int res = 0;
 | |
| 
 | |
| 	if (state && state->num_subscribers) {
 | |
| 		res = handle_stasis_state(state, arg, data);
 | |
| 	}
 | |
| 
 | |
| 	ao2_cleanup(state);
 | |
| 
 | |
| 	return res;
 | |
| }
 | |
| 
 | |
| void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler,
 | |
| 	void *data)
 | |
| {
 | |
| 	ast_assert(handler != NULL);
 | |
| 
 | |
| 	ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA,
 | |
| 		handle_stasis_state_subscribed, handler, data);
 | |
| }
 |