mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-25 22:18:07 +00:00 
			
		
		
		
	Case scenario with Applications ARI: * Once you subscribe to deviceState with Applications REST API, it will be added into subscription pool. * When you unsubscribe it will remove from the device_state_subscription hash table but not from the subscription pool. * When you subscribe again, it will add it to pool again. * Now you will have two subscriptions and you will receive same event twice. This fix should now remove deviceState subscription from pool and it should fix unsubscribe on deviceState. ASTERISK-27130 #close Change-Id: I718b70d770a086e39b4ddba4f69a3c616d4476c4
		
			
				
	
	
		
			488 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			488 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * Asterisk -- An open source telephony toolkit.
 | |
|  *
 | |
|  * Copyright (C) 2013, Digium, Inc.
 | |
|  *
 | |
|  * Kevin Harwell <kharwell@digium.com>
 | |
|  *
 | |
|  * See http://www.asterisk.org for more information about
 | |
|  * the Asterisk project. Please do not directly contact
 | |
|  * any of the maintainers of this project for assistance;
 | |
|  * the project provides a web site, mailing lists and IRC
 | |
|  * channels for your use.
 | |
|  *
 | |
|  * This program is free software, distributed under the terms of
 | |
|  * the GNU General Public License Version 2. See the LICENSE file
 | |
|  * at the top of the source tree.
 | |
|  */
 | |
| 
 | |
| /*** MODULEINFO
 | |
| 	<depend type="module">res_stasis</depend>
 | |
| 	<support_level>core</support_level>
 | |
|  ***/
 | |
| 
 | |
| #include "asterisk.h"
 | |
| 
 | |
| ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 | |
| 
 | |
| #include "asterisk/astdb.h"
 | |
| #include "asterisk/astobj2.h"
 | |
| #include "asterisk/module.h"
 | |
| #include "asterisk/stasis_app_impl.h"
 | |
| #include "asterisk/stasis_app_device_state.h"
 | |
| 
 | |
| #define DEVICE_STATE_SIZE 64
 | |
| /*! astdb family name */
 | |
| #define DEVICE_STATE_FAMILY "StasisDeviceState"
 | |
| /*! Stasis device state provider */
 | |
| #define DEVICE_STATE_PROVIDER_STASIS "Stasis"
 | |
| /*! Scheme for custom device states */
 | |
| #define DEVICE_STATE_SCHEME_STASIS "Stasis:"
 | |
| /*! Scheme for device state subscriptions */
 | |
| #define DEVICE_STATE_SCHEME_SUB "deviceState:"
 | |
| 
 | |
| /*! Number of hash buckets for device state subscriptions */
 | |
| #define DEVICE_STATE_BUCKETS 37
 | |
| 
 | |
| /*! The key used for tracking a subscription to all device states */
 | |
| #define DEVICE_STATE_ALL "__AST_DEVICE_STATE_ALL_TOPIC"
 | |
| 
 | |
| /*! Container for subscribed device states */
 | |
| static struct ao2_container *device_state_subscriptions;
 | |
| 
 | |
| /*!
 | |
|  * \brief Device state subscription object.
 | |
|  */
 | |
| struct device_state_subscription {
 | |
| 	AST_DECLARE_STRING_FIELDS(
 | |
| 		AST_STRING_FIELD(app_name);
 | |
| 		AST_STRING_FIELD(device_name);
 | |
| 	);
 | |
| 	/*! The subscription object */
 | |
| 	struct stasis_subscription *sub;
 | |
| };
 | |
| 
 | |
| static int device_state_subscriptions_hash(const void *obj, const int flags)
 | |
