diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index 3ac336697f..69f1a39366 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -37,16 +37,18 @@ struct blade_connection_s { blade_handle_t *handle; ks_pool_t *pool; + void *transport_init_data; void *transport_data; blade_transport_callbacks_t *transport_callbacks; ks_bool_t shutdown; // @todo add auto generated UUID + blade_connection_direction_t direction; ks_thread_t *state_thread; blade_connection_state_t state; ks_q_t *sending; - ks_q_t *receiving; + //ks_q_t *receiving; }; void *blade_connection_state_thread(ks_thread_t *thread, void *data); @@ -54,7 +56,7 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data); KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, blade_handle_t *bh, - void *transport_data, + void *transport_init_data, blade_transport_callbacks_t *transport_callbacks) { blade_connection_t *bc = NULL; @@ -62,7 +64,6 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, ks_assert(bcP); ks_assert(bh); - ks_assert(transport_data); ks_assert(transport_callbacks); pool = blade_handle_pool_get(bh); @@ -70,10 +71,10 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, bc = ks_pool_alloc(pool, sizeof(blade_connection_t)); bc->handle = bh; bc->pool = pool; - bc->transport_data = transport_data; + bc->transport_init_data = transport_init_data; bc->transport_callbacks = transport_callbacks; ks_q_create(&bc->sending, pool, 0); - ks_q_create(&bc->receiving, pool, 0); + //ks_q_create(&bc->receiving, pool, 0); *bcP = bc; return KS_STATUS_SUCCESS; @@ -91,17 +92,18 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP) blade_connection_shutdown(bc); ks_q_destroy(&bc->sending); - ks_q_destroy(&bc->receiving); + //ks_q_destroy(&bc->receiving); ks_pool_free(bc->pool, bcP); return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc) +KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction) { ks_assert(bc); + bc->direction = direction; blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE); if (ks_thread_create_ex(&bc->state_thread, @@ -112,7 +114,6 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc) KS_PRI_NORMAL, bc->pool) != KS_STATUS_SUCCESS) { // @todo error logging - blade_connection_disconnect(bc); return KS_STATUS_FAIL; } @@ -136,6 +137,13 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) return KS_STATUS_SUCCESS; } +KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc) +{ + ks_assert(bc); + + return bc->transport_init_data; +} + KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc) { ks_assert(bc); @@ -143,19 +151,72 @@ KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc) return bc->transport_data; } -KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state) +KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data) { ks_assert(bc); - bc->transport_callbacks->onstate(bc, state, BLADE_CONNECTION_STATE_CONDITION_PRE); + bc->transport_data = transport_data; +} + +blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state) +{ + blade_transport_state_callback_t callback = NULL; + + ks_assert(bc); + + switch (state) { + case BLADE_CONNECTION_STATE_DISCONNECT: + if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_disconnect_inbound; + else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_disconnect_outbound; + break; + case BLADE_CONNECTION_STATE_NEW: + if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_new_inbound; + else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_new_outbound; + break; + case BLADE_CONNECTION_STATE_CONNECT: + if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_connect_inbound; + else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_connect_outbound; + break; + case BLADE_CONNECTION_STATE_ATTACH: + if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_attach_inbound; + else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_attach_outbound; + break; + case BLADE_CONNECTION_STATE_DETACH: + if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_detach_inbound; + else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_detach_outbound; + break; + case BLADE_CONNECTION_STATE_READY: + if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_ready_inbound; + else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_ready_outbound; + break; + default: break; + } + + return callback; +} + +KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state) +{ + blade_transport_state_callback_t callback = NULL; + blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + + ks_assert(bc); + + callback = blade_connection_state_callback_lookup(bc, state); + + if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_PRE); + bc->state = state; + + if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc); } KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) { ks_assert(bc); - blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT); + if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT) + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH); } KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json) @@ -178,26 +239,30 @@ KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, bla return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json) -{ - ks_assert(bc); - ks_assert(json); +// @todo may not need receiving queue on connection, by the time we are queueing we should have a session to receive into +//KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json) +//{ +// ks_assert(bc); +// ks_assert(json); - return ks_q_push(bc->receiving, json); -} +// return ks_q_push(bc->receiving, json); +//} -KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json) -{ - ks_assert(bc); - ks_assert(json); +//KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json) +//{ +// ks_assert(bc); +// ks_assert(json); - return ks_q_trypop(bc->receiving, (void **)json); -} +// return ks_q_trypop(bc->receiving, (void **)json); +//} void *blade_connection_state_thread(ks_thread_t *thread, void *data) { blade_connection_t *bc = NULL; - blade_connection_state_hook_t hook; + blade_connection_state_t state; + blade_transport_state_callback_t callback = NULL; + blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + cJSON *json = NULL; ks_assert(thread); ks_assert(data); @@ -205,20 +270,34 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) bc = (blade_connection_t *)data; while (!bc->shutdown) { - // @todo need to get messages from the transport into receiving queue, and pop messages from sending queue to write out using transport - // sending is relatively easy, but receiving cannot occur universally due to cases like kws_init() blocking and expecting data to be on the wire - // and other transports may have similar behaviours, but CONNECTIN, ATTACH, and READY require async message passing into application layer - // and sending whenever the response hits the queue + + // @todo pop from connection sending queue and call transport callback to write one message (passing target identity too) + // and delete the cJSON object here after returning from callback + - // @todo it's possible that onstate could handle receiving and sending messages during the appropriate states, but this means some states - // like CONNECTIN which may send and receive multiple messages require BYPASSing until the application layer updates the state or disconnects + // @todo seems like connection will not need a receiving queue as the session will exist prior to async transmissions + + state = bc->state; + hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + callback = blade_connection_state_callback_lookup(bc, state); + + // @todo should this just go in the ready state callback? it's generalized here, so the callback for READY doesn't really + // need to do anything + if (state == BLADE_CONNECTION_STATE_READY && bc->transport_callbacks->onreceive(bc, &json) == KS_STATUS_SUCCESS && json) { + // @todo push json to session receiving queue + + } - hook = bc->transport_callbacks->onstate(bc, bc->state, BLADE_CONNECTION_STATE_CONDITION_POST); - if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) - blade_connection_disconnect(bc); + if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST); + + if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT && (state == BLADE_CONNECTION_STATE_DETACH || state == BLADE_CONNECTION_STATE_DISCONNECT)) + hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + + if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc); else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) { - // @todo pop from sending queue, and pass to transport callback to send out - switch (bc->state) { + switch (state) { + case BLADE_CONNECTION_STATE_DISCONNECT: + return NULL; case BLADE_CONNECTION_STATE_NEW: blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT); break; @@ -226,10 +305,14 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH); break; case BLADE_CONNECTION_STATE_ATTACH: + // @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge? + // determine how much of session management is handled here... do we process these session negotiation messages without + // passing it up to the application layer? or does the application layer give back a session and build the response? blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY); break; case BLADE_CONNECTION_STATE_DETACH: - blade_connection_disconnect(bc); + // @todo detach from session if this connection is attached + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT); break; default: break; } diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 3efe4307af..75f2993a51 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -37,6 +37,7 @@ typedef struct blade_module_wss_s blade_module_wss_t; typedef struct blade_transport_wss_s blade_transport_wss_t; +typedef struct blade_transport_wss_init_s blade_transport_wss_init_t; struct blade_module_wss_s { blade_handle_t *handle; @@ -70,42 +71,81 @@ struct blade_transport_wss_s { kws_t *kws; }; +struct blade_transport_wss_init_s { + blade_module_wss_t *module; + ks_pool_t *pool; + + ks_socket_t sock; +}; + ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh); ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP); -ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh); -ks_status_t blade_module_wss_onunload(blade_module_t *bm); -ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config); -ks_status_t blade_module_wss_onshutdown(blade_module_t *bm); +ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh); +ks_status_t blade_module_wss_on_unload(blade_module_t *bm); +ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config); +ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm); ks_status_t blade_module_wss_listen(blade_module_wss_t *bm, ks_sockaddr_t *addr); void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data); + ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock); ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP); -ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); -blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target); -blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition); +ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); +blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target); + +ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json); +ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json); + +blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition); +blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition); + + + +ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock); +ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP); static blade_module_callbacks_t g_module_wss_callbacks = { - blade_module_wss_onload, - blade_module_wss_onunload, - blade_module_wss_onstartup, - blade_module_wss_onshutdown, + blade_module_wss_on_load, + blade_module_wss_on_unload, + blade_module_wss_on_startup, + blade_module_wss_on_shutdown, }; static blade_transport_callbacks_t g_transport_wss_callbacks = { - blade_transport_wss_onconnect, - blade_transport_wss_onrank, - blade_transport_wss_onstate, + blade_transport_wss_on_connect, + blade_transport_wss_on_rank, + blade_transport_wss_on_send, + blade_transport_wss_on_receive, + + blade_transport_wss_on_state_disconnect, + blade_transport_wss_on_state_disconnect, + blade_transport_wss_on_state_new_inbound, + blade_transport_wss_on_state_new_outbound, + blade_transport_wss_on_state_connect_inbound, + blade_transport_wss_on_state_connect_outbound, + blade_transport_wss_on_state_attach_inbound, + blade_transport_wss_on_state_attach_outbound, + blade_transport_wss_on_state_detach, + blade_transport_wss_on_state_detach, + blade_transport_wss_on_state_ready, + blade_transport_wss_on_state_ready, }; @@ -144,7 +184,7 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP) bm_wss = *bm_wssP; - blade_module_wss_onshutdown(bm_wss->module); + blade_module_wss_on_shutdown(bm_wss->module); blade_module_destroy(&bm_wss->module); @@ -156,7 +196,7 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP) return KS_STATUS_SUCCESS; } -ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh) +ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh) { blade_module_wss_t *bm_wss = NULL; @@ -171,7 +211,7 @@ ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh) return KS_STATUS_SUCCESS; } -ks_status_t blade_module_wss_onunload(blade_module_t *bm) +ks_status_t blade_module_wss_on_unload(blade_module_t *bm) { blade_module_wss_t *bm_wss = NULL; @@ -293,7 +333,7 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t return KS_STATUS_SUCCESS; } -ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config) +ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config) { blade_module_wss_t *bm_wss = NULL; @@ -331,7 +371,7 @@ ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *con return KS_STATUS_SUCCESS; } -ks_status_t blade_module_wss_onshutdown(blade_module_t *bm) +ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm) { blade_module_wss_t *bm_wss = NULL; blade_transport_wss_t *bt_wss = NULL; @@ -423,6 +463,7 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) { blade_module_wss_t *bm_wss = NULL; + blade_transport_wss_init_t *bt_wss_init = NULL; blade_transport_wss_t *bt_wss = NULL; blade_connection_t *bc = NULL; @@ -448,27 +489,32 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) continue; } - blade_transport_wss_create(&bt_wss, bm_wss, sock); - ks_assert(bt_wss); + blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock); + ks_assert(bt_wss_init); - blade_connection_create(&bc, bm_wss->handle, bt_wss, bm_wss->transport_callbacks); + blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks); ks_assert(bc); - blade_connection_startup(bc); - + if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) { + blade_connection_destroy(&bc); + blade_transport_wss_init_destroy(&bt_wss_init); + ks_socket_close(&sock); + continue; + } list_append(&bm_wss->connected, bc); - blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW); } } while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) { + bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc); bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); list_delete(&bm_wss->connected, bc); + if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init); blade_connection_destroy(&bc); - blade_transport_wss_destroy(&bt_wss); + if (bt_wss) blade_transport_wss_destroy(&bt_wss); } } @@ -512,7 +558,7 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP) return KS_STATUS_SUCCESS; } -ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target) +ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target) { ks_assert(bcP); ks_assert(bm); @@ -525,14 +571,44 @@ ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module return KS_STATUS_SUCCESS; } -blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target) +blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target) { ks_assert(bc); ks_assert(target); + return BLADE_CONNECTION_RANK_POOR; +} + +ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json) +{ + char *json_str = cJSON_PrintUnformatted(json); + ks_size_t json_str_len = 0; + if (!json_str) { + // @todo error logging + return KS_STATUS_FAIL; + } + json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this + kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len); + + free(json_str); + return KS_STATUS_SUCCESS; } +ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json) +{ + blade_transport_wss_t *bt_wss = NULL; + + ks_assert(bc); + ks_assert(json); + + ks_log(KS_LOG_DEBUG, "Send Callback\n"); + + bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); + + return blade_transport_wss_write(bt_wss, json); +} + ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json) { // @todo get exact timeout from service config? @@ -559,12 +635,6 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json return KS_STATUS_FAIL; } - //if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) { - // @todo error logging - // return KS_STATUS_FAIL; - //} - - // @todo convert frame_data to cJSON safely, make sure data is null-terminated at frame_data_len if (!(*json = cJSON_Parse((char *)frame_data))) { return KS_STATUS_FAIL; } @@ -572,77 +642,170 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json return KS_STATUS_SUCCESS; } -ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json) -{ - //blade_message_get(message, &target, &json); - char *json_str = cJSON_PrintUnformatted(json); - ks_size_t json_str_len = 0; - if (!json_str) { - // @todo error logging - return KS_STATUS_FAIL; - } - json_str_len = strlen(json_str) + 1; - kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len); - - return KS_STATUS_SUCCESS; -} - -blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition) +ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json) { blade_transport_wss_t *bt_wss = NULL; - //cJSON *json = NULL; ks_assert(bc); + ks_assert(json); + + ks_log(KS_LOG_DEBUG, "Receive Callback\n"); bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); - switch (state) { - case BLADE_CONNECTION_STATE_DISCONNECT: - { - if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) { - ks_q_push(bt_wss->module->disconnected, bc); - blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE); - } - break; - } - case BLADE_CONNECTION_STATE_NEW: - { - if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) { - // @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init - if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) { - // @todo error logging - return BLADE_CONNECTION_STATE_HOOK_DISCONNECT; - } - } - break; - } - case BLADE_CONNECTION_STATE_CONNECT: - { - // @todo abstract read message and write message, so these can be called from connection and processed from there - - //if (blade_transport_wss_read(bt_wss, &json) != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATEHOOK_DISCONNECT; + return blade_transport_wss_read(bt_wss, json); +} - //if (json) { - // @todo processing connectin messages for identity registration - // cJSON_Delete(json); - //blade_connection_receiving_push(conn, json); - //} +blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + blade_transport_wss_t *bt_wss = NULL; + + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; + + bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); + + ks_q_push(bt_wss->module->disconnected, bc); - // @todo wrap identity + json into an envelope for queueing through the connection - //while (blade_connection_sending_pop(bc, (void **)&json) == KS_STATUS_SUCCESS && json) { - // ks_status_t ret = blade_transport_wss_write(bt_wss, json); - // cJSON_Delete(json); - // if (ret != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATE_HOOK_DISCONNECT; - //} - return BLADE_CONNECTION_STATE_HOOK_SUCCESS; - //break; - } - default: break; - } - return BLADE_CONNECTION_STATE_HOOK_SUCCESS; } +blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + blade_transport_wss_t *bt_wss = NULL; + blade_transport_wss_init_t *bt_wss_init = NULL; + + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; + + bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc); + + blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock); + ks_assert(bt_wss); + + blade_connection_transport_set(bc, bt_wss); + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + blade_transport_wss_t *bt_wss = NULL; + + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; + + bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); + + // @todo: SSL init stuffs based on data from config to pass into kws_init + if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) { + // @todo error logging + return BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + } + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + // @todo Establish sessid and discover existing session or create and register new session through BLADE commands + // Set session state to CONNECT if its new or RECONNECT if existing + // start session and its thread if its new + + return BLADE_CONNECTION_STATE_HOOK_BYPASS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition) +{ + ks_assert(bc); + + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; +} + +ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock) +{ + blade_transport_wss_init_t *bt_wssi = NULL; + + ks_assert(bt_wssiP); + ks_assert(bm_wss); + ks_assert(sock != KS_SOCK_INVALID); + + bt_wssi = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_init_t)); + bt_wssi->module = bm_wss; + bt_wssi->pool = bm_wss->pool; + bt_wssi->sock = sock; + + *bt_wssiP = bt_wssi; + + return KS_STATUS_SUCCESS; +} + +ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP) +{ + blade_transport_wss_init_t *bt_wssi = NULL; + + ks_assert(bt_wssiP); + ks_assert(*bt_wssiP); + + bt_wssi = *bt_wssiP; + + ks_pool_free(bt_wssi->pool, bt_wssiP); + + return KS_STATUS_SUCCESS; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/include/blade_connection.h b/libs/libblade/src/include/blade_connection.h index d48ec8c221..70ee105b7d 100644 --- a/libs/libblade/src/include/blade_connection.h +++ b/libs/libblade/src/include/blade_connection.h @@ -41,9 +41,11 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, void *transport_data, blade_transport_callbacks_t *transport_callbacks); KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP); -KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc); +KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction); KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc); +KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc); +KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data); KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state); KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target); diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index cc2c054197..c2c961266b 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -62,8 +62,8 @@ typedef enum { } blade_connection_state_t; typedef enum { - BLADE_CONNECTION_DIRECTION_IN, - BLADE_CONNECTION_DIRECTION_OUT, + BLADE_CONNECTION_DIRECTION_INBOUND, + BLADE_CONNECTION_DIRECTION_OUTBOUND, } blade_connection_direction_t; typedef enum { @@ -99,14 +99,28 @@ struct blade_module_callbacks_s { typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target); -typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, - blade_connection_state_t state, - blade_connection_state_condition_t condition); +typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, blade_identity_t *target, cJSON *json); +typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json); +typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition); struct blade_transport_callbacks_s { blade_transport_connect_callback_t onconnect; blade_transport_rank_callback_t onrank; - blade_transport_state_callback_t onstate; + blade_transport_send_callback_t onsend; + blade_transport_receive_callback_t onreceive; + + blade_transport_state_callback_t onstate_disconnect_inbound; + blade_transport_state_callback_t onstate_disconnect_outbound; + blade_transport_state_callback_t onstate_new_inbound; + blade_transport_state_callback_t onstate_new_outbound; + blade_transport_state_callback_t onstate_connect_inbound; + blade_transport_state_callback_t onstate_connect_outbound; + blade_transport_state_callback_t onstate_attach_inbound; + blade_transport_state_callback_t onstate_attach_outbound; + blade_transport_state_callback_t onstate_detach_inbound; + blade_transport_state_callback_t onstate_detach_outbound; + blade_transport_state_callback_t onstate_ready_inbound; + blade_transport_state_callback_t onstate_ready_outbound; };