FS-10167: Updated publish and broadcast to support adding and removing channels at runtime, as well as removing protocols at runtime. In effect this provides cleanup on subscriptions when an entire channel or protocol becomes unavailable. Committing to test linux build.

This commit is contained in:
Shane Bryldt 2017-08-18 16:30:08 -06:00
parent 079a04d1c2
commit a81b57bd35
15 changed files with 578 additions and 161 deletions

View File

@ -122,8 +122,12 @@ KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const ch
ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
if (blade_protocol_purge(bp, nodeid)) {
if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
ks_hash_insert(cleanup, (void *)key, bp);
if (!blade_protocol_controller_available(bp)) {
if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
ks_hash_insert(cleanup, (void *)key, bp);
} else {
// @todo not the last controller, may need to propagate that the controller is no longer available?
}
}
}
if (cleanup) {
@ -133,6 +137,8 @@ KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const ch
ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
blade_subscriptionmgr_broadcast(blade_handle_subscriptionmgr_get(bmmgr->handle), BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE, NULL, blade_protocol_name_get(bp), blade_protocol_realm_get(bp), NULL, NULL, NULL, NULL, NULL);
ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
ks_hash_remove(bmmgr->protocols, (void *)key);
}
@ -192,12 +198,49 @@ KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr,
ks_hash_insert(bmmgr->protocols, (void *)ks_pstrdup(pool, key), bp);
}
blade_protocol_controllers_add(bp, controller);
blade_protocol_controller_add(bp, controller);
ks_hash_write_unlock(bmmgr->protocols);
ks_pool_free(&key);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller)
{
ks_pool_t *pool = NULL;
blade_protocol_t *bp = NULL;
char *key = NULL;
ks_assert(bmmgr);
ks_assert(protocol);
ks_assert(realm);
ks_assert(controller);
pool = ks_pool_get(bmmgr);
key = ks_psprintf(pool, "%s@%s", protocol, realm);
ks_hash_write_lock(bmmgr->protocols);
bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_UNLOCKED);
if (bp) {
if (blade_protocol_controller_remove(bp, controller)) {
if (!blade_protocol_controller_available(bp)) {
// @todo broadcast protocol removal to remove all channel subscriptions
ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
ks_hash_remove(bmmgr->protocols, (void *)key);
} else {
// @todo not the last controller, may need to propagate when a specific controller becomes unavailable though
}
}
}
ks_hash_write_unlock(bmmgr->protocols);
ks_pool_free(&key);
return KS_STATUS_SUCCESS;
}
@ -249,7 +292,9 @@ KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr,
goto done;
}
blade_protocol_channel_remove(bp, channel);
if (blade_protocol_channel_remove(bp, channel)) {
blade_subscriptionmgr_broadcast(blade_handle_subscriptionmgr_get(bmmgr->handle), BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE, NULL, blade_protocol_name_get(bp), blade_protocol_realm_get(bp), channel, NULL, NULL, NULL, NULL);
}
done:
ks_pool_free(&key);

View File