| {
 | |
| 	const struct device_state_subscription *object;
 | |
| 
 | |
| 	switch (flags & OBJ_SEARCH_MASK) {
 | |
| 	case OBJ_SEARCH_OBJECT:
 | |
| 		object = obj;
 | |
| 		return ast_str_hash(object->device_name);
 | |
| 	case OBJ_SEARCH_KEY:
 | |
| 	default:
 | |
| 		/* Hash can only work on something with a full key. */
 | |
| 		ast_assert(0);
 | |
| 		return 0;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| static int device_state_subscriptions_cmp(void *obj, void *arg, int flags)
 | |
| {
 | |
| 	const struct device_state_subscription *object_left = obj;
 | |
| 	const struct device_state_subscription *object_right = arg;
 | |
| 	int cmp;
 | |
| 
 | |
| 	switch (flags & OBJ_SEARCH_MASK) {
 | |
| 	case OBJ_SEARCH_OBJECT:
 | |
| 		/* find objects matching both device and app names */
 | |
| 		if (strcmp(object_left->device_name,
 | |
| 			   object_right->device_name)) {
 | |
| 			return 0;
 | |
| 		}
 | |
| 		cmp = strcmp(object_left->app_name, object_right->app_name);
 | |
| 		break;
 | |
| 	case OBJ_SEARCH_KEY:
 | |
| 	case OBJ_SEARCH_PARTIAL_KEY:
 | |
| 		ast_assert(0); /* not supported by container */
 | |
| 		/* fall through */
 | |
| 	default:
 | |
| 		cmp = 0;
 | |
| 		break;
 | |
| 	}
 | |
| 
 | |
| 	return cmp ? 0 : CMP_MATCH | CMP_STOP;
 | |
| }
 | |
| 
 | |
| static void device_state_subscription_destroy(void *obj)
 | |
| {
 | |
| 	struct device_state_subscription *sub = obj;
 | |
| 	ast_string_field_free_memory(sub);
 | |
| }
 | |
| 
 | |
| static struct device_state_subscription *device_state_subscription_create(
 | |
| 	const struct stasis_app *app, const char *device_name)
 | |
| {
 | |
| 	struct device_state_subscription *sub;
 | |
| 	const char *app_name = stasis_app_name(app);
 | |
| 	size_t size;
 | |
| 
 | |
| 	if (ast_strlen_zero(device_name)) {
 | |
| 		device_name = DEVICE_STATE_ALL;
 | |
| 	}
 | |
| 
 | |
| 	size = strlen(device_name) + strlen(app_name) + 2;
 | |
| 
 | |
|  	sub = ao2_alloc(sizeof(*sub), device_state_subscription_destroy);
 | |
| 	if (!sub) {
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	if (ast_string_field_init(sub, size)) {
 | |
| 		ao2_ref(sub, -1);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	ast_string_field_set(sub, app_name, app_name);
 | |
| 	ast_string_field_set(sub, device_name, device_name);
 | |
| 	return sub;
 | |
| }
 | |
| 
 | |
| static struct device_state_subscription *find_device_state_subscription(
 | |
| 	struct stasis_app *app, const char *name)
 | |
| {
 | |
| 	struct device_state_subscription dummy_sub = {
 | |
| 		.app_name = stasis_app_name(app),
 | |
| 		.device_name = name
 | |
| 	};
 | |
| 
 | |
| 	return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
 | |
| }
 | |
| 
 | |
| static void remove_device_state_subscription(
 | |
| 	struct device_state_subscription *sub)
 | |
| {
 | |
| 	if (sub->sub) {
 | |
| 		sub->sub = stasis_unsubscribe_and_join(sub->sub);
 | |
| 	}
 | |
| 	ao2_unlink_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
 | |
| }
 | |
| 
 | |
| struct ast_json *stasis_app_device_state_to_json(
 | |
| 	const char *name, enum ast_device_state state)
 | |
| {
 | |
| 	return ast_json_pack("{s: s, s: s}",
 | |
| 			     "name", name,
 | |
| 			     "state", ast_devstate_str(state));
 | |
| }
 | |
| 
 | |
| struct ast_json *stasis_app_device_states_to_json(void)
 | |
| {
 | |
| 	struct ast_json *array = ast_json_array_create();
 | |
| 	RAII_VAR(struct ast_db_entry *, tree,
 | |
| 		 ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
 | |
| 	struct ast_db_entry *entry;
 | |
| 
 | |
| 	for (entry = tree; entry; entry = entry->next) {
 | |
| 		const char *name = strrchr(entry->key, '/');
 | |
| 		if (!ast_strlen_zero(name)) {
 | |
| 			struct ast_str *device = ast_str_alloca(DEVICE_STATE_SIZE);
 | |
| 			ast_str_set(&device, 0, "%s%s",
 | |
| 				    DEVICE_STATE_SCHEME_STASIS, ++name);
 | |
| 			ast_json_array_append(
 | |
| 				array, stasis_app_device_state_to_json(
 | |
| 					ast_str_buffer(device),
 | |
| 					ast_device_state(ast_str_buffer(device))));
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return array;
 | |
| }
 | |
| 
 | |
| static void send_device_state(struct device_state_subscription *sub,
 | |
| 			      const char *name, enum ast_device_state state)
 | |
| {
 | |
| 	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
 | |
| 
 | |
| 	json = ast_json_pack("{s:s, s:s, s:o, s:o}",
 | |
| 			     "type", "DeviceStateChanged",
 | |
| 			     "application", sub->app_name,
 | |
| 			     "timestamp", ast_json_timeval(ast_tvnow(), NULL),
 | |
| 			     "device_state", stasis_app_device_state_to_json(
 | |
| 				     name, state));
 | |
| 
 | |
| 	if (!json) {
 | |
| 		ast_log(LOG_ERROR, "Unable to create device state json object\n");
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	stasis_app_send(sub->app_name, json);
 | |
| }
 | |
| 
 | |
| enum stasis_device_state_result stasis_app_device_state_update(
 | |
| 	const char *name, const char *value)
 | |
| {
 | |
| 	size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
 | |
| 	enum ast_device_state state;
 | |
| 
 | |
| 	ast_debug(3, "Updating device name = %s, value = %s", name, value);
 | |
| 
 | |
| 	if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
 | |
| 		ast_log(LOG_ERROR, "Update can only be used to set "
 | |
| 			"'%s' device state!\n", DEVICE_STATE_SCHEME_STASIS);
 | |
| 		return STASIS_DEVICE_STATE_NOT_CONTROLLED;
 | |
| 	}
 | |
| 
 | |
| 	name += size;
 | |
| 	if (ast_strlen_zero(name)) {
 | |
| 		ast_log(LOG_ERROR, "Update requires custom device name!\n");
 | |
| 		return STASIS_DEVICE_STATE_MISSING;
 | |
| 	}
 | |
| 
 | |
| 	if (!value || (state = ast_devstate_val(value)) == AST_DEVICE_UNKNOWN) {
 | |
| 		ast_log(LOG_ERROR, "Unknown device state "
 | |
| 			"value '%s'\n", value);
 | |
| 		return STASIS_DEVICE_STATE_UNKNOWN;
 | |
| 	}
 | |
| 
 | |
| 	ast_db_put(DEVICE_STATE_FAMILY, name, value);
 | |
| 	ast_devstate_changed(state, AST_DEVSTATE_CACHABLE, "%s%s",
 | |
| 			     DEVICE_STATE_SCHEME_STASIS, name);
 | |
| 
 | |
| 	return STASIS_DEVICE_STATE_OK;
 | |
| }
 | |
| 
 | |
| enum stasis_device_state_result stasis_app_device_state_delete(const char *name)
 | |
| {
 | |
| 	const char *full_name = name;
 | |
| 	size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
 | |
| 
 | |
| 	if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
 | |
| 		ast_log(LOG_ERROR, "Can only delete '%s' device states!\n",
 | |
| 			DEVICE_STATE_SCHEME_STASIS);
 | |
| 		return STASIS_DEVICE_STATE_NOT_CONTROLLED;
 | |
| 	}
 | |
| 
 | |
| 	name += size;
 | |
| 	if (ast_strlen_zero(name)) {
 | |
| 		ast_log(LOG_ERROR, "Delete requires a device name!\n");
 | |
| 		return STASIS_DEVICE_STATE_MISSING;
 | |
| 	}
 | |
| 
 | |
| 	if (ast_device_state_clear_cache(full_name)) {
 | |
| 		return STASIS_DEVICE_STATE_UNKNOWN;
 | |
| 	}
 | |
| 
 | |
| 	ast_db_del(DEVICE_STATE_FAMILY, name);
 | |
| 
 | |
| 	/* send state change for delete */
 | |
| 	ast_devstate_changed(
 | |
| 		AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "%s%s",
 | |
| 		DEVICE_STATE_SCHEME_STASIS, name);
 | |
| 
 | |
| 	return STASIS_DEVICE_STATE_OK;
 | |
| }
 | |
| 
 | |
| static void populate_cache(void)
 | |
| {
 | |
| 	RAII_VAR(struct ast_db_entry *, tree,
 | |
| 		 ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
 | |
| 	struct ast_db_entry *entry;
 | |
| 
 | |
| 	for (entry = tree; entry; entry = entry->next) {
 | |
| 		const char *name = strrchr(entry->key, '/');
 | |
| 		if (!ast_strlen_zero(name)) {
 | |
| 			ast_devstate_changed(
 | |
| 				ast_devstate_val(entry->data),
 | |
| 				AST_DEVSTATE_CACHABLE, "%s%s\n",
 | |
| 				DEVICE_STATE_SCHEME_STASIS, name + 1);
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| static enum ast_device_state stasis_device_state_cb(const char *data)
 | |
| {
 | |
| 	char buf[DEVICE_STATE_SIZE] = "";
 | |
| 
 | |
| 	ast_db_get(DEVICE_STATE_FAMILY, data, buf, sizeof(buf));
 | |
| 
 | |
| 	return ast_devstate_val(buf);
 | |
| }
 | |
| 
 | |
| static void device_state_cb(void *data, struct stasis_subscription *sub,
 | |
| 			    struct stasis_message *msg)
 | |
| {
 | |
| 	struct ast_device_state_message *device_state;
 | |
| 
 | |
| 	if (stasis_subscription_final_message(sub, msg)) {
 | |
| 		/* Remove stasis subscription's reference to device_state_subscription */
 | |
| 		ao2_ref(data, -1);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	if (ast_device_state_message_type() != stasis_message_type(msg)) {
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	device_state = stasis_message_data(msg);
 | |
| 	if (device_state->eid) {
 | |
| 		/* ignore non-aggregate states */
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	send_device_state(data, device_state->device, device_state->state);
 | |
| }
 | |
| 
 | |
| static void *find_device_state(const struct stasis_app *app, const char *name)
 | |
| {
 | |
| 	return device_state_subscription_create(app, name);
 | |
| }
 | |
| 
 | |
| static int is_subscribed_device_state(struct stasis_app *app, const char *name)
 | |
| {
 | |
| 	struct device_state_subscription *sub;
 | |
| 
 | |
| 	sub = find_device_state_subscription(app, DEVICE_STATE_ALL);
 | |
| 	if (sub) {
 | |
| 		ao2_ref(sub, -1);
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	sub = find_device_state_subscription(app, name);
 | |
| 	if (sub) {
 | |
| 		ao2_ref(sub, -1);
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static int is_subscribed_device_state_lock(struct stasis_app *app, const char *name)
 | |
| {
 | |
| 	int is_subscribed;
 | |
| 
 | |
| 	ao2_lock(device_state_subscriptions);
 | |
| 	is_subscribed = is_subscribed_device_state(app, name);
 | |
| 	ao2_unlock(device_state_subscriptions);
 | |
| 
 | |
| 	return is_subscribed;
 | |
| }
 | |
| 
 | |
| static int subscribe_device_state(struct stasis_app *app, void *obj)
 | |
| {
 | |
| 	struct device_state_subscription *sub = obj;
 | |
| 	struct stasis_topic *topic;
 | |
| 
 | |
| 	if (!sub) {
 | |
| 		sub = device_state_subscription_create(app, NULL);
 | |
| 		if (!sub) {
 | |
| 			return -1;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (strcmp(sub->device_name, DEVICE_STATE_ALL)) {
 | |
| 		topic = ast_device_state_topic(sub->device_name);
 | |
| 	} else {
 | |
| 		topic = ast_device_state_topic_all();
 | |
| 	}
 | |
| 
 | |
| 	ao2_lock(device_state_subscriptions);
 | |
| 
 | |
| 	if (is_subscribed_device_state(app, sub->device_name)) {
 | |
| 		ao2_unlock(device_state_subscriptions);
 | |
| 		ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
 | |
| 		return 0;
 | |
| 	}
 | |
| 
 | |
| 	ast_debug(3, "Subscribing to device %s\n", sub->device_name);
 | |
| 
 | |
| 	sub->sub = stasis_subscribe_pool(topic, device_state_cb, ao2_bump(sub));
 | |
| 	if (!sub->sub) {
 | |
| 		ao2_unlock(device_state_subscriptions);
 | |
| 		ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
 | |
| 			sub->device_name);
 | |
| 		/* Reference we added when attempting to stasis_subscribe_pool */
 | |
| 		ao2_ref(sub, -1);
 | |
| 		return -1;
 | |
| 	}
 | |
| 
 | |
| 	ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
 | |
| 	ao2_unlock(device_state_subscriptions);
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static int unsubscribe_device_state(struct stasis_app *app, const char *name)
 | |
| {
 | |
| 	struct device_state_subscription *sub;
 | |
| 
 | |
| 	ao2_lock(device_state_subscriptions);
 | |
| 	sub = find_device_state_subscription(app, name);
 | |
| 	if (sub) {
 | |
| 		remove_device_state_subscription(sub);
 | |
| 	}
 | |
| 	ao2_unlock(device_state_subscriptions);
 | |
| 
 | |
| 	ao2_cleanup(sub);
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static int device_to_json_cb(void *obj, void *arg, void *data, int flags)
 | |
| {
 | |
| 	struct device_state_subscription *sub = obj;
 | |
| 	const char *app_name = arg;
 | |
| 	struct ast_json *array = data;
 | |
| 
 | |
| 	if (strcmp(sub->app_name, app_name)) {
 | |
| 		return 0;
 | |
| 	}
 | |
| 
 | |
| 	ast_json_array_append(
 | |
| 		array, ast_json_string_create(sub->device_name));
 | |
| 	return 0;
 | |
| 
 | |
| }
 | |
| 
 | |
| static void devices_to_json(const struct stasis_app *app, struct ast_json *json)
 | |
| {
 | |
| 	struct ast_json *array = ast_json_array_create();
 | |
| 	ao2_callback_data(device_state_subscriptions, OBJ_NODATA,
 | |
| 			  device_to_json_cb, (void *)stasis_app_name(app), array);
 | |
| 	ast_json_object_set(json, "device_names", array);
 | |
| }
 | |
| 
 | |
| struct stasis_app_event_source device_state_event_source = {
 | |
| 	.scheme = DEVICE_STATE_SCHEME_SUB,
 | |
| 	.find = find_device_state,
 | |
| 	.subscribe = subscribe_device_state,
 | |
| 	.unsubscribe = unsubscribe_device_state,
 | |
| 	.is_subscribed = is_subscribed_device_state_lock,
 | |
| 	.to_json = devices_to_json
 | |
| };
 | |
| 
 | |
| static int load_module(void)
 | |
| {
 | |
| 	populate_cache();
 | |
| 	if (ast_devstate_prov_add(DEVICE_STATE_PROVIDER_STASIS,
 | |
| 				  stasis_device_state_cb)) {
 | |
| 		return AST_MODULE_LOAD_DECLINE;
 | |
| 	}
 | |
| 
 | |
| 	if (!(device_state_subscriptions = ao2_container_alloc(
 | |
| 		      DEVICE_STATE_BUCKETS, device_state_subscriptions_hash,
 | |
| 		      device_state_subscriptions_cmp))) {
 | |
| 		ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
 | |
| 		return AST_MODULE_LOAD_DECLINE;
 | |
| 	}
 | |
| 
 | |
| 	stasis_app_register_event_source(&device_state_event_source);
 | |
| 	return AST_MODULE_LOAD_SUCCESS;
 | |
| }
 | |
| 
 | |
| static int unload_module(void)
 | |
| {
 | |
| 	ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
 | |
| 	stasis_app_unregister_event_source(&device_state_event_source);
 | |
| 	ao2_cleanup(device_state_subscriptions);
 | |
| 	device_state_subscriptions = NULL;
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application device state support",
 | |
| 	.support_level = AST_MODULE_SUPPORT_CORE,
 | |
| 	.load = load_module,
 | |
| 	.unload = unload_module,
 | |
| 	.nonoptreq = "res_stasis");
 |