diff --git a/libs/libblade/libblade.vcxproj b/libs/libblade/libblade.vcxproj index b559f7f670..1806111831 100644 --- a/libs/libblade/libblade.vcxproj +++ b/libs/libblade/libblade.vcxproj @@ -192,6 +192,7 @@ + @@ -204,6 +205,7 @@ + diff --git a/libs/libblade/libblade.vcxproj.filters b/libs/libblade/libblade.vcxproj.filters index 429764f066..17d413b017 100644 --- a/libs/libblade/libblade.vcxproj.filters +++ b/libs/libblade/libblade.vcxproj.filters @@ -45,6 +45,9 @@ Source Files + + Source Files + @@ -80,5 +83,8 @@ Header Files + + Header Files + \ No newline at end of file diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index ed7b3e45a2..f865497916 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -280,6 +280,7 @@ KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const ch return KS_STATUS_SUCCESS; } + KS_DECLARE(cJSON *) blade_session_properties_get(blade_session_t *bs) { ks_assert(bs); diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index c50f4f8e60..dbfff34eb1 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -65,6 +65,9 @@ struct blade_handle_s { ks_hash_t *protocolrpcs; // registered blade_rpc_t, for locally processing protocol messages, keyed by the rpc method + ks_hash_t *subscriptions; // registered blade_subscription_t, subscribers may include the local node + ks_hash_t *subscriptions_cleanup; // cleanup for subscriptions, keyed by the downstream subscriber nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm/event" keys to remove each nodeid from as a subscriber during cleanup + ks_hash_t *connections; // active connections keyed by connection id ks_hash_t *sessions; // active sessions keyed by session id (which comes from the nodeid of the downstream side of the session, thus an upstream session is keyed under the local_nodeid) @@ -73,14 +76,7 @@ struct blade_handle_s { // @note everything below this point is exclusively for the master node - // @todo need to track the details from blade.publish, a protocol may be published under multiple realms, and each protocol published to a realm may have multiple target providers // @todo how does "exclusive" play into the providers, does "exclusive" mean only one provider can exist for a given protocol and realm? - // for now, ignore exclusive and multiple providers, key by "protocol" in a hash, and use a blade_protocol_t to represent a protocol in the context of being published so it can be located by other nodes - // each blade_protocol_t will contain the "protocol", common method/namespace/schema data, and a hash keyed by the "realm", with a value of an object of type blade_protocol_realm_t - // each blade_protocol_realm_t will contain the "realm" and a list of publisher nodeid's, any of which can be chosen at random to use the protocol within the given realm (does "exclusive" only limit this to 1 provider per realm?) - // @todo protocols must be cleaned up when routes are removed due to session terminations, should incorporate a faster way to lookup which protocols are tied to a given nodeid for efficient removal - // create blade_protocol_method_t to represent a method that is executed with blade.execute, and is part of a protocol made available through blade.publish, registered locally by the protocol and method name (protocol.methodname?), - // with a callback handler which should also have the realm available when executed so a single provider can easily provide a protocol for multiple realms with the same method callbacks ks_hash_t *protocols; // master only: protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate ks_hash_t *protocols_cleanup; // master only: keyed by the nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup @@ -91,6 +87,7 @@ ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data); typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t; @@ -237,6 +234,12 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP) ks_hash_create(&bh->protocolrpcs, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool); ks_assert(bh->protocolrpcs); + ks_hash_create(&bh->subscriptions, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool); + ks_assert(bh->subscriptions); + + ks_hash_create(&bh->subscriptions_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool); + ks_assert(bh->subscriptions_cleanup); + ks_hash_create(&bh->connections, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool); ks_assert(bh->connections); @@ -353,6 +356,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ blade_rpc_create(&brpc, bh, "blade.execute", NULL, NULL, blade_protocol_execute_request_handler, NULL); blade_handle_corerpc_register(brpc); + blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL); + blade_handle_corerpc_register(brpc); + // register internal transport for secure websockets blade_transport_wss_create(&bt, bh); ks_assert(bt); @@ -868,6 +874,104 @@ KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, co return brpc; } +KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid) +{ + char *key = NULL; + blade_subscription_t *bsub = NULL; + ks_hash_t *bsub_cleanup = NULL; + ks_bool_t propagate = KS_FALSE; + + ks_assert(bh); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + ks_assert(nodeid); + + key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event); + + ks_hash_write_lock(bh->subscriptions); + + bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)key, KS_UNLOCKED); + + if (!bsub) { + blade_subscription_create(&bsub, bh->pool, event, protocol, realm); + ks_assert(bsub); + + ks_hash_insert(bh->subscriptions, (void *)ks_pstrdup(bh->pool, key), bsub); + propagate = KS_TRUE; + } + + bsub_cleanup = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)nodeid, KS_UNLOCKED); + if (!bsub_cleanup) { + ks_hash_create(&bsub_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool); + ks_assert(bsub_cleanup); + + ks_log(KS_LOG_DEBUG, "Subscription (%s) added\n", key); + ks_hash_insert(bh->subscriptions_cleanup, (void *)ks_pstrdup(bh->pool, nodeid), bsub_cleanup); + } + ks_hash_insert(bsub_cleanup, (void *)ks_pstrdup(bh->pool, key), (void *)KS_TRUE); + + blade_subscription_subscribers_add(bsub, nodeid); + + ks_hash_write_unlock(bh->subscriptions); + + ks_log(KS_LOG_DEBUG, "Subscription (%s) subscriber (%s) added\n", key, nodeid); + + ks_pool_free(bh->pool, &key); + + if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_FALSE, NULL, NULL); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid) +{ + char *key = NULL; + blade_subscription_t *bsub = NULL; + ks_hash_t *bsub_cleanup = NULL; + ks_bool_t propagate = KS_FALSE; + + ks_assert(bh); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + ks_assert(nodeid); + + key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event); + + ks_hash_write_lock(bh->subscriptions); + + bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)key, KS_UNLOCKED); + + if (bsub) { + bsub_cleanup = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)nodeid, KS_UNLOCKED); + ks_assert(bsub_cleanup); + ks_hash_remove(bsub_cleanup, key); + + if (ks_hash_count(bsub_cleanup) == 0) { + ks_hash_remove(bh->subscriptions_cleanup, (void *)nodeid); + } + + ks_log(KS_LOG_DEBUG, "Subscription (%s) subscriber (%s) removed\n", key, nodeid); + blade_subscription_subscribers_remove(bsub, nodeid); + + if (ks_hash_count(blade_subscription_subscribers_get(bsub)) == 0) { + ks_log(KS_LOG_DEBUG, "Subscription (%s) removed\n", key); + ks_hash_remove(bh->subscriptions, (void *)key); + propagate = KS_TRUE; + } + } + + ks_hash_write_unlock(bh->subscriptions); + + ks_pool_free(bh->pool, &key); + + if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL); + + return KS_STATUS_SUCCESS; +} + + KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id) { @@ -975,20 +1079,63 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs) { ks_status_t ret = KS_STATUS_SUCCESS; blade_handle_t *bh = NULL; + ks_pool_t *pool = NULL; const char *id = NULL; ks_hash_iterator_t *it = NULL; ks_bool_t upstream = KS_FALSE; + ks_bool_t unsubbed = KS_FALSE; ks_assert(bs); bh = blade_session_handle_get(bs); ks_assert(bh); + pool = blade_handle_pool_get(bh); + ks_assert(pool); + blade_session_write_lock(bs, KS_TRUE); id = blade_session_id_get(bs); ks_assert(id); + // @todo this cleanup is a bit messy, move to using the combined key rather than passing around all 3 parts would make this cleaner + while (!unsubbed) { + ks_hash_t *subscriptions = NULL; + const char *event = NULL; + const char *protocol = NULL; + const char *realm = NULL; + + ks_hash_read_lock(bh->subscriptions); + subscriptions = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)id, KS_UNLOCKED); + if (!subscriptions) unsubbed = KS_TRUE; + else { + void *key = NULL; + void *value = NULL; + blade_subscription_t *bsub = NULL; + + it = ks_hash_first(subscriptions, KS_UNLOCKED); + ks_assert(it); + + ks_hash_this(it, (const void **)&key, NULL, &value); + + bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, key, KS_UNLOCKED); + ks_assert(bsub); + + // @note allocate these to avoid lifecycle issues when the last subscriber is removed causing the subscription to be removed + event = ks_pstrdup(bh->pool, blade_subscription_event_get(bsub)); + protocol = ks_pstrdup(bh->pool, blade_subscription_protocol_get(bsub)); + realm = ks_pstrdup(bh->pool, blade_subscription_realm_get(bsub)); + } + ks_hash_read_unlock(bh->subscriptions); + + if (!unsubbed) { + blade_handle_subscriber_remove(bh, event, protocol, realm, id); + ks_pool_free(bh->pool, &event); + ks_pool_free(bh->pool, &protocol); + ks_pool_free(bh->pool, &realm); + } + } + ks_hash_write_lock(bh->sessions); if (ks_hash_remove(bh->sessions, (void *)id) == NULL) ret = KS_STATUS_FAIL; @@ -1011,8 +1158,6 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs) blade_session_write_unlock(bs); - - return ret; } @@ -1821,6 +1966,167 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr blade_session_read_unlock(bs); } + +// blade.subscribe request generator +KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_session_t *bs = NULL; + + ks_assert(bh); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + + if (!(bs = blade_handle_sessions_upstream(bh))) { + ret = KS_STATUS_DISCONNECTED; + goto done; + } + + if (remove) { + blade_handle_subscriber_remove(bh, event, protocol, realm, bh->local_nodeid); + } else { + blade_handle_subscriber_add(bh, event, protocol, realm, bh->local_nodeid); + } + +done: + if (bs) blade_session_read_unlock(bs); + + return ret; +} + +KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_session_t *bs = NULL; + ks_pool_t *pool = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + + ks_assert(bh); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + + if (!(bs = blade_handle_sessions_upstream(bh))) { + ret = KS_STATUS_DISCONNECTED; + goto done; + } + + pool = blade_handle_pool_get(bh); + ks_assert(pool); + + blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe"); + + cJSON_AddStringToObject(req_params, "event", event); + cJSON_AddStringToObject(req_params, "protocol", protocol); + cJSON_AddStringToObject(req_params, "realm", realm); + if (remove) cJSON_AddTrueToObject(req_params, "remove"); + + ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs)); + + ret = blade_session_send(bs, req, callback, data); + +done: + if (req) cJSON_Delete(req); + if (bs) blade_session_read_unlock(bs); + + return ret; +} + +// blade.subscribe request handler +ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + ks_pool_t *pool = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + const char *req_params_event = NULL; + const char *req_params_protocol = NULL; + const char *req_params_realm = NULL; + cJSON *req_params_remove = NULL; + ks_bool_t remove = KS_FALSE; + cJSON *res = NULL; + cJSON *res_result = NULL; + + ks_assert(brpcreq); + + bh = blade_rpc_request_handle_get(brpcreq); + ks_assert(bh); + + pool = blade_handle_pool_get(bh); + ks_assert(pool); + + bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq)); + ks_assert(bs); + + req = blade_rpc_request_message_get(brpcreq); + ks_assert(req); + + req_params = cJSON_GetObjectItem(req, "params"); + if (!req_params) { + ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'params' object\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_event = cJSON_GetObjectCstr(req_params, "event"); + if (!req_params_event) { + ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'event'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol"); + if (!req_params_protocol) { + ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_realm = cJSON_GetObjectCstr(req_params, "realm"); + if (!req_params_realm) { + ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'realm'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_remove = cJSON_GetObjectItem(req_params, "remove"); + remove = req_params_remove && req_params_remove->type == cJSON_True; + + // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now + + ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs)); + + if (remove) { + blade_handle_subscriber_remove(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs)); + } else { + blade_handle_subscriber_add(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs)); + } + + // build the actual response finally + blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq)); + + cJSON_AddStringToObject(res_result, "event", req_params_event); + cJSON_AddStringToObject(res_result, "protocol", req_params_protocol); + cJSON_AddStringToObject(res_result, "realm", req_params_realm); + + // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup + blade_session_send(bs, res, NULL, NULL); + +done: + + if (res) cJSON_Delete(res); + if (bs) blade_session_read_unlock(bs); + + return KS_FALSE; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/blade_subscription.c b/libs/libblade/src/blade_subscription.c new file mode 100644 index 0000000000..b17e351642 --- /dev/null +++ b/libs/libblade/src/blade_subscription.c @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2017, Shane Bryldt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "blade.h" + +struct blade_subscription_s { + ks_pool_t *pool; + + const char *event; + const char *protocol; + const char *realm; + ks_hash_t *subscribers; +}; + + +static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_subscription_t *bsub = (blade_subscription_t *)ptr; + + ks_assert(bsub); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + if (bsub->event) ks_pool_free(bsub->pool, &bsub->event); + if (bsub->protocol) ks_pool_free(bsub->pool, &bsub->protocol); + if (bsub->realm) ks_pool_free(bsub->pool, &bsub->subscribers); + if (bsub->subscribers) ks_hash_destroy(&bsub->subscribers); + break; + case KS_MPCL_DESTROY: + break; + } +} + +KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm) +{ + blade_subscription_t *bsub = NULL; + + ks_assert(bsubP); + ks_assert(pool); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + + bsub = ks_pool_alloc(pool, sizeof(blade_subscription_t)); + bsub->pool = pool; + bsub->event = ks_pstrdup(pool, event); + bsub->protocol = ks_pstrdup(pool, protocol); + bsub->realm = ks_pstrdup(pool, realm); + + ks_hash_create(&bsub->subscribers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsub->pool); + ks_assert(bsub->subscribers); + + ks_pool_set_cleanup(pool, bsub, NULL, blade_subscription_cleanup); + + *bsubP = bsub; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP) +{ + blade_subscription_t *bsub = NULL; + + ks_assert(bsubP); + ks_assert(*bsubP); + + bsub = *bsubP; + + ks_pool_free(bsub->pool, bsubP); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub) +{ + ks_assert(bsub); + + return bsub->event; + +} + +KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub) +{ + ks_assert(bsub); + + return bsub->protocol; + +} + +KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub) +{ + ks_assert(bsub); + + return bsub->realm; + +} + +KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub) +{ + ks_assert(bsub); + + return bsub->subscribers; + +} + +KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid) +{ + char *key = NULL; + + ks_assert(bsub); + ks_assert(nodeid); + + key = ks_pstrdup(bsub->pool, nodeid); + ks_hash_insert(bsub->subscribers, (void *)key, (void *)KS_TRUE); + + return KS_STATUS_SUCCESS; + +} + +KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid) +{ + ks_assert(bsub); + ks_assert(nodeid); + + ks_hash_remove(bsub->subscribers, (void *)nodeid); + + return KS_STATUS_SUCCESS; + +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/include/blade.h b/libs/libblade/src/include/blade.h index 6e889b6808..2bc848d79c 100644 --- a/libs/libblade/src/include/blade.h +++ b/libs/libblade/src/include/blade.h @@ -45,6 +45,7 @@ #include "blade_connection.h" #include "blade_session.h" #include "blade_protocol.h" +#include "blade_subscription.h" #include "blade_transport_wss.h" diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 46c449f251..c460b8d839 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -78,6 +78,9 @@ KS_DECLARE(ks_status_t) blade_handle_protocolrpc_register(blade_rpc_t *brpc); KS_DECLARE(ks_status_t) blade_handle_protocolrpc_unregister(blade_rpc_t *brpc); KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, const char *method, const char *protocol, const char *realm); +KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid); +KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid); + KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id); @@ -106,6 +109,9 @@ KS_DECLARE(cJSON *) blade_protocol_execute_request_params_get(blade_rpc_request_ KS_DECLARE(cJSON *) blade_protocol_execute_response_result_get(blade_rpc_response_t *brpcres); KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcreq, cJSON *result); +KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data); + KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_subscription.h b/libs/libblade/src/include/blade_subscription.h new file mode 100644 index 0000000000..c6528aa068 --- /dev/null +++ b/libs/libblade/src/include/blade_subscription.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017, Shane Bryldt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _BLADE_SUBSCRIPTION_H_ +#define _BLADE_SUBSCRIPTION_H_ +#include + +KS_BEGIN_EXTERN_C +KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm); +KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP); +KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub); +KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub); +KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub); +KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub); +KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid); +KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid); +KS_END_EXTERN_C + +#endif + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index ed7433cd57..887d4f3e2a 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -49,8 +49,9 @@ typedef struct blade_connection_s blade_connection_t; typedef struct blade_session_s blade_session_t; typedef struct blade_session_callbacks_s blade_session_callbacks_t; typedef struct blade_protocol_s blade_protocol_t; -typedef struct blade_protocol_realm_s blade_protocol_realm_t; -typedef struct blade_protocol_method_s blade_protocol_method_t; +typedef struct blade_subscription_s blade_subscription_t; +//typedef struct blade_protocol_realm_s blade_protocol_realm_t; +//typedef struct blade_protocol_method_s blade_protocol_method_t; typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data); diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index b3b7c9dfd8..c2e9011e37 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -17,10 +17,12 @@ struct command_def_s { void command_quit(blade_handle_t *bh, char *args); void command_execute(blade_handle_t *bh, char *args); +void command_subscribe(blade_handle_t *bh, char *args); static const struct command_def_s command_defs[] = { { "quit", command_quit }, { "execute", command_execute }, + { "subscribe", command_subscribe }, { NULL, NULL } }; @@ -109,6 +111,26 @@ ks_bool_t blade_locate_response_handler(blade_rpc_response_t *brpcres, void *dat return KS_FALSE; } +ks_bool_t blade_subscribe_response_handler(blade_rpc_response_t *brpcres, void *data) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + + ks_assert(brpcres); + + bh = blade_rpc_response_handle_get(brpcres); + ks_assert(bh); + + bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres)); + ks_assert(bs); + + ks_log(KS_LOG_DEBUG, "Session (%s) blade.subscribe response processing\n", blade_session_id_get(bs)); + + blade_session_read_unlock(bs); + + return KS_FALSE; +} + int main(int argc, char **argv) { blade_handle_t *bh = NULL; @@ -246,6 +268,14 @@ void command_execute(blade_handle_t *bh, char *args) blade_protocol_locate(bh, "test", "mydomain.com", blade_locate_response_handler, NULL); } +void command_subscribe(blade_handle_t *bh, char *args) +{ + ks_assert(bh); + ks_assert(args); + + blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL); +} + /* For Emacs: * Local Variables: * mode:c