FS-9952: Initial rpc application implementation

This commit is contained in:
colm 2017-02-08 15:52:36 -05:00 committed by Mike Jerris
parent 1761e35bf1
commit a9945a682b
2 changed files with 109 additions and 60 deletions

View File

@ -73,11 +73,11 @@ blade_rpc_handle_ex_t *g_handle = NULL;
typedef struct blade_rpc_custom_callbackpair_s typedef struct blade_rpc_custom_callbackpair_s
{ {
jrpc_prefix_func_t prefix_request_func; jrpc_func_t prefix_request_func;
jrpc_postfix_func_t postfix_request_func; jrpc_func_t postfix_request_func;
jrpc_prefix_resp_func_t prefix_response_func; jrpc_func_t prefix_response_func;
jrpc_postfix_resp_func_t postfix_response_func; jrpc_func_t postfix_response_func;
} blade_rpc_custom_callbackpair_t; } blade_rpc_custom_callbackpair_t;
@ -87,7 +87,7 @@ typedef struct blade_rpc_callbackpair_s
{ {
jrpc_func_t request_func; jrpc_func_t request_func;
jrpc_resp_func_t response_func; jrpc_func_t response_func;
blade_rpc_custom_callbackpair_t* custom; blade_rpc_custom_callbackpair_t* custom;
@ -270,7 +270,7 @@ KS_DECLARE(jrpc_func_t) blade_rpc_find_request_function(char *fqcommand)
return f; return f;
} }
KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_requestprefix_function(char *fqcommand) KS_DECLARE(jrpc_func_t) blade_rpc_find_requestprefix_function(char *fqcommand)
{ {
blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand); blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
@ -279,14 +279,14 @@ KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_requestprefix_function(char *fqcomma
return NULL; return NULL;
} }
jrpc_resp_func_t f = callbacks->custom->prefix_request_func; jrpc_func_t f = callbacks->custom->prefix_request_func;
ks_mutex_unlock(callbacks->lock); ks_mutex_unlock(callbacks->lock);
return f; return f;
} }
KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_response_function(char *fqcommand) KS_DECLARE(jrpc_func_t) blade_rpc_find_response_function(char *fqcommand)
{ {
blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand); blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
@ -295,14 +295,14 @@ KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_response_function(char *fqcommand)
return NULL; return NULL;
} }
jrpc_resp_func_t f = callbacks->response_func; jrpc_func_t f = callbacks->response_func;
ks_mutex_unlock(callbacks->lock); ks_mutex_unlock(callbacks->lock);
return f; return f;
} }
KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_responseprefix_function(char *fqcommand) KS_DECLARE(jrpc_func_t) blade_rpc_find_responseprefix_function(char *fqcommand)
{ {
blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand); blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
@ -311,7 +311,7 @@ KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_responseprefix_function(char *fqcomm
return NULL; return NULL;
} }
jrpc_resp_func_t f = callbacks->custom->prefix_response_func; jrpc_func_t f = callbacks->custom->prefix_response_func;
ks_mutex_unlock(callbacks->lock); ks_mutex_unlock(callbacks->lock);
@ -342,7 +342,7 @@ KS_DECLARE(ks_status_t) blade_rpc_declare_namespace(char* namespace, const char*
KS_DECLARE(ks_status_t)blade_rpc_register_namespace_function(char *namespace, KS_DECLARE(ks_status_t)blade_rpc_register_namespace_function(char *namespace,
char *command, char *command,
jrpc_func_t func, jrpc_func_t func,
jrpc_resp_func_t respfunc) jrpc_func_t respfunc)
{ {
if (!func && !respfunc) { if (!func && !respfunc) {
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
@ -411,8 +411,8 @@ KS_DECLARE(ks_status_t)blade_rpc_register_namespace_function(char *namespace,
KS_DECLARE(ks_status_t)blade_rpc_register_prefix_request_function(char *namespace, KS_DECLARE(ks_status_t)blade_rpc_register_prefix_request_function(char *namespace,
char *command, char *command,
jrpc_prefix_func_t prefix_func, jrpc_func_t prefix_func,
jrpc_postfix_func_t postfix_func) jrpc_func_t postfix_func)
{ {
ks_status_t s = KS_STATUS_FAIL; ks_status_t s = KS_STATUS_FAIL;
@ -438,8 +438,8 @@ KS_DECLARE(ks_status_t)blade_rpc_register_prefix_request_function(char *namespac
KS_DECLARE(ks_status_t)blade_rpc_register_prefix_response_function(char* namespace, KS_DECLARE(ks_status_t)blade_rpc_register_prefix_response_function(char* namespace,
char *command, char *command,
jrpc_prefix_resp_func_t prefix_func, jrpc_func_t prefix_func,
jrpc_postfix_resp_func_t postfix_func) jrpc_func_t postfix_func)
{ {
ks_status_t s = KS_STATUS_FAIL; ks_status_t s = KS_STATUS_FAIL;
@ -524,7 +524,7 @@ KS_DECLARE(ks_status_t) blade_rpc_declare_template(char* templatename, const cha
KS_DECLARE(ks_status_t)blade_rpc_register_template_function(char *name, KS_DECLARE(ks_status_t)blade_rpc_register_template_function(char *name,
char *command, char *command,
jrpc_func_t func, jrpc_func_t func,
jrpc_resp_func_t respfunc) jrpc_func_t respfunc)
{ {
if (!func && !respfunc) { if (!func && !respfunc) {
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
@ -750,17 +750,18 @@ KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json)
/* /*
* rpc message processing * rpc message processing
*/ */
static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request, cJSON **responseP) static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request)
{ {
const char *fqcommand = cJSON_GetObjectCstr(request, "method"); const char *fqcommand = cJSON_GetObjectCstr(request, "method");
cJSON *error = NULL; cJSON *error = NULL;
cJSON *response = NULL; cJSON *response = NULL;
*responseP = NULL; cJSON *responseP = NULL;
if (!fqcommand) { if (!fqcommand) {
error = cJSON_CreateObject(); error = cJSON_CreateObject();
cJSON_AddStringToObject(error, "errormessage", "Command not specified"); cJSON_AddStringToObject(error, "errormessage", "Command not specified");
ks_rpcmessage_create_request("rpcprotocol", "unknowncommand", NULL, NULL, &error, responseP); ks_rpcmessage_create_request("rpcprotocol", "unknowncommand", NULL, NULL, &error, &responseP);
blade_rpc_write_json(responseP);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
@ -774,7 +775,8 @@ static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request, cJSON **res
if (!callbacks) { if (!callbacks) {
error = cJSON_CreateObject(); error = cJSON_CreateObject();
cJSON_AddStringToObject(error, "errormessage", "Command not supported"); cJSON_AddStringToObject(error, "errormessage", "Command not supported");
ks_rpcmessage_create_response(request, &error, responseP); ks_rpcmessage_create_response(request, &error, &responseP);
blade_rpc_write_json(responseP);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
@ -782,43 +784,84 @@ static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request, cJSON **res
ks_bool_t isrequest = ks_rpcmessage_isrequest(request); ks_bool_t isrequest = ks_rpcmessage_isrequest(request);
ks_status_t s = KS_STATUS_SUCCESS; enum jrpc_status_t jrcs = JRPC_ERROR;
if (isrequest && callbacks->request_func) { if (isrequest && callbacks->request_func) {
cJSON *responseP = NULL;
if (callbacks->custom && callbacks->custom->prefix_request_func) { if (callbacks->custom && callbacks->custom->prefix_request_func) {
s = callbacks->custom->prefix_request_func(request); jrcs = callbacks->custom->prefix_request_func(request, &responseP);
if (jrcs == JRPC_SEND && responseP) {
blade_rpc_write_json(responseP);
cJSON_Delete(responseP);
responseP = NULL;
}
} }
if (s == KS_STATUS_SUCCESS) { if (jrcs != JRPC_ERROR) {
s = callbacks->request_func(request, responseP); jrcs = callbacks->request_func(request, &responseP);
if (jrcs == JRPC_SEND && responseP) {
blade_rpc_write_json(responseP);
cJSON_Delete(responseP);
responseP = NULL;
}
} }
if (s == KS_STATUS_SUCCESS && callbacks->custom && callbacks->custom->postfix_request_func) { if (jrcs != JRPC_ERROR && callbacks->custom && callbacks->custom->postfix_request_func) {
s = callbacks->custom->postfix_request_func(request, responseP); jrcs = callbacks->custom->postfix_request_func(request, &responseP);
if (jrcs == JRPC_SEND && responseP) {
blade_rpc_write_json(responseP);
cJSON_Delete(responseP);
responseP = NULL;
}
} }
ks_mutex_unlock(callbacks->lock); ks_mutex_unlock(callbacks->lock);
return s; if (jrcs == JRPC_ERROR) {
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
} }
else if (!isrequest && callbacks->response_func) { else if (!isrequest && callbacks->response_func) {
if (callbacks->custom && callbacks->custom->prefix_response_func) { if (callbacks->custom && callbacks->custom->prefix_response_func) {
s = callbacks->custom->prefix_response_func(response); jrcs = callbacks->custom->prefix_response_func(response, &responseP);
if (jrcs == JRPC_SEND && responseP) {
blade_rpc_write_json(responseP);
cJSON_Delete(responseP);
responseP = NULL;
}
} }
if (s == KS_STATUS_SUCCESS) { if (jrcs != JRPC_ERROR) {
s = callbacks->response_func(response); jrcs = callbacks->response_func(response, &responseP);
if (jrcs == JRPC_SEND && responseP) {
blade_rpc_write_json(responseP);
cJSON_Delete(responseP);
responseP = NULL;
}
} }
if (s == KS_STATUS_SUCCESS && callbacks->custom && callbacks->custom->postfix_response_func) { if (jrcs != JRPC_ERROR && callbacks->custom && callbacks->custom->postfix_response_func) {
s = callbacks->custom->postfix_response_func(response); jrcs = callbacks->custom->postfix_response_func(response, &responseP);
if (jrcs == JRPC_SEND && responseP) {
blade_rpc_write_json(responseP);
cJSON_Delete(responseP);
responseP = NULL;
}
} }
ks_mutex_unlock(callbacks->lock); ks_mutex_unlock(callbacks->lock);
return s;
if (jrcs == JRPC_ERROR) {
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
} }
ks_log(KS_LOG_ERROR, "Unable to find message handler for %s\n", command); ks_log(KS_LOG_ERROR, "Unable to find message handler for %s\n", command);
@ -829,13 +872,9 @@ static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request, cJSON **res
/* /*
* *
*/ */
KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request, cJSON **responseP) KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request)
{ {
ks_status_t respstatus = blade_rpc_process_jsonmessage_all(request, responseP); ks_status_t respstatus = blade_rpc_process_jsonmessage_all(request);
cJSON *response = *responseP;
if (respstatus == KS_STATUS_SUCCESS && response != NULL) {
blade_rpc_write_json(response);
}
return respstatus; return respstatus;
} }
@ -851,12 +890,7 @@ KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data,
/* deal with rpc message */ /* deal with rpc message */
if (ks_rpcmessage_isrpc(json)) { if (ks_rpcmessage_isrpc(json)) {
cJSON *response = NULL; ks_status_t respstatus = blade_rpc_process_jsonmessage_all(json);
ks_status_t respstatus = blade_rpc_process_jsonmessage_all(json, &response);
if (respstatus == KS_STATUS_SUCCESS && response != NULL) {
blade_rpc_write_json(response);
cJSON_Delete(response);
}
return respstatus; return respstatus;
} }

View File

@ -48,13 +48,14 @@
#define KS_RPCMESSAGE_VERSION_LENGTH 9 #define KS_RPCMESSAGE_VERSION_LENGTH 9
typedef ks_status_t (*jrpc_func_t) (cJSON *request, cJSON **responseP); enum jrpc_status_t {
typedef ks_status_t (*jrpc_prefix_func_t) (cJSON *request); JRPC_PASS = 0,
typedef ks_status_t (*jrpc_postfix_func_t) (cJSON *response, cJSON **responseP); JRPC_SEND,
JRPC_ERROR
};
typedef ks_status_t (*jrpc_resp_func_t) (cJSON *response);
typedef ks_status_t (*jrpc_prefix_resp_func_t) (cJSON *response); typedef enum jrpc_status_t (*jrpc_func_t) (cJSON *request, cJSON **responseP);
typedef ks_status_t (*jrpc_postfix_resp_func_t) (cJSON *response);
/* /*
@ -74,17 +75,31 @@ KS_DECLARE(ks_status_t) blade_rpc_declare_namespace(char* namespace, const char*
KS_DECLARE(ks_status_t) blade_rpc_register_function(char* namespace, KS_DECLARE(ks_status_t) blade_rpc_register_function(char* namespace,
char *command, char *command,
jrpc_func_t func, jrpc_func_t func,
jrpc_resp_func_t respfunc); jrpc_func_t respfunc);
KS_DECLARE(ks_status_t) blade_rpc_register_prefix_request_function(char* namespace, KS_DECLARE(ks_status_t) blade_rpc_register_prefix_request_function(char* namespace,
char *command, char *command,
jrpc_prefix_func_t prefix_func, jrpc_func_t prefix_func,
jrpc_postfix_func_t postfix_func); jrpc_func_t postfix_func);
KS_DECLARE(ks_status_t) blade_rpc_register_prefix_response_function(char *namespace, KS_DECLARE(ks_status_t) blade_rpc_register_prefix_response_function(char *namespace,
char *command, char *command,
jrpc_prefix_resp_func_t prefix_func, jrpc_func_t prefix_func,
jrpc_postfix_resp_func_t postfix_func); jrpc_func_t postfix_func);
KS_DECLARE(void) blade_rpc_remove_namespace(char* namespace); KS_DECLARE(void) blade_rpc_remove_namespace(char* namespace);
/*
* template registration and inheritance
* -------------------------------------
*/
KS_DECLARE(ks_status_t) blade_rpc_declare_template(char* templatename, const char* version);
KS_DECLARE(ks_status_t)blade_rpc_register_template_function(char *name,
char *command,
jrpc_func_t func,
jrpc_func_t respfunc);
KS_DECLARE(ks_status_t)blade_rpc_inherit_template(char *namespace, char* template);
/* /*
* peer create/destroy * peer create/destroy
@ -97,7 +112,7 @@ KS_DECLARE(ks_status_t) blade_rpc_disconnect(blade_peer_t* peer);
* send message * send message
* ------------ * ------------
*/ */
KS_DECLARE(ks_status_t) blade_rpc_write(char *sessionid, char* data, uint32_t size); KS_DECLARE(ks_status_t) blade_rpc_write(char *sessionid, char* data, uint32_t size); //uuid_t ?
KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json); KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json);
@ -108,7 +123,7 @@ KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json);
KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message); KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message);
KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data, ks_size_t size); KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data, ks_size_t size);
KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request, cJSON **responseP); KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request);
#endif #endif