From fe36139965834957b34b3622419b515eb3f9266b Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Mon, 12 Dec 2016 20:33:48 +0000 Subject: [PATCH] FS-9775: Started mocking out structures for ks_dht_search, but merged route table updates and updated the tests for current pending issues --- libs/libks/Makefile.am | 2 +- libs/libks/src/dht/ks_dht-int.h | 21 ++++ libs/libks/src/dht/ks_dht.c | 73 ++++++----- libs/libks/src/dht/ks_dht.h | 41 +++++- libs/libks/src/dht/ks_dht_search.c | 195 +++++++++++++++++++++++++++++ libs/libks/test/testdht2.c | 28 ++++- 6 files changed, 320 insertions(+), 40 deletions(-) create mode 100644 libs/libks/src/dht/ks_dht_search.c diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index 94b01d454f..d1931c19bd 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c -libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c +libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_search.c libks_la_SOURCES += src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index fbb434f114..ae7ae23870 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -248,6 +248,27 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, ks_socket_t sock); KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool); +KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search); + +KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, + const ks_dht_nodeid_t *target, + ks_dht_search_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search); + +KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback); + +KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool); +KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending); + +KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration); +KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending); + /** * */ diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 2fcbe55866..870c9daec2 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -786,7 +786,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM; - memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE); + memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE); *buffer_length += KS_DHT_NODEID_SIZE; return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address); @@ -1132,8 +1132,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->raddr.host, transaction->raddr.port); } else { - transaction->finished = KS_TRUE; ret = transaction->callback(dht, message); + transaction->finished = KS_TRUE; } return ret; @@ -1315,6 +1315,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ struct bencode *r = NULL; ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); @@ -1324,10 +1325,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ routetable = message->endpoint->node->table; - // @todo touch here, or only create if not exists? - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); @@ -1354,6 +1353,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa ks_dht_nodeid_t *id; ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); @@ -1362,9 +1362,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); @@ -1435,9 +1437,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); @@ -1536,9 +1537,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; while (nodes_len < nodes_size) { ks_dht_nodeid_t nid; @@ -1553,9 +1556,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m addr.host, addr.port); - if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); } while (nodes6_len < nodes6_size) { @@ -1571,9 +1573,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m addr.host, addr.port); - if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); } // @todo repeat above for ipv6 table @@ -1617,6 +1618,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t struct bencode *r = NULL; ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); @@ -1631,9 +1633,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); @@ -1685,6 +1686,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag ks_dht_token_t *token; ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); @@ -1699,9 +1701,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; // @todo add/touch bucket entries for other nodes/nodes6 returned ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); @@ -1719,6 +1723,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t struct bencode *r = NULL; ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); @@ -1728,9 +1733,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); @@ -1757,6 +1761,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag ks_dht_nodeid_t *id; ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); @@ -1765,9 +1770,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag routetable = message->endpoint->node->table; - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { - ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node); - } + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index f151938237..a27e63dd6e 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -21,6 +21,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 #define KS_DHT_SEARCH_EXPIRATION 10 +#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64 @@ -39,6 +40,8 @@ typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t; typedef struct ks_dht_message_s ks_dht_message_t; typedef struct ks_dht_endpoint_s ks_dht_endpoint_t; typedef struct ks_dht_transaction_s ks_dht_transaction_t; +typedef struct ks_dht_search_s ks_dht_search_t; +typedef struct ks_dht_search_pending_s ks_dht_search_pending_t; typedef struct ks_dht_node_s ks_dht_node_t; typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t; typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t; @@ -46,6 +49,7 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t; typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message); +typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search); /** * Note: This must remain a structure for casting from raw data @@ -122,10 +126,39 @@ struct ks_dht_transaction_s { ks_bool_t finished; }; +// Check if search already exists for the target id, if so add another callback, must be a popular target id +// Otherwise create new search, set target id, add callback, and insert the search into the dht search_hash with target id key +// Get closest local nodes to target id, check against results, send_findnode for closer nodes and add to pending hash with queried node id +// Upon receiving find_node response, check target id against dht search_hash, check responding node id against pending hash, set finished for purging +// Update results if responding node id is closer than any current result, or the results are not full +// Check response nodes against results, send_findnode for closer nodes and add to pending hash with an expiration +// Pulse expirations purges expired and finished from pending hash, once hash is empty callbacks are called providing results array +// Note: +// During the lifetime of a search, the ks_dht_node_t's must be kept alive +// Do a query touch on nodes prior to being added to pending, this should reset timeout and keep the nodes alive long enough even if they are dubious +// Nodes which land in results are known good with recent response to find_nodes and should be around for a while before route table worries about cleanup +struct ks_dht_search_s { + ks_pool_t *pool; + ks_mutex_t *mutex; + ks_dht_nodeid_t target; + ks_dht_search_callback_t *callbacks; + ks_size_t callbacks_size; + ks_hash_t *pending; + ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; + ks_size_t results_length; +}; + +struct ks_dht_search_pending_s { + ks_pool_t *pool; + ks_dht_node_t *node; + ks_time_t expiration; + ks_bool_t finished; +}; + struct ks_dht_storageitem_s { ks_pool_t *pool; ks_dht_nodeid_t id; - + // @todo ks_time_t expiration; struct bencode *v; ks_bool_t mutable; @@ -169,6 +202,8 @@ struct ks_dht_s { ks_dhtrt_routetable_t *rt_ipv4; ks_dhtrt_routetable_t *rt_ipv6; + ks_hash_t *search_hash; + volatile uint32_t token_secret_current; volatile uint32_t token_secret_previous; ks_time_t token_secret_expiration; @@ -275,8 +310,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, * Bind a local address and port for receiving UDP datagrams. * @param dht pointer to the dht instance * @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly - * @param addr pointer to the remote address information - * @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint + * @param addr pointer to the local address information + * @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ... * @see ks_socket_option * @see ks_addr_bind diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c new file mode 100644 index 0000000000..4dfd3b2008 --- /dev/null +++ b/libs/libks/src/dht/ks_dht_search.c @@ -0,0 +1,195 @@ +#include "ks_dht.h" +#include "ks_dht-int.h" +#include "sodium.h" + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool) +{ + ks_dht_search_t *s; + + ks_assert(search); + ks_assert(pool); + + *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t)); + s->pool = pool; + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool) +{ + ks_assert(search); + ks_assert(pool); + + memset(search, 0, sizeof(ks_dht_search_t)); + + search->pool = pool; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(search); + ks_assert(*search); + + if ((ret = ks_dht_search_deinit(*search)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_pool_free((*search)->pool, *search)) != KS_STATUS_SUCCESS) return ret; + + *search = NULL; + + return KS_STATUS_SUCCESS; +} + + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(search); + ks_assert(search->pool); + ks_assert(target); + + if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret; + memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE); + + if (callback) ks_dht_search_callback_add(search, callback); + + if ((ret = ks_hash_create(&search->pending, + KS_HASH_MODE_ARBITRARY, + KS_HASH_FLAG_RWLOCK, + search->pool)) != KS_STATUS_SUCCESS) return ret; + ks_hash_set_keysize(search->pending, KS_DHT_NODEID_SIZE); + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search) +{ + ks_hash_iterator_t *it; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(search); + + search->results_length = 0; + if (search->pending) { + for (it = ks_hash_first(search->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key; + ks_dht_search_pending_t *val; + + ks_hash_this(it, &key, NULL, (void **)&val); + if ((ret = ks_dht_search_pending_deinit(val)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_search_pending_free(&val)) != KS_STATUS_SUCCESS) return ret; + } + ks_hash_destroy(&search->pending); + } + search->callbacks_size = 0; + if (search->callbacks) { + if ((ret = ks_pool_free(search->pool, search->callbacks)) != KS_STATUS_SUCCESS) return ret; + search->callbacks = NULL; + } + if (search->mutex && (ret = ks_mutex_destroy(&search->mutex)) != KS_STATUS_SUCCESS) return ret; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback) +{ + ks_assert(search); + + if (callback) { + int32_t index = search->callbacks_size++; + search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, + (void *)search->callbacks, + sizeof(ks_dht_search_callback_t) * search->callbacks_size); + search->callbacks[index] = callback; + } + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool) +{ + ks_dht_search_pending_t *p; + + ks_assert(pending); + ks_assert(pool); + + *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); + p->pool = pool; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool) +{ + ks_assert(pending); + ks_assert(pool); + + memset(pending, 0, sizeof(ks_dht_search_pending_t)); + + pending->pool = pool; +} + +KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(pending); + ks_assert(*pending); + + if ((ret = ks_dht_search_pending_deinit(*pending)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_pool_free((*pending)->pool, *pending)) != KS_STATUS_SUCCESS) return ret; + + *pending = NULL; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration) +{ + ks_assert(pending); + ks_assert(pending->pool); + ks_assert(node); + + pending->node = node; + pending->expiration = expiration; + pending->finished = KS_FALSE; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending) +{ + ks_assert(pending); + + pending->node = NULL; + pending->expiration = 0; + pending->finished = KS_FALSE; + + return KS_STATUS_SUCCESS; +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index d77eee436c..82a00e6364 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -143,15 +143,28 @@ int main() { diag("Ping test\n"); - ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue ping from dht2 to dht1 + ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1 ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response + ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1 - // Test blind find_node from dht3 to dht1 to find dht2 nodeid + ok(ks_dhtrt_find_node(dht2.rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good + + diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up + for (int i = 0; i < 10; ++i) { + diag("DHT 1\n"); + ks_dht_pulse(dht1, 100); + diag("DHT 2\n"); + ks_dht_pulse(&dht2, 100); + } + ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + + // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid diag("Find_Node test\n"); @@ -163,8 +176,17 @@ int main() { ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 - ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); + ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up + for (int i = 0; i < 10; ++i) { + diag("DHT 1\n"); + ks_dht_pulse(dht1, 100); + diag("DHT 2\n"); + ks_dht_pulse(&dht2, 100); + } + ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + diag("Cleanup\n"); /* Cleanup and shutdown */