@ -98,10 +98,20 @@ KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(const char *) blade_protocol_name_get(blade_protocol_t *bp)
{
ks_assert(bp);
return bp->name;
}
KS_DECLARE(const char *) blade_protocol_realm_get(blade_protocol_t *bp)
{
ks_assert(bp);
return bp->realm;
}
KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
{
ks_bool_t ret = KS_FALSE;
ks_assert(bp);
ks_assert(nodeid);
@ -119,22 +129,16 @@ KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nod
}
ks_hash_write_unlock(bp->channels);
ks_hash_write_lock(bp->controllers);
if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
}
ret = ks_hash_count(bp->controllers) == 0;
ks_hash_write_unlock(bp->controllers);
return ret;
return blade_protocol_controller_remove(bp, nodeid);
}
KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
KS_DECLARE(cJSON *) blade_protocol_controller_pack(blade_protocol_t *bp)
{
cJSON *controllers = cJSON_CreateObject();
ks_assert(bp);
ks_hash_read_lock(bp->controllers);
for (ks_hash_iterator_t *it = ks_hash_first(bp->controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
void *value = NULL;
@ -143,11 +147,12 @@ KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
cJSON_AddItemToArray(controllers, cJSON_CreateString(key));
}
ks_hash_read_unlock(bp->controllers);
return controllers;
}
KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid)
KS_DECLARE(ks_status_t) blade_protocol_controller_add(blade_protocol_t *bp, const char *nodeid)
{
char *key = NULL;
@ -162,6 +167,29 @@ KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, con
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_bool_t) blade_protocol_controller_remove(blade_protocol_t *bp, const char *nodeid)
{
ks_bool_t ret = KS_FALSE;
ks_assert(bp);
ks_assert(nodeid);
ks_hash_write_lock(bp->controllers);
if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
ret = KS_TRUE;
ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
}
ks_hash_write_unlock(bp->controllers);
return ret;
}
KS_DECLARE(ks_bool_t) blade_protocol_controller_available(blade_protocol_t *bp)
{
ks_assert(bp);
return ks_hash_count(bp->controllers) > 0;
}
KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name)
{
ks_status_t ret = KS_STATUS_SUCCESS;
@ -195,16 +223,23 @@ done:
return ret;
}
KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
KS_DECLARE(ks_bool_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
{
ks_bool_t ret = KS_FALSE;
ks_hash_t *authorized = NULL;
ks_assert(bp);
ks_assert(name);
ks_hash_remove(bp->channels, (void *)name);
ks_hash_write_lock(bp->channels);
if ((authorized = ks_hash_remove(bp->channels, (void *)name))) {
ret = KS_TRUE;
ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
ks_hash_destroy(&authorized);
}
ks_hash_write_unlock(bp->channels);
ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
return KS_STATUS_SUCCESS;
return ret;
}
KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target)
@ -233,8 +268,7 @@ KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, k
if (ks_hash_remove(authorizations, (void *)target)) {
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", target, bp->name, bp->realm, channel);
} else ret = KS_STATUS_NOT_FOUND;
}
else {
} else {
ks_hash_insert(authorizations, (void *)ks_pstrdup(ks_pool_get(bp), target), (void *)KS_TRUE);
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Added: %s to %s@%s/%s\n", target, bp->name, bp->realm, channel);
}

View File

@ -352,7 +352,7 @@ struct blade_rpcsubscribe_data_s {
const char *relayed_messageid;
};
ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, blade_rpcsubscribe_command_t command, const char *protocol, const char *realm, cJSON *channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
// blade.register request generator
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
@ -456,7 +456,7 @@ done:
// blade.publish request generator
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpublish_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -478,9 +478,29 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
pool = ks_pool_get(bh);
ks_assert(pool);
// @todo validate command and parameters
switch (command) {
case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
break;
case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
break;
case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
if (!channels || cJSON_GetArraySize(channels) <= 0) {
ret = KS_STATUS_ARG_NULL;
goto done;
}
break;
case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE:
break;
default:
ret = KS_STATUS_ARG_INVALID;
goto done;
}
// create the response
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.publish");
// fill in the req_params
cJSON_AddNumberToObject(req_params, "command", command);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
@ -496,10 +516,6 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
cJSON_AddStringToObject(req_params, "responder-nodeid", id);
ks_pool_free(&id);
// @todo may want to switch this system to use a blade_rpcpublish_args_t with validation on the contents on this list internally
// and to produce the entire json block internally in case the channel args change to include additional information like an encryption key,
// however if passing encryption keys then they should be asymetrically encrypted using a public key provided by the master so that
// the channel keys can be transmitted without intermediate nodes being able to snoop them
if (channels) cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
// @todo add a parameter containing a block of json for schema definitions for each of the methods being published
@ -523,6 +539,8 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_channels = NULL;
cJSON *req_params_command = NULL;
blade_rpcpublish_command_t command = BLADE_RPCPUBLISH_COMMAND_NONE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_requester_nodeid = NULL;
@ -549,6 +567,22 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
goto done;
}
req_params_command = cJSON_GetObjectItem(req_params, "command");
if (!req_params_command) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'command'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
command = (blade_rpcpublish_command_t)req_params_command->valueint;
switch (command) {
case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE: break;
default: goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'protocol'\n", blade_session_id_get(bs));
@ -590,9 +624,12 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
goto done;
}
// @todo get enumeration parameter to represent a publish command, including add_protocol, remove_protocol, and update_channels
// @todo switch channels to separate add_channels and remove_channels
req_params_channels = cJSON_GetObjectItem(req_params, "channels");
if (req_params_channels) {
int size = 0;
cJSON *element = NULL;
if (req_params_channels->type != cJSON_Array) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
@ -601,9 +638,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
goto done;
}
size = cJSON_GetArraySize(req_params_channels);
for (int index = 0; index < size; ++index) {
cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
cJSON_ArrayForEach(element, req_params_channels) {
if (element->type != cJSON_String) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
@ -615,16 +650,41 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
if (req_params_channels) {
int size = cJSON_GetArraySize(req_params_channels);
for (int index = 0; index < size; ++index) {
cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
// @todo switch on publish command, make the following code for add_protocol
switch (command) {
case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
if (req_params_channels) {
cJSON *element = NULL;
cJSON_ArrayForEach(element, req_params_channels) {
blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
}
}
break;
case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
blade_mastermgr_controller_remove(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
break;
case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
if (req_params_channels) {
cJSON *element = NULL;
cJSON_ArrayForEach(element, req_params_channels) {
blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
}
}
break;
case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE:
if (req_params_channels) {
cJSON *element = NULL;
cJSON_ArrayForEach(element, req_params_channels) {
blade_mastermgr_channel_remove(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
}
}
break;
default:
goto done;
}
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
@ -632,6 +692,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
// @todo include a list of channels that failed to be added or removed if applicable?
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
@ -855,7 +916,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
blade_session_send(bs, res, NULL, NULL);
if (res_result_unauthorized_channels) {
blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, NULL, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
blade_handle_rpcsubscribe_raw(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, req_params_protocol, req_params_realm, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
}
done:
@ -1000,7 +1061,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
bp = blade_mastermgr_protocol_lookup(bh->mastermgr, req_params_protocol, req_params_realm);
if (bp) res_result_controllers = blade_protocol_controllers_pack(bp);
if (bp) res_result_controllers = blade_protocol_controller_pack(bp);
// build the actual response finally
@ -1325,7 +1386,15 @@ static void blade_rpcsubscribe_data_cleanup(void *ptr, void *arg, ks_pool_cleanu
}
// blade.subscribe request generator
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data)
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh,
blade_rpcsubscribe_command_t command,
const char *protocol,
const char *realm,
cJSON *channels,
blade_rpc_response_callback_t callback,
void *data,
blade_rpc_request_callback_t channel_callback,
void *channel_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_pool_t *pool = NULL;
@ -1336,7 +1405,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
ks_assert(bh);
ks_assert(protocol);
ks_assert(realm);
ks_assert(subscribe_channels || unsubscribe_channels);
ks_assert(channels);
pool = ks_pool_get(bh);
ks_assert(pool);
@ -1359,7 +1428,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
temp_data->channel_data = channel_data;
ks_pool_set_cleanup(temp_data, NULL, blade_rpcsubscribe_data_cleanup);
ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
ret = blade_handle_rpcsubscribe_raw(bh, command, protocol, realm, channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
ks_pool_free(&localid);
@ -1369,7 +1438,15 @@ done:
return ret;
}
ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data)
ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh,
blade_rpcsubscribe_command_t command,
const char *protocol,
const char *realm,
cJSON *channels,
const char *subscriber,
ks_bool_t downstream,
blade_rpc_response_callback_t callback,
blade_rpcsubscribe_data_t *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -1380,12 +1457,12 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protoc
ks_assert(bh);
ks_assert(protocol);
ks_assert(realm);
ks_assert(subscribe_channels || unsubscribe_channels);
ks_assert(channels);
ks_assert(subscriber);
if (downstream) {
// @note if a master is sending a downstream update, it may only use unsubscribe_channels, cannot force a subscription without a subscriber callback
if (subscribe_channels) {
// @note if a master is sending a downstream update, it may only remove subscriptions, it cannot force a subscription without the subscriber providing the callback
if (command != BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
ret = KS_STATUS_NOT_ALLOWED;
goto done;
}
@ -1402,22 +1479,21 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protoc
pool = ks_pool_get(bh);
ks_assert(pool);
if (unsubscribe_channels) {
if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, unsubscribe_channels) {
cJSON_ArrayForEach(channel, channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, protocol, realm, channel->valuestring, subscriber);
}
}
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
cJSON_AddNumberToObject(req_params, "command", command);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
cJSON_AddStringToObject(req_params, "subscriber-nodeid", subscriber);
if (downstream) cJSON_AddTrueToObject(req_params, "downstream");
if (subscribe_channels) cJSON_AddItemToObject(req_params, "subscribe-channels", cJSON_Duplicate(subscribe_channels, 1));
if (unsubscribe_channels) cJSON_AddItemToObject(req_params, "unsubscribe-channels", cJSON_Duplicate(unsubscribe_channels, 1));
cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
@ -1438,13 +1514,14 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_command = NULL;
blade_rpcsubscribe_command_t command = BLADE_RPCSUBSCRIBE_COMMAND_NONE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_subscriber_nodeid = NULL;
cJSON *req_params_downstream = NULL;
ks_bool_t downstream = KS_FALSE;
cJSON *req_params_subscribe_channels = NULL;
cJSON *req_params_unsubscribe_channels = NULL;
cJSON *req_params_channels = NULL;
ks_bool_t masterlocal = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
@ -1473,6 +1550,20 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
goto done;
}
req_params_command = cJSON_GetObjectItem(req_params, "command");
if (!req_params_command) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'command'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
command = (blade_rpcsubscribe_command_t)req_params_command->valueint;
switch (command) {
case BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD:
case BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE: break;
default: goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
@ -1502,12 +1593,11 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
req_params_downstream = cJSON_GetObjectItem(req_params, "downstream");
downstream = req_params_downstream && req_params_downstream->type == cJSON_True;
req_params_subscribe_channels = cJSON_GetObjectItem(req_params, "subscribe-channels");
req_params_unsubscribe_channels = cJSON_GetObjectItem(req_params, "unsubscribe-channels");
req_params_channels = cJSON_GetObjectItem(req_params, "channels");
if (!req_params_subscribe_channels && !req_params_unsubscribe_channels) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscribe-channels' or 'unsubscribe-channels'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscribe-channels or unsubscribe-channels");
if (!req_params_channels) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'channels'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
@ -1521,9 +1611,9 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
// @note This is normally handled by blade_handle_rpcsubscribe_raw() to ensure authorization removals are processed during the request path
// including on the node they start on, whether that is the master or the subscriber
if (req_params_unsubscribe_channels) {
if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
cJSON_ArrayForEach(channel, req_params_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
}
}
@ -1535,11 +1625,11 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
cJSON_AddStringToObject(res_result, "subscriber-nodeid", req_params_subscriber_nodeid);
if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
if (req_params_subscribe_channels) {
// @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request containing subscribe-channels
if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD) {
// @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request to add a subscriber
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, req_params_subscribe_channels) {
cJSON_ArrayForEach(channel, req_params_channels) {
if (blade_mastermgr_channel_verify(bh->mastermgr, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid)) {
blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
if (!res_result_subscribe_channels) res_result_subscribe_channels = cJSON_CreateArray();
@ -1563,7 +1653,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
temp_data->relayed_messageid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
ks_pool_set_cleanup(temp_data, NULL, blade_rpcsubscribe_data_cleanup);
blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
blade_handle_rpcsubscribe_raw(bh, command, req_params_protocol, req_params_realm, req_params_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
}
done:
@ -1694,11 +1784,10 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, NULL, protocol, realm, channel, event, params, callback, data);
ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL, protocol, realm, channel, event, params, callback, data);
// @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
// if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
@ -1707,6 +1796,8 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
return ret;
}
// @todo blade_handle_rpcbroadcast_raw() to encapsulate adding subcommands to broadcast to support protocol removal, protocol channel removal, and normal event broadcast
// blade.broadcast request handler
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
@ -1715,6 +1806,8 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_command = NULL;
blade_rpcbroadcast_command_t command = BLADE_RPCBROADCAST_COMMAND_NONE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_channel = NULL;
@ -1760,41 +1853,57 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
goto done;
}
req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
if (!req_params_channel) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
req_params_command = cJSON_GetObjectItem(req_params, "command");
if (!req_params_command) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'command'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
command = (blade_rpcbroadcast_command_t)req_params_command->valueint;
switch (command) {
case BLADE_RPCBROADCAST_COMMAND_EVENT:
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
if (!req_params_channel) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
break;
case BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE: break;
default: goto done;
}
req_params_params = cJSON_GetObjectItem(req_params, "params");
blade_subscriptionmgr_broadcast(bh->subscriptionmgr, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
if (bsub) {
const char *localid = NULL;
ks_pool_t *pool = NULL;
if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
if (bsub) {
const char *localid = NULL;
ks_pool_t *pool = NULL;
pool = ks_pool_get(bh);
pool = ks_pool_get(bh);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
ks_assert(localid);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
ks_assert(localid);
if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
callback = blade_subscription_callback_get(bsub);
if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
callback = blade_subscription_callback_get(bsub);
if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
}
ks_pool_free(&localid);
}
ks_pool_free(&localid);
}
// build the actual response finally
@ -1803,8 +1912,8 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
// @todo this is not neccessary, can obtain this from the original request
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "channel", req_params_channel);
cJSON_AddStringToObject(res_result, "event", req_params_event);
if (req_params_channel) cJSON_AddStringToObject(res_result, "channel", req_params_channel);
if (req_params_event) cJSON_AddStringToObject(res_result, "event", req_params_event);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);

View File

@ -145,6 +145,56 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla
return bsub;
}
KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel)
{
ks_pool_t *pool = NULL;
char *bsub_key = NULL;
blade_subscription_t *bsub = NULL;
ks_hash_t *subscribers = NULL;
ks_hash_t *subscriptions = NULL;
ks_assert(bsmgr);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
pool = ks_pool_get(bsmgr);
bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
ks_hash_write_lock(bsmgr->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
subscribers = blade_subscription_subscribers_get(bsub);
ks_assert(subscribers);
for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
subscriptions = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, key, KS_UNLOCKED);
ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", key, bsub_key);
ks_hash_remove(subscriptions, bsub_key);
if (ks_hash_count(subscriptions) == 0) {
ks_hash_remove(bsmgr->subscriptions_cleanup, key);
}
}
ks_log(KS_LOG_DEBUG, "Subscription Removed: %s\n", bsub_key);
ks_hash_remove(bsmgr->subscriptions, (void *)bsub_key);
ks_hash_write_unlock(bsmgr->subscriptions);
ks_pool_free(&bsub_key);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
{
ks_pool_t *pool = NULL;
@ -295,7 +345,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con
}
}
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_pool_t *pool = NULL;
const char *bsub_key = NULL;
@ -303,68 +353,153 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_hash_t *routers = NULL;
ks_hash_t *channels = NULL;
ks_assert(bsmgr);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
pool = ks_pool_get(bsmgr);
bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
switch (command) {
case BLADE_RPCBROADCAST_COMMAND_EVENT:
ks_assert(event);
case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
ks_assert(channel);
bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
cJSON_AddStringToObject(req_params, "channel", channel);
cJSON_AddStringToObject(req_params, "event", event);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
ks_hash_read_lock(bsmgr->subscriptions);
ks_hash_read_lock(bsmgr->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
if (bsub) {
ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
if (bsub) {
ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
ks_assert(subscribers);
ks_assert(subscribers);
for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_hash_this(it, (const void **)&key, NULL, &value);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
// @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
if (bs) {
if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
if (!ks_hash_search(routers, blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, blade_session_id_get(bs), bs);
else blade_session_read_unlock(bs);
}
}
if (command == BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE) {
if (!channels) ks_hash_create(&channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
ks_hash_insert(channels, channel, (void *)KS_TRUE);
}
}
ks_hash_read_unlock(bsmgr->subscriptions);
break;
case BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE:
bsub_key = ks_psprintf(pool, "%s@%s", protocol, realm);
ks_hash_read_lock(bsmgr->subscriptions);
for (ks_hash_iterator_t *it = ks_hash_first(bsmgr->subscriptions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
bsub = (blade_subscription_t *)value;
// @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
if (ks_stristr(bsub_key, (const char *)key) == (const char *)key) {
ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
if (bs) {
ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
ks_assert(subscribers);
blade_session_send(bs, req, callback, data);
for (ks_hash_iterator_t *it2 = ks_hash_first(subscribers, KS_UNLOCKED); it2; it2 = ks_hash_next(&it2)) {
void *key2 = NULL;
void *value2 = NULL;
blade_session_read_unlock(bs);
ks_hash_this(it2, (const void **)&key2, NULL, &value2);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
// @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key2)) continue;
bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2);
if (bs) {
if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
if (!ks_hash_search(routers, blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, blade_session_id_get(bs), bs);
else blade_session_read_unlock(bs);
}
}
if (!channels) ks_hash_create(&channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
ks_hash_insert(channels, blade_subscription_channel_get(bsub), (void *)KS_TRUE);
}
}
ks_hash_read_unlock(bsmgr->subscriptions);
break;
default: return KS_STATUS_ARG_INVALID;
}
ks_hash_read_unlock(bsmgr->subscriptions);
bs = blade_upstreammgr_session_get(blade_handle_upstreammgr_get(bsmgr->handle));
if (bs) {
if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
ks_hash_insert(routers, blade_session_id_get(bs), bs);
}
else blade_session_read_unlock(bs);
}
if (channels) {
for (ks_hash_iterator_t *it = ks_hash_first(channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
blade_subscriptionmgr_subscription_remove(bsmgr, protocol, realm, (const char *)key);
}
ks_hash_destroy(&channels);
}
if (routers) {
for (ks_hash_iterator_t *it = ks_hash_first(routers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
bs = (blade_session_t *)value;
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddNumberToObject(req_params, "command", command);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (channel) cJSON_AddStringToObject(req_params, "channel", channel);
if (event) cJSON_AddStringToObject(req_params, "event", event);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data);
cJSON_Delete(req);
blade_session_read_unlock(bs);
}
blade_session_read_unlock(bs);
ks_hash_destroy(&routers);
}
cJSON_Delete(req);
ks_pool_free(&bsub_key);
return KS_STATUS_SUCCESS;

View File

@ -42,6 +42,7 @@ KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr
KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid);
KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target);

View File

@ -38,11 +38,15 @@
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
KS_DECLARE(const char *) blade_protocol_name_get(blade_protocol_t *bp);
KS_DECLARE(const char *) blade_protocol_realm_get(blade_protocol_t *bp);
KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp);
KS_DECLARE(cJSON *) blade_protocol_controller_pack(blade_protocol_t *bp);
KS_DECLARE(ks_status_t) blade_protocol_controller_add(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_protocol_controller_remove(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_protocol_controller_available(blade_protocol_t *bp);
KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name);
KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
KS_DECLARE(ks_bool_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target);
KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target);
KS_END_EXTERN_C

View File

@ -60,7 +60,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpublish_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
@ -73,7 +73,7 @@ KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brp
KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, blade_rpcsubscribe_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);

View File

@ -40,10 +40,11 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_create(blade_subscriptionmgr_t **b
KS_DECLARE(ks_status_t) blade_subscriptionmgr_destroy(blade_subscriptionmgr_t **bsmgrP);
KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscriptionmgr_t *bsmgr);
KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C
#endif

View File

@ -134,6 +134,28 @@ struct blade_transport_callbacks_s {
typedef void (*blade_session_callback_t)(blade_session_t *bs, blade_session_state_condition_t condition, void *data);
typedef enum {
BLADE_RPCPUBLISH_COMMAND_NONE,
BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD,
BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE,
BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD,
BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE,
} blade_rpcpublish_command_t;
typedef enum {
BLADE_RPCSUBSCRIBE_COMMAND_NONE,
BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD,
BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE,
} blade_rpcsubscribe_command_t;
typedef enum {
BLADE_RPCBROADCAST_COMMAND_NONE,
BLADE_RPCBROADCAST_COMMAND_EVENT,
BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE,
BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE,
} blade_rpcbroadcast_command_t;
KS_END_EXTERN_C
#endif

View File

@ -297,7 +297,7 @@ void command_subscribe(blade_handle_t *bh, char *args)
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD, "test", "mydomain.com", channels, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
cJSON_Delete(channels);
}

View File

@ -244,7 +244,7 @@ void command_publish(blade_handle_t *bh, char *args)
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
// @todo build up json-based method schema for each protocolrpc registered above, and pass into blade_handle_rpcpublish() to attach to the request, to be stored in the blade_protocol_t tracked by the master node
blade_handle_rpcpublish(bh, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
}
void command_broadcast(blade_handle_t *bh, char *args)

View File

@ -70,22 +70,25 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data
ks_assert(res_result_realm);
res_result_controllers = cJSON_GetObjectItem(res_result, "controllers");
ks_assert(res_result_controllers);
ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) response processing\n", blade_session_id_get(bs), res_result_protocol, res_result_realm);
for (int index = 0; index < cJSON_GetArraySize(res_result_controllers); ++index) {
cJSON *elem = cJSON_GetArrayItem(res_result_controllers, index);
if (elem->type == cJSON_String) {
nodeid = elem->valuestring;
if (res_result_controllers) {
for (int index = 0; index < cJSON_GetArraySize(res_result_controllers); ++index) {
cJSON *elem = cJSON_GetArrayItem(res_result_controllers, index);
if (elem->type == cJSON_String) {
nodeid = elem->valuestring;
}
}
}
blade_session_read_unlock(bs);
if (nodeid) {
if (g_testcon_nodeid) ks_pool_free(&g_testcon_nodeid);
g_testcon_nodeid = ks_pstrdup(ks_pool_get(bh), nodeid);
}
ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, g_testcon_nodeid);
return KS_FALSE;
@ -384,7 +387,8 @@ void command_subscribe(blade_handle_t *bh, char *args)
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_channel_handler, NULL);
if (args && args[0]) cJSON_AddItemToArray(channels, cJSON_CreateString(args));
blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD, "test", "mydomain.com", channels, NULL, NULL, test_channel_handler, NULL);
cJSON_Delete(channels);
}
@ -399,7 +403,8 @@ void command_unsubscribe(blade_handle_t *bh, char *args)
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
if (args && args[0]) cJSON_AddItemToArray(channels, cJSON_CreateString(args));
blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, "test", "mydomain.com", channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
cJSON_Delete(channels);
}

