diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 973f3a416f..278a16e5b4 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -88,6 +88,7 @@ ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, v ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data); typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t; @@ -359,6 +360,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL); blade_handle_corerpc_register(brpc); + blade_rpc_create(&brpc, bh, "blade.broadcast", NULL, NULL, blade_protocol_broadcast_request_handler, NULL); + blade_handle_corerpc_register(brpc); + // register internal transport for secure websockets blade_transport_wss_create(&bt, bh); ks_assert(bt); @@ -2141,6 +2145,232 @@ done: return KS_FALSE; } + +// blade.broadcast request generator +KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(bh); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + + // this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to + ret = blade_protocol_broadcast_raw(bh, NULL, event, protocol, realm, params, callback, data); + + // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations + // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler + // is where this normally occurs + + return ret; +} + +KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data) +{ + const char *bsub_key = NULL; + blade_subscription_t *bsub = NULL; + blade_session_t *bs = NULL; + + ks_assert(bh); + ks_assert(event); + ks_assert(protocol); + ks_assert(realm); + + bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event); + + ks_hash_read_lock(bh->subscriptions); + + bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED); + if (bsub) { + ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub); + + ks_assert(subscribers); + + for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + void *key = NULL; + void *value = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + + ks_hash_this(it, &key, NULL, &value); + + if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue; + + if (blade_handle_local_nodeid_compare(bh, (const char *)key)) continue; + + bs = blade_handle_sessions_lookup(bh, (const char *)key); + if (bs) { + ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs)); + + blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast"); + + cJSON_AddStringToObject(req_params, "event", event); + cJSON_AddStringToObject(req_params, "protocol", protocol); + cJSON_AddStringToObject(req_params, "realm", realm); + + if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1)); + + blade_session_send(bs, req, callback, data); + + cJSON_Delete(req); + + blade_session_read_unlock(bs); + } + } + } + + ks_hash_read_unlock(bh->subscriptions); + + ks_pool_free(bh->pool, &bsub_key); + + bs = blade_handle_sessions_upstream(bh); + if (bs) { + if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) { + cJSON *req = NULL; + cJSON *req_params = NULL; + + ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs)); + + blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast"); + + cJSON_AddStringToObject(req_params, "event", event); + cJSON_AddStringToObject(req_params, "protocol", protocol); + cJSON_AddStringToObject(req_params, "realm", realm); + + if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1)); + + blade_session_send(bs, req, callback, data); + + cJSON_Delete(req); + } + + blade_session_read_unlock(bs); + } + return KS_STATUS_SUCCESS; +} + +// blade.broadcast request handler +ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data) +{ + ks_bool_t ret = KS_FALSE; + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + const char *req_params_event = NULL; + const char *req_params_protocol = NULL; + const char *req_params_realm = NULL; + cJSON *req_params_params = NULL; + const char *bsub_key = NULL; + blade_subscription_t *bsub = NULL; + blade_rpc_request_callback_t callback = NULL; + cJSON *res = NULL; + cJSON *res_result = NULL; + + ks_assert(brpcreq); + + bh = blade_rpc_request_handle_get(brpcreq); + ks_assert(bh); + + bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq)); + ks_assert(bs); + + req = blade_rpc_request_message_get(brpcreq); + ks_assert(req); + + req_params = cJSON_GetObjectItem(req, "params"); + if (!req_params) { + ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'params' object\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_event = cJSON_GetObjectCstr(req_params, "event"); + if (!req_params_event) { + ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol"); + if (!req_params_protocol) { + ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_realm = cJSON_GetObjectCstr(req_params, "realm"); + if (!req_params_realm) { + ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'realm'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_params = cJSON_GetObjectCstr(req_params, "params"); + + + blade_protocol_broadcast_raw(bh, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL); + + + bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", req_params_protocol, req_params_realm, req_params_event); + + ks_hash_read_lock(bh->subscriptions); + + bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED); + if (bsub) { + ks_rwl_read_lock(bh->local_nodeid_rwl); + if (ks_hash_search(blade_subscription_subscribers_get(bsub), bh->local_nodeid, KS_UNLOCKED)) { + callback = blade_subscription_callback_get(bsub); + if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub)); + } + ks_rwl_read_unlock(bh->local_nodeid_rwl); + } + + ks_hash_read_unlock(bh->subscriptions); + + ks_pool_free(bh->pool, &bsub_key); + + // build the actual response finally + blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq)); + + cJSON_AddStringToObject(res_result, "event", req_params_event); + cJSON_AddStringToObject(res_result, "protocol", req_params_protocol); + cJSON_AddStringToObject(res_result, "realm", req_params_realm); + + // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup + blade_session_send(bs, res, NULL, NULL); + + +done: + + if (res) cJSON_Delete(res); + if (bs) blade_session_read_unlock(bs); + + return ret; +} + +KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq) +{ + cJSON *req = NULL; + cJSON *req_params = NULL; + cJSON *req_params_params = NULL; + + ks_assert(brpcreq); + + req = blade_rpc_request_message_get(brpcreq); + ks_assert(req); + + req_params = cJSON_GetObjectItem(req, "params"); + if (req_params) req_params_params = cJSON_GetObjectItem(req_params, "params"); + + return req_params_params; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 4f9e69467b..9ab945612e 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -112,6 +112,10 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data); KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq); + KS_END_EXTERN_C #endif diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index 1d38aa3d53..2858dc0946 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -202,6 +202,8 @@ int main(int argc, char **argv) blade_identity_destroy(&target); ks_sleep_ms(5000); + + } loop(bh); diff --git a/libs/libblade/test/blades.c b/libs/libblade/test/blades.c index 4209528640..15a323f23c 100644 --- a/libs/libblade/test/blades.c +++ b/libs/libblade/test/blades.c @@ -16,9 +16,11 @@ struct command_def_s { }; void command_quit(blade_handle_t *bh, char *args); +void command_broadcast(blade_handle_t *bh, char *args); static const struct command_def_s command_defs[] = { { "quit", command_quit }, + { "broadcast", command_broadcast }, { NULL, NULL } }; @@ -81,6 +83,25 @@ ks_bool_t test_echo_request_handler(blade_rpc_request_t *brpcreq, void *data) return KS_FALSE; } +ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + + ks_assert(brpcres); + + bh = blade_rpc_response_handle_get(brpcres); + ks_assert(bh); + + bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres)); + ks_assert(bs); + + ks_log(KS_LOG_DEBUG, "Session (%s) test.event response processing\n", blade_session_id_get(bs)); + + blade_session_read_unlock(bs); + + return KS_FALSE; +} int main(int argc, char **argv) { @@ -216,6 +237,14 @@ void command_quit(blade_handle_t *bh, char *args) g_shutdown = KS_TRUE; } +void command_broadcast(blade_handle_t *bh, char *args) +{ + ks_assert(bh); + ks_assert(args); + + blade_protocol_broadcast(bh, "test.event", "test", "mydomain.com", NULL, test_event_response_handler, NULL); +} + /* For Emacs: * Local Variables: