diff --git a/libs/libblade/src/blade_rpc.c b/libs/libblade/src/blade_rpc.c index 9e92fbc016..cfc0e78ce5 100644 --- a/libs/libblade/src/blade_rpc.c +++ b/libs/libblade/src/blade_rpc.c @@ -50,9 +50,11 @@ struct blade_rpc_request_s { cJSON *message; const char *message_id; // pulled from message for easier keying + + ks_time_t ttl; + blade_rpc_response_callback_t callback; void *data; - // @todo ttl to wait for response before injecting an error response locally }; struct blade_rpc_response_s { @@ -253,6 +255,22 @@ KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *br return brpcreq->message_id; } +KS_DECLARE(ks_status_t) blade_rpc_request_ttl_set(blade_rpc_request_t *brpcreq, ks_time_t ttl) +{ + ks_assert(brpcreq); + + brpcreq->ttl = ks_time_now() + (ttl * KS_USEC_PER_SEC); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_bool_t) blade_rpc_request_expired(blade_rpc_request_t *brpcreq) +{ + ks_assert(brpcreq); + + return brpcreq->ttl <= ks_time_now(); +} + KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq) { ks_assert(brpcreq); diff --git a/libs/libblade/src/blade_rpcmgr.c b/libs/libblade/src/blade_rpcmgr.c index a8014388be..21b91c3335 100644 --- a/libs/libblade/src/blade_rpcmgr.c +++ b/libs/libblade/src/blade_rpcmgr.c @@ -39,7 +39,9 @@ struct blade_rpcmgr_s { ks_hash_t *corerpcs; // method, blade_rpc_t* ks_hash_t *protocolrpcs; // method, blade_rpc_t* - ks_hash_t *requests; // id, KS_TRUE + ks_hash_t *requests; // id, blade_rpc_request_t* + ks_time_t requests_ttlcheck; + ks_mutex_t* requests_ttlcheck_lock; }; @@ -100,6 +102,9 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_create(blade_rpcmgr_t **brpcmgrP, blade_han ks_hash_create(&brpcmgr->requests, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool); ks_assert(brpcmgr->requests); + ks_mutex_create(&brpcmgr->requests_ttlcheck_lock, KS_MUTEX_FLAG_NON_RECURSIVE, pool); + ks_assert(brpcmgr->requests_ttlcheck_lock); + ks_pool_set_cleanup(brpcmgr, NULL, blade_rpcmgr_cleanup); *brpcmgrP = brpcmgr; @@ -218,7 +223,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_add(blade_rpcmgr_t *brpcmgr, bl ks_log(KS_LOG_DEBUG, "ProtocolRPC Added: %s\n", key); - return KS_STATUS_SUCCESS; +return KS_STATUS_SUCCESS; } @@ -294,6 +299,59 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, bla return KS_STATUS_SUCCESS; } +KS_DECLARE(ks_status_t) blade_rpcmgr_request_timeouts(blade_rpcmgr_t *brpcmgr) +{ + blade_session_t *loopback = NULL; + + ks_assert(brpcmgr); + + // All this stuff is to ensure that the requests hash is not locked up when it does not need to be, + // and this will also ensure that sessions will not lock up if any other session is trying to deal + // with timeouts already + if (ks_mutex_trylock(brpcmgr->requests_ttlcheck_lock) != KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS; + + if (brpcmgr->requests_ttlcheck > ks_time_now()) { + ks_mutex_unlock(brpcmgr->requests_ttlcheck_lock); + return KS_STATUS_SUCCESS; + } + + ks_hash_write_lock(brpcmgr->requests); + + // Give a one second delay between timeout checking + brpcmgr->requests_ttlcheck = ks_time_now() + KS_USEC_PER_SEC; + + ks_mutex_unlock(brpcmgr->requests_ttlcheck_lock); + + // Now find all the expired requests and send out loopback error responses, which will invoke request removal when received and processed + for (ks_hash_iterator_t *it = ks_hash_first(brpcmgr->requests, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const char *key = NULL; + blade_rpc_request_t *value = NULL; + cJSON *res = NULL; + + ks_hash_this(it, (const void **)&key, NULL, (void **)&value); + + if (!blade_rpc_request_expired(value)) continue; + + if (!loopback) loopback = blade_sessionmgr_loopback_lookup(blade_handle_sessionmgr_get(brpcmgr->handle)); + + ks_log(KS_LOG_DEBUG, "Request (%s) TTL timeout\n", key); + + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(value), -32000, "Request timed out"); + // @todo may want to include requester-nodeid and responder-nodeid into the error block when they are present in the original request + // even though a response or error without them is treated as locally processed, it may be useful to know who the request was attempted with + // when multiple options are available such as a blade.execute where the protocol has multiple possible controllers to pick from + blade_session_send(loopback, res, 0, NULL, NULL); + + cJSON_Delete(res); + } + + if (loopback) blade_session_read_unlock(loopback); + + ks_hash_write_unlock(brpcmgr->requests); + + return KS_STATUS_SUCCESS; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index d449ff725b..d3dd6be117 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -412,7 +412,8 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j ks_assert(bs); ks_assert(json); - if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) ret = blade_session_receiving_push(bs, json); + if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) + ret = blade_session_receiving_push(bs, json); else { json_copy = cJSON_Duplicate(json, 1); if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond); @@ -477,8 +478,6 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) // we can start stuffing any messages queued for output on the session straight to the connection right away, may need to only // do this when in session ready state but there may be implications of other states sending messages through the session while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) { - // @todo short-circuit with blade_session_receiving_push on the same session if the message has responder-nodeid == requester-nodeid == local_nodeid - // which would allow a system to send messages to itself, such as calling a protocolrpc immediately without bouncing upstream first blade_connection_sending_push(bc, json); cJSON_Delete(json); } @@ -561,11 +560,13 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs) cJSON_Delete(json); } + blade_rpcmgr_request_timeouts(blade_handle_rpcmgr_get(blade_session_handle_get(bs))); + return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data) +KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data) { blade_rpc_request_t *brpcreq = NULL; const char *method = NULL; @@ -580,22 +581,19 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla ks_assert(id); if (method) { - // @note This is scenario 1 - // 1) Sending a request (client: method caller or consumer) ks_log(KS_LOG_DEBUG, "Session (%s) sending request (%s) for %s\n", bs->id, id, method); blade_rpc_request_create(&brpcreq, bs->handle, ks_pool_get(bs->handle), bs->id, json, callback, data); ks_assert(brpcreq); - // @todo set request TTL and figure out when requests are checked for expiration (separate thread in the handle?) + if (ttl <= 0) ttl = 10; + blade_rpc_request_ttl_set(brpcreq, ttl); + blade_rpcmgr_request_add(blade_handle_rpcmgr_get(bs->handle), brpcreq); } else { - // @note This is scenario 3 - // 3) Sending a response or error (server: method callee or provider) ks_log(KS_LOG_DEBUG, "Session (%s) sending response (%s)\n", bs->id, id); } - //blade_session_sending_push(bs, json); if (!bs->connection) { blade_session_sending_push(bs, json); } else { @@ -653,13 +651,15 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) blade_rpc_t *brpc = NULL; blade_rpc_request_callback_t callback = NULL; cJSON *params = NULL; + const char *params_requester_nodeid = NULL; + const char *params_responder_nodeid = NULL; ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method); params = cJSON_GetObjectItem(json, "params"); if (params) { - const char *params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid"); - const char *params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid"); + params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid"); + params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid"); if (params_requester_nodeid && params_responder_nodeid && !blade_routemgr_local_check(blade_handle_routemgr_get(bh), params_responder_nodeid)) { // not meant for local processing, continue with standard unicast routing for requests blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid); @@ -677,7 +677,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid); cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); return KS_STATUS_DISCONNECTED; } } @@ -687,7 +687,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) } ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router)); - blade_session_send(bs_router, json, NULL, NULL); + blade_session_send(bs_router, json, 0, NULL, NULL); blade_session_read_unlock(bs_router); // @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which @@ -704,8 +704,18 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) brpc = blade_rpcmgr_corerpc_lookup(blade_handle_rpcmgr_get(bs->handle), method); if (!brpc) { + cJSON *res = NULL; + cJSON *res_error = NULL; + ks_log(KS_LOG_DEBUG, "Received unknown rpc method %s\n", method); - // @todo send error response, code = -32601 (method not found) + blade_rpc_error_raw_create(&res, &res_error, id, -32601, "RPC method not found"); + + // needed in case this error must propagate further than the session which sent it + if (params_requester_nodeid) cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid); + if (params_responder_nodeid) cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); + + blade_session_send(bs, res, 0, NULL, NULL); + return KS_STATUS_FAIL; } callback = blade_rpc_callback_get(brpc); @@ -750,7 +760,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) } ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router)); - blade_session_send(bs_router, json, NULL, NULL); + blade_session_send(bs_router, json, 0, NULL, NULL); blade_session_read_unlock(bs_router); // @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a diff --git a/libs/libblade/src/blade_sessionmgr.c b/libs/libblade/src/blade_sessionmgr.c index c4b2eefadb..6faa01e5eb 100644 --- a/libs/libblade/src/blade_sessionmgr.c +++ b/libs/libblade/src/blade_sessionmgr.c @@ -149,6 +149,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, conf ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback)); + blade_sessionmgr_session_add(bsmgr, bsmgr->loopback); + + blade_session_state_set(bsmgr->loopback, BLADE_SESSION_STATE_STARTUP); + return KS_STATUS_SUCCESS; } @@ -158,10 +162,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr) ks_assert(bsmgr); - if (bsmgr->loopback) { - blade_session_hangup(bsmgr->loopback); - ks_sleep_ms(100); - } + //if (bsmgr->loopback) { + // blade_session_hangup(bsmgr->loopback); + // ks_sleep_ms(100); + //} ks_hash_read_lock(bsmgr->sessions); for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) { diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 70e1943deb..334e24a3af 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -380,7 +380,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcroute(blade_handle_t *bh, const char *no ks_log(KS_LOG_DEBUG, "Session (%s) route request (%s %s) started\n", blade_session_id_get(bs), remove ? "removing" : "adding", nodeid); - ret = blade_session_send(bs, req, callback, data); + ret = blade_session_send(bs, req, 0, callback, data); done: if (req) cJSON_Delete(req); @@ -417,7 +417,7 @@ ks_bool_t blade_rpcroute_request_handler(blade_rpc_request_t *brpcreq, void *dat if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) route request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -425,7 +425,7 @@ ks_bool_t blade_rpcroute_request_handler(blade_rpc_request_t *brpcreq, void *dat if (!req_params_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) route request missing 'nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } req_params_remove = cJSON_GetObjectItem(req_params, "remove"); @@ -442,7 +442,7 @@ ks_bool_t blade_rpcroute_request_handler(blade_rpc_request_t *brpcreq, void *dat } blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq)); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); done: @@ -530,7 +530,7 @@ ks_status_t blade_handle_rpcregister_raw(blade_handle_t *bh, const char *identit cJSON_AddStringToObject(req_params, "identity", identity); cJSON_AddStringToObject(req_params, "nodeid", nodeid); - ret = blade_session_send(bs, req, callback, data); + ret = blade_session_send(bs, req, 0, callback, data); done: if (req) cJSON_Delete(req); @@ -571,7 +571,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void * if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -579,7 +579,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void * if (!req_params_identity) { ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'identity'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params identity"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -587,7 +587,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void * if (blade_identity_parse(identity, req_params_identity) != KS_STATUS_SUCCESS) { ks_log(KS_LOG_DEBUG, "Session (%s) register request invalid 'identity'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params identity"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -595,7 +595,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void * if (!req_params_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -612,7 +612,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void * cJSON_AddStringToObject(res_result, "nodeid", req_params_nodeid); // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); } else { blade_rpcregister_data_t *temp_data = (blade_rpcregister_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcregister_data_t)); temp_data->original_requestid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq)); @@ -696,7 +696,7 @@ ks_bool_t blade_rpcregister_response_handler(blade_rpc_response_t *brpcres, void cJSON_AddStringToObject(res_result, "identity", res_result_identity); cJSON_AddStringToObject(res_result, "nodeid", res_result_nodeid); - blade_session_send(relay, res, NULL, NULL); + blade_session_send(relay, res, 0, NULL, NULL); cJSON_Delete(res); @@ -765,7 +765,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpub ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs)); - ret = blade_session_send(bs, req, callback, data); + ret = blade_session_send(bs, req, 0, callback, data); done: if (req) cJSON_Delete(req); @@ -804,11 +804,14 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d req = blade_rpc_request_message_get(brpcreq); ks_assert(req); + // @todo error messages in here SHOULD include requester-nodeid and responder-nodeid in the error responses to ensure + // proper return routing of the error message + req_params = cJSON_GetObjectItem(req, "params"); if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -816,7 +819,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_command) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'command'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } command = (blade_rpcpublish_command_t)req_params_command->valueint; @@ -832,7 +835,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_protocol) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'protocol'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -840,7 +843,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_requester_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'requester-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -848,14 +851,14 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_responder_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'responder-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } if (!blade_routemgr_master_check(bh->routemgr, req_params_responder_nodeid)) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -867,7 +870,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (req_params_channels->type != cJSON_Array) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -875,7 +878,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (req_params_channels_element->type != cJSON_Object) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -883,7 +886,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_channels_element_name) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'channels' element name\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels element name"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -891,7 +894,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d if (req_params_channels_element_flags && req_params_channels_element_flags->type != cJSON_Number) { ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element flags type, expected number\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels element flags type"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } } @@ -946,7 +949,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d // @todo include a list of channels that failed to be added or removed if applicable? // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); done: @@ -996,7 +999,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char ks_log(KS_LOG_DEBUG, "Session (%s) authorize request started\n", blade_session_id_get(bs)); - ret = blade_session_send(bs, req, callback, data); + ret = blade_session_send(bs, req, 0, callback, data); done: if (req) cJSON_Delete(req); @@ -1037,11 +1040,14 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void req = blade_rpc_request_message_get(brpcreq); ks_assert(req); + // @todo error messages in here need to include requester-nodeid and responder-nodeid in the error responses to ensure + // proper return routing of the error message + req_params = cJSON_GetObjectItem(req, "params"); if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1049,7 +1055,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_protocol) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'protocol'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1060,7 +1066,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_authorized_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'authorized-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params authorized-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1068,7 +1074,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_requester_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'requester-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1076,7 +1082,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_responder_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'responder-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1084,14 +1090,14 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_channels) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'channels'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } if (req_params_channels->type != cJSON_Array) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' type, expected array\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1099,7 +1105,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (channel->type != cJSON_String) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' element type, expected string\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } } @@ -1107,7 +1113,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (!blade_routemgr_master_check(bh->routemgr, req_params_responder_nodeid)) { ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1140,7 +1146,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels); // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); if (res_result_unauthorized_channels) { blade_handle_rpcsubscribe_raw(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, req_params_protocol, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL); @@ -1188,7 +1194,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *p ks_log(KS_LOG_DEBUG, "Session (%s) locate request started\n", blade_session_id_get(bs)); - ret = blade_session_send(bs, req, callback, data); + ret = blade_session_send(bs, req, 0, callback, data); done: if (req) cJSON_Delete(req); @@ -1223,11 +1229,14 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da req = blade_rpc_request_message_get(brpcreq); ks_assert(req); + // @todo error messages in here need to include requester-nodeid and responder-nodeid in the error responses to ensure + // proper return routing of the error message + req_params = cJSON_GetObjectItem(req, "params"); if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1235,7 +1244,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da if (!req_params_protocol) { ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'protocol'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1243,7 +1252,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da if (!req_params_requester_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'requester-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1251,14 +1260,14 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da if (!req_params_responder_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'responder-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } if (!blade_routemgr_master_check(bh->routemgr, req_params_responder_nodeid)) { ks_log(KS_LOG_DEBUG, "Session (%s) locate request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1280,7 +1289,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da if (res_result_controllers) cJSON_AddItemToObject(res_result, "controllers", res_result_controllers); // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); done: @@ -1292,7 +1301,7 @@ done: // blade.execute request generator -KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, blade_rpc_response_callback_t callback, void *data) +KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -1324,7 +1333,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char * ks_log(KS_LOG_DEBUG, "Session (%s) execute request started\n", blade_session_id_get(bs)); - ret = blade_session_send(bs, req, callback, data); + ret = blade_session_send(bs, req, 0, callback, data); done: if (req) cJSON_Delete(req); @@ -1360,11 +1369,14 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d req = blade_rpc_request_message_get(brpcreq); ks_assert(req); + // @todo error messages in here need to include requester-nodeid and responder-nodeid in the error responses to ensure + // proper return routing of the error message + req_params = cJSON_GetObjectItem(req, "params"); if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1372,7 +1384,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_method) { ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'method'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params method"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1380,7 +1392,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_protocol) { ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'protocol'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1388,7 +1400,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_requester_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'requester-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1396,7 +1408,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d if (!req_params_responder_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'responder-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1408,7 +1420,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d if (!brpc) { ks_log(KS_LOG_DEBUG, "Session (%s) execute request unknown method\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Unknown params method"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1539,7 +1551,7 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ if (result) cJSON_AddItemToObject(res_result, "result", cJSON_Duplicate(result, 1)); // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); cJSON_Delete(res); @@ -1667,7 +1679,7 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs)); - ret = blade_session_send(bs, req, blade_rpcsubscribe_response_handler, data); + ret = blade_session_send(bs, req, 0, blade_rpcsubscribe_response_handler, data); done: if (req) cJSON_Delete(req); @@ -1715,7 +1727,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1723,7 +1735,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_command) { ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'command'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } command = (blade_rpcsubscribe_command_t)req_params_command->valueint; @@ -1737,7 +1749,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_protocol) { ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1745,7 +1757,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_subscriber_nodeid) { ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscriber-nodeid'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscriber-nodeid"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1759,7 +1771,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_channels) { ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'channels'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1805,7 +1817,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void // could be pulled from the original request associated to the response // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); } else { blade_rpcsubscribe_data_t *temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t)); temp_data->original_requestid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq)); @@ -1914,7 +1926,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, voi if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", cJSON_Duplicate(res_result_subscribe_channels, 1)); if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", cJSON_Duplicate(res_result_failed_channels, 1)); - blade_session_send(relay, res, NULL, NULL); + blade_session_send(relay, res, 0, NULL, NULL); cJSON_Delete(res); @@ -1973,7 +1985,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params) { ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'params' object\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1981,7 +1993,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_protocol) { ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } @@ -1989,7 +2001,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_command) { ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'command'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } command = (blade_rpcbroadcast_command_t)req_params_command->valueint; @@ -1999,7 +2011,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_event) { ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE: @@ -2007,7 +2019,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void if (!req_params_channel) { ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs)); blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel"); - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); goto done; } break; @@ -2035,7 +2047,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void if (req_params_event) cJSON_AddStringToObject(res_result, "event", req_params_event); // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup - blade_session_send(bs, res, NULL, NULL); + blade_session_send(bs, res, 0, NULL, NULL); done: diff --git a/libs/libblade/src/blade_subscriptionmgr.c b/libs/libblade/src/blade_subscriptionmgr.c index 59cabdbf27..be25dae5b4 100644 --- a/libs/libblade/src/blade_subscriptionmgr.c +++ b/libs/libblade/src/blade_subscriptionmgr.c @@ -483,7 +483,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t ks_log(KS_LOG_DEBUG, "Broadcasting: protocol %s, channel %s through %s\n", protocol, channel, blade_session_id_get(bs)); - blade_session_send(bs, req, callback, data); + blade_session_send(bs, req, 0, callback, data); cJSON_Delete(req); diff --git a/libs/libblade/src/include/blade_rpc.h b/libs/libblade/src/include/blade_rpc.h index 69e83344b2..a381794368 100644 --- a/libs/libblade/src/include/blade_rpc.h +++ b/libs/libblade/src/include/blade_rpc.h @@ -57,6 +57,8 @@ KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *b KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq); +KS_DECLARE(ks_status_t) blade_rpc_request_ttl_set(blade_rpc_request_t *brpcreq, ks_time_t ttl); +KS_DECLARE(ks_bool_t) blade_rpc_request_expired(blade_rpc_request_t *brpcreq); KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq); KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq); diff --git a/libs/libblade/src/include/blade_rpcmgr.h b/libs/libblade/src/include/blade_rpcmgr.h index 2d93189812..646815d389 100644 --- a/libs/libblade/src/include/blade_rpcmgr.h +++ b/libs/libblade/src/include/blade_rpcmgr.h @@ -48,6 +48,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_remove(blade_rpcmgr_t *brpcmgr, KS_DECLARE(blade_rpc_request_t *) blade_rpcmgr_request_lookup(blade_rpcmgr_t *brpcmgr, const char *id); KS_DECLARE(ks_status_t) blade_rpcmgr_request_add(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq); KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq); +KS_DECLARE(ks_status_t) blade_rpcmgr_request_timeouts(blade_rpcmgr_t *brpcmgr); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h index fb7a2c64ce..0a292fc08b 100644 --- a/libs/libblade/src/include/blade_session.h +++ b/libs/libblade/src/include/blade_session.h @@ -61,7 +61,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs); KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs); KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id); -KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json); KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json); diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 01d6e04eeb..3a5e4edbf6 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -68,7 +68,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, blade_rpc_response_callback_t callback, void *data); -KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq); diff --git a/libs/libblade/test/testcli.c b/libs/libblade/test/testcli.c index d904c448af..ad06ee1963 100644 --- a/libs/libblade/test/testcli.c +++ b/libs/libblade/test/testcli.c @@ -368,7 +368,7 @@ void command_join(blade_handle_t *bh, char *args) } params = cJSON_CreateObject(); - blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", params, test_join_response_handler, NULL); + blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", params, 0, test_join_response_handler, NULL); cJSON_Delete(params); } @@ -417,7 +417,7 @@ void command_leave(blade_handle_t *bh, char *args) } params = cJSON_CreateObject(); - blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", params, test_leave_response_handler, NULL); + blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", params, 0, test_leave_response_handler, NULL); cJSON_Delete(params); } @@ -440,7 +440,7 @@ void command_talk(blade_handle_t *bh, char *args) params = cJSON_CreateObject(); cJSON_AddStringToObject(params, "text", args); //blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", params, test_talk_response_handler, NULL); - blade_handle_rpcexecute(bh, "blade:testcon@freeswitch.com", "test.talk", "test", params, test_talk_response_handler, NULL); + blade_handle_rpcexecute(bh, "blade:testcon@freeswitch.com", "test.talk", "test", params, 0, test_talk_response_handler, NULL); cJSON_Delete(params); } diff --git a/libs/libblade/test/testcon.c b/libs/libblade/test/testcon.c index 4dffa70330..a12355fa49 100644 --- a/libs/libblade/test/testcon.c +++ b/libs/libblade/test/testcon.c @@ -180,7 +180,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data) // send rpcexecute response to the requester result = cJSON_CreateObject(); - blade_rpcexecute_response_send(brpcreq, result); + //blade_rpcexecute_response_send(brpcreq, result); cJSON_Delete(result);