View File

@ -16,9 +16,13 @@ struct command_def_s {
};
void command_quit(blade_handle_t *bh, char *args);
void command_channeladd(blade_handle_t *bh, char *args);
void command_channelremove(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "quit", command_quit },
{ "channeladd", command_channeladd },
{ "channelremove", command_channelremove },
{ NULL, NULL }
};
@ -27,9 +31,12 @@ struct testproto_s {
blade_handle_t *handle;
ks_pool_t *pool;
ks_hash_t *participants;
ks_hash_t *channels;
};
typedef struct testproto_s testproto_t;
testproto_t *g_test = NULL;
static void testproto_cleanup(void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
//testproto_t *test = (testproto_t *)ptr;
@ -62,6 +69,7 @@ ks_status_t testproto_create(testproto_t **testP, blade_handle_t *bh)
test->pool = pool;
ks_hash_create(&test->participants, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
ks_hash_create(&test->channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
ks_pool_set_cleanup(test, NULL, testproto_cleanup);
@ -157,6 +165,13 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
// authorize channels with the master for the requester
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
for (ks_hash_iterator_t *it = ks_hash_first(test->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, &key, NULL, &value);
cJSON_AddItemToArray(channels, cJSON_CreateString((const char *)key));
}
blade_handle_rpcauthorize(bh, requester_nodeid, KS_FALSE, "test", "mydomain.com", channels, NULL, NULL);
@ -220,6 +235,13 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
// deauthorize channels with the master for the requester
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
for (ks_hash_iterator_t *it = ks_hash_first(test->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, &key, NULL, &value);
cJSON_AddItemToArray(channels, cJSON_CreateString((const char *)key));
}
blade_handle_rpcauthorize(bh, requester_nodeid, KS_TRUE, "test", "mydomain.com", channels, NULL, NULL);
@ -310,7 +332,6 @@ int main(int argc, char **argv)
config_setting_t *config_blade = NULL;
const char *cfgpath = "testcon.cfg";
const char *autoconnect = NULL;
testproto_t *test = NULL;
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
@ -346,7 +367,7 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
testproto_create(&test, bh);
testproto_create(&g_test, bh);
if (autoconnect) {
blade_connection_t *bc = NULL;
@ -366,19 +387,19 @@ int main(int argc, char **argv)
// @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
ks_sleep_ms(3000);
blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)test);
blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)g_test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)test);
blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)g_test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)test);
blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)g_test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, (void *)test);
blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
cJSON_Delete(channels);
}
@ -388,7 +409,7 @@ int main(int argc, char **argv)
blade_handle_destroy(&bh);
testproto_destroy(&test);
testproto_destroy(&g_test);
config_destroy(&config);
@ -459,6 +480,51 @@ void command_quit(blade_handle_t *bh, char *args)
g_shutdown = KS_TRUE;
}
void command_channeladd(blade_handle_t *bh, char *args)
{
cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
if (!args[0]) {
ks_log(KS_LOG_INFO, "Requires channel argument");
return;
}
ks_hash_insert(g_test->channels, (void *)ks_pstrdup(g_test->pool, args), (void *)KS_TRUE);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString(args));
blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
cJSON_Delete(channels);
}
void command_channelremove(blade_handle_t *bh, char *args)
{
cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
if (!args[0]) {
ks_log(KS_LOG_INFO, "Requires channel argument");
return;
}
if (ks_hash_remove(g_test->channels, (void *)args)) {
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString(args));
blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
cJSON_Delete(channels);
}
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -151,7 +151,7 @@ KS_DECLARE(ks_status_t) ks_pool_clear(ks_pool_t *pool);
KS_DECLARE(ks_bool_t) ks_pool_verify(void *addr);
// @todo fill in documentation
KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr);
inline KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr);
/*
* void *ks_pool_alloc

View File

@ -596,31 +596,26 @@ done:
// @todo fill in documentation
KS_DECLARE(ks_bool_t) ks_pool_verify(void *addr)
{
ks_pool_prefix_t *prefix = NULL;
if (!addr) return KS_FALSE;
prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
if (check_prefix(prefix) != KS_STATUS_SUCCESS) return KS_FALSE;
if (check_prefix((ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE)) != KS_STATUS_SUCCESS) return KS_FALSE;
return KS_TRUE;
}
// @todo fill in documentation
KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr)
inline KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr)
{
ks_pool_prefix_t *prefix = NULL;
ks_assert(addr);
#ifdef DEBUG
ks_pool_prefix_t *prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
ks_status_t ret = KS_STATUS_SUCCESS;
ks_pool_t *pool = NULL;
if (!addr) goto done;
prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
if (check_prefix(prefix) != KS_STATUS_SUCCESS) goto done;
if ((ret = check_pool(prefix->pool)) == KS_STATUS_SUCCESS) pool = prefix->pool;
done:
ret = check_prefix(prefix);
ks_assert(ret == KS_STATUS_SUCCESS);
return pool;
ret = check_pool(prefix->pool);
ks_assert(ret == KS_STATUS_SUCCESS);
#endif
return ((ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE))->pool;
}
/*