From 6eed8d3f946cb7d9e238304639961b3df5cccfe4 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Thu, 1 Dec 2016 04:37:36 +0000 Subject: [PATCH] FS-9775: Added initial registry for 'y' keys, and some unit testing --- libs/libks/src/dht/ks_dht-int.h | 7 +- libs/libks/src/dht/ks_dht.c | 137 ++++++++++++++++++++------- libs/libks/src/dht/ks_dht.h | 15 ++- libs/libks/src/dht/ks_dht_endpoint.c | 2 + libs/libks/test/testdht2.c | 18 +++- 5 files changed, 141 insertions(+), 38 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index f878faaabf..968462aa1e 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -8,7 +8,12 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); - +KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht, + struct bencode **message, + uint8_t *transactionid, + ks_size_t *transactionid_len, + char *messagetype); + KS_END_EXTERN_C #endif /* KS_DHT_INT_H */ diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index e86ebee983..d3e76643a7 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -68,6 +68,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const uint8_t *nodeid) if (ks_dht2_nodeid_init(&dht->nodeid, nodeid) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } + + ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + // @todo ks_hash_insert the q/r/e callbacks into y registry dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; @@ -90,16 +93,43 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) ks_assert(dht); dht->recv_buffer_length = 0; - // @todo dht->endpoints_poll deinit - // @todo dht->endpoints deinit + for (int32_t i = 0; i < dht->endpoints_size; ++i) { + ks_dht2_endpoint_t *ep = dht->endpoints[i]; + //ks_hash_remove(dht->endpoints_hash, ep->addr.host); + ks_dht2_endpoint_deinit(ep); + ks_dht2_endpoint_free(ep); + } + if (dht->endpoints) { + ks_pool_free(dht->pool, dht->endpoints); + dht->endpoints = NULL; + } + if (dht->endpoints_poll) { + ks_pool_free(dht->pool, dht->endpoints_poll); + dht->endpoints_poll = NULL; + } ks_hash_destroy(&dht->endpoints_hash); dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; + + ks_hash_destroy(&dht->registry_y); + ks_dht2_nodeid_deinit(&dht->nodeid); return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback) +{ + ks_assert(dht); + ks_assert(value); + ks_assert(callback); + + return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; +} + /** * */ @@ -220,15 +250,12 @@ KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) */ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) { - struct bencode *message; - struct bencode *t; - struct bencode *y; - const char *tv; - const char *yv; - ks_size_t tv_len; - ks_size_t yv_len; - uint16_t transactionid; - char messagetype; + struct bencode *message = NULL; + uint8_t transactionid[KS_DHT_TRANSACTIONID_MAX_SIZE]; + ks_size_t transactionid_len; + char messagetype[KS_DHT_MESSAGETYPE_MAX_SIZE]; + ks_dht2_registry_callback_t callback; + ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); ks_assert(raddr); @@ -241,53 +268,99 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) // @todo blacklist check for bad actor nodes - message = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length); - if (!message) { - ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n"); + if (ks_dht2_parse(dht, &message, transactionid, &transactionid_len, messagetype) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } + + if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, messagetype, KS_UNLOCKED))) { + ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", messagetype); + } else { + ret = callback(dht, raddr, transactionid, transactionid_len, message); + } + + ben_free(message); + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht, + struct bencode **message, + uint8_t *transactionid, + ks_size_t *transactionid_len, + char *messagetype) +{ + struct bencode *msg = NULL; + struct bencode *t; + struct bencode *y; + const char *tv; + const char *yv; + ks_size_t tv_len; + ks_size_t yv_len; + + ks_assert(dht); + ks_assert(message); + ks_assert(transactionid); + ks_assert(messagetype); + + msg = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length); + if (!msg) { + ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n"); + goto failure; + } ks_log(KS_LOG_DEBUG, "Message decoded\n"); - ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message)); + ks_log(KS_LOG_DEBUG, "%s\n", ben_print(msg)); - t = ben_dict_get_by_str(message, "t"); + t = ben_dict_get_by_str(msg, "t"); if (!t) { ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n"); - return KS_STATUS_FAIL; + goto failure; } tv = ben_str_val(t); tv_len = ben_str_len(t); - if (tv_len != sizeof(uint16_t)) { - ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpected size of %d\n", tv_len); - return KS_STATUS_FAIL; + if (tv_len > KS_DHT_TRANSACTIONID_MAX_SIZE) { + ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len); + goto failure; } - transactionid = ntohs(*((uint16_t *)tv)); - ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", transactionid); + memcpy(transactionid, tv, tv_len); + *transactionid_len = tv_len; + // @todo hex output of transactionid + //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid); - y = ben_dict_get_by_str(message, "y"); + y = ben_dict_get_by_str(msg, "y"); if (!y) { ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n"); - return KS_STATUS_FAIL; + goto failure; } yv = ben_str_val(y); yv_len = ben_str_len(y); - if (yv_len != 1) { - ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpected size of %d\n", yv_len); - return KS_STATUS_FAIL; + if (yv_len >= KS_DHT_MESSAGETYPE_MAX_SIZE) { + ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len); + goto failure; } - messagetype = (char)yv[0]; - ks_log(KS_LOG_DEBUG, "Message type is '%c'\n", messagetype); - - // @todo dispatch callback from the 'y' registry + memcpy(messagetype, yv, yv_len); + messagetype[yv_len] = '\0'; + ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", messagetype); + *message = msg; return KS_STATUS_SUCCESS; + + failure: + if (msg) { + ben_free(msg); + } + *message = NULL; + *transactionid_len = 0; + messagetype[0] = '\0'; + return KS_STATUS_FAIL; } - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 0d916aecce..626b3566f5 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -12,7 +12,8 @@ KS_BEGIN_EXTERN_C #define KS_DHT_DEFAULT_PORT 5309 #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF - +#define KS_DHT_TRANSACTIONID_MAX_SIZE 20 +#define KS_DHT_MESSAGETYPE_MAX_SIZE 20 typedef struct ks_dht2_s ks_dht2_t; struct ks_dht2_s { @@ -21,6 +22,8 @@ struct ks_dht2_s { ks_dht2_nodeid_t nodeid; + ks_hash_t *registry_y; + ks_bool_t bind_ipv4; ks_bool_t bind_ipv6; @@ -33,6 +36,12 @@ struct ks_dht2_s { ks_size_t recv_buffer_length; }; +typedef ks_status_t (*ks_dht2_registry_callback_t)(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_len, + struct bencode *message); + KS_DECLARE(ks_status_t) ks_dht2_alloc(ks_dht2_t **dht, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht2_prealloc(ks_dht2_t *dht, ks_pool_t *pool); @@ -46,6 +55,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr); KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout); + +KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback); + + KS_END_EXTERN_C #endif /* KS_DHT_H */ diff --git a/libs/libks/src/dht/ks_dht_endpoint.c b/libs/libks/src/dht/ks_dht_endpoint.c index 413d33e90a..7368a793f4 100644 --- a/libs/libks/src/dht/ks_dht_endpoint.c +++ b/libs/libks/src/dht/ks_dht_endpoint.c @@ -66,6 +66,8 @@ KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint) { ks_assert(endpoint); + ks_socket_close(&endpoint->sock); + return KS_STATUS_SUCCESS; } diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 1895896374..55096209cf 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -4,10 +4,18 @@ #include <../dht/ks_dht_endpoint-int.h> #include +#define TEST_DHT1_REGISTER_Y_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze" #define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe" +ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_len, struct bencode *message) +{ + diag("dht_z_callback\n"); + ok(transactionid[0] == '4' && transactionid[1] == '2'); + return KS_STATUS_SUCCESS; +} + int main() { - ks_size_t buflen = strlen(TEST_DHT1_PROCESS_BUFFER); + ks_size_t buflen; ks_status_t err; int mask = 0; ks_dht2_t *dht1 = NULL; @@ -21,7 +29,7 @@ int main() { ok(!err); ks_global_set_default_logger(7); - + err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL); ok(err == KS_STATUS_SUCCESS); have_v4 = !zstr_buf(v4); @@ -50,6 +58,8 @@ int main() { err = ks_dht2_init(&dht2, NULL); ok(err == KS_STATUS_SUCCESS); + + ks_dht2_register_y(dht1, "z", dht_z_callback); if (have_v4) { err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET); @@ -81,8 +91,8 @@ int main() { ok(err == KS_STATUS_SUCCESS); } - // @todo populate dht1->recv_buffer and dht1->recv_buffer_length - memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_BUFFER, buflen); + buflen = strlen(TEST_DHT1_REGISTER_Y_BUFFER); + memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_Y_BUFFER, buflen); dht1->recv_buffer_length = buflen; err = ks_dht2_process(dht1, &raddr);