mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-04-14 08:05:37 +00:00
FS-9775: Started mocking out structures for ks_dht_search, but merged route table updates and updated the tests for current pending issues
This commit is contained in:
parent
ff57e94889
commit
fe36139965
@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k
|
||||
libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
|
||||
libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
|
||||
libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
|
||||
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
|
||||
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_search.c
|
||||
libks_la_SOURCES += src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
|
||||
libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c
|
||||
#aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
|
||||
|
@ -248,6 +248,27 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint,
|
||||
ks_socket_t sock);
|
||||
KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool);
|
||||
KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool);
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search);
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search,
|
||||
const ks_dht_nodeid_t *target,
|
||||
ks_dht_search_callback_t callback);
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search);
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool);
|
||||
KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool);
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending);
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration);
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -786,7 +786,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
|
||||
|
||||
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
|
||||
|
||||
memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
|
||||
memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE);
|
||||
*buffer_length += KS_DHT_NODEID_SIZE;
|
||||
|
||||
return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address);
|
||||
@ -1132,8 +1132,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
|
||||
transaction->raddr.host,
|
||||
transaction->raddr.port);
|
||||
} else {
|
||||
transaction->finished = KS_TRUE;
|
||||
ret = transaction->callback(dht, message);
|
||||
transaction->finished = KS_TRUE;
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -1315,6 +1315,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
|
||||
struct bencode *r = NULL;
|
||||
ks_dhtrt_routetable_t *routetable = NULL;
|
||||
ks_dht_node_t *node = NULL;
|
||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
@ -1324,10 +1325,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
// @todo touch here, or only create if not exists?
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
|
||||
|
||||
@ -1354,6 +1353,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
|
||||
ks_dht_nodeid_t *id;
|
||||
ks_dhtrt_routetable_t *routetable = NULL;
|
||||
ks_dht_node_t *node = NULL;
|
||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
@ -1362,9 +1362,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
|
||||
|
||||
@ -1435,9 +1437,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
|
||||
|
||||
@ -1536,9 +1537,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
while (nodes_len < nodes_size) {
|
||||
ks_dht_nodeid_t nid;
|
||||
@ -1553,9 +1556,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
||||
addr.host,
|
||||
addr.port);
|
||||
|
||||
if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
|
||||
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
||||
}
|
||||
|
||||
while (nodes6_len < nodes6_size) {
|
||||
@ -1571,9 +1573,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
||||
addr.host,
|
||||
addr.port);
|
||||
|
||||
if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
|
||||
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
||||
}
|
||||
// @todo repeat above for ipv6 table
|
||||
|
||||
@ -1617,6 +1618,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
|
||||
struct bencode *r = NULL;
|
||||
ks_dhtrt_routetable_t *routetable = NULL;
|
||||
ks_dht_node_t *node = NULL;
|
||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
@ -1631,9 +1633,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
|
||||
|
||||
@ -1685,6 +1686,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
|
||||
ks_dht_token_t *token;
|
||||
ks_dhtrt_routetable_t *routetable = NULL;
|
||||
ks_dht_node_t *node = NULL;
|
||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
@ -1699,9 +1701,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
// @todo add/touch bucket entries for other nodes/nodes6 returned
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
|
||||
@ -1719,6 +1723,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
|
||||
struct bencode *r = NULL;
|
||||
ks_dhtrt_routetable_t *routetable = NULL;
|
||||
ks_dht_node_t *node = NULL;
|
||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
@ -1728,9 +1733,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
|
||||
|
||||
@ -1757,6 +1761,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
|
||||
ks_dht_nodeid_t *id;
|
||||
ks_dhtrt_routetable_t *routetable = NULL;
|
||||
ks_dht_node_t *node = NULL;
|
||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
@ -1765,9 +1770,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
|
||||
|
||||
routetable = message->endpoint->node->table;
|
||||
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
|
||||
ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
|
||||
}
|
||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
|
||||
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
|
||||
|
||||
|
@ -21,6 +21,7 @@ KS_BEGIN_EXTERN_C
|
||||
|
||||
#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
|
||||
#define KS_DHT_SEARCH_EXPIRATION 10
|
||||
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
|
||||
|
||||
#define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
|
||||
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
|
||||
@ -39,6 +40,8 @@ typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t;
|
||||
typedef struct ks_dht_message_s ks_dht_message_t;
|
||||
typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
|
||||
typedef struct ks_dht_transaction_s ks_dht_transaction_t;
|
||||
typedef struct ks_dht_search_s ks_dht_search_t;
|
||||
typedef struct ks_dht_search_pending_s ks_dht_search_pending_t;
|
||||
typedef struct ks_dht_node_s ks_dht_node_t;
|
||||
typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t;
|
||||
typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
|
||||
@ -46,6 +49,7 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
|
||||
|
||||
|
||||
typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
|
||||
typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
|
||||
|
||||
/**
|
||||
* Note: This must remain a structure for casting from raw data
|
||||
@ -122,10 +126,39 @@ struct ks_dht_transaction_s {
|
||||
ks_bool_t finished;
|
||||
};
|
||||
|
||||
// Check if search already exists for the target id, if so add another callback, must be a popular target id
|
||||
// Otherwise create new search, set target id, add callback, and insert the search into the dht search_hash with target id key
|
||||
// Get closest local nodes to target id, check against results, send_findnode for closer nodes and add to pending hash with queried node id
|
||||
// Upon receiving find_node response, check target id against dht search_hash, check responding node id against pending hash, set finished for purging
|
||||
// Update results if responding node id is closer than any current result, or the results are not full
|
||||
// Check response nodes against results, send_findnode for closer nodes and add to pending hash with an expiration
|
||||
// Pulse expirations purges expired and finished from pending hash, once hash is empty callbacks are called providing results array
|
||||
// Note:
|
||||
// During the lifetime of a search, the ks_dht_node_t's must be kept alive
|
||||
// Do a query touch on nodes prior to being added to pending, this should reset timeout and keep the nodes alive long enough even if they are dubious
|
||||
// Nodes which land in results are known good with recent response to find_nodes and should be around for a while before route table worries about cleanup
|
||||
struct ks_dht_search_s {
|
||||
ks_pool_t *pool;
|
||||
ks_mutex_t *mutex;
|
||||
ks_dht_nodeid_t target;
|
||||
ks_dht_search_callback_t *callbacks;
|
||||
ks_size_t callbacks_size;
|
||||
ks_hash_t *pending;
|
||||
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
|
||||
ks_size_t results_length;
|
||||
};
|
||||
|
||||
struct ks_dht_search_pending_s {
|
||||
ks_pool_t *pool;
|
||||
ks_dht_node_t *node;
|
||||
ks_time_t expiration;
|
||||
ks_bool_t finished;
|
||||
};
|
||||
|
||||
struct ks_dht_storageitem_s {
|
||||
ks_pool_t *pool;
|
||||
ks_dht_nodeid_t id;
|
||||
|
||||
// @todo ks_time_t expiration;
|
||||
struct bencode *v;
|
||||
|
||||
ks_bool_t mutable;
|
||||
@ -169,6 +202,8 @@ struct ks_dht_s {
|
||||
ks_dhtrt_routetable_t *rt_ipv4;
|
||||
ks_dhtrt_routetable_t *rt_ipv6;
|
||||
|
||||
ks_hash_t *search_hash;
|
||||
|
||||
volatile uint32_t token_secret_current;
|
||||
volatile uint32_t token_secret_previous;
|
||||
ks_time_t token_secret_expiration;
|
||||
@ -275,8 +310,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
|
||||
* Bind a local address and port for receiving UDP datagrams.
|
||||
* @param dht pointer to the dht instance
|
||||
* @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly
|
||||
* @param addr pointer to the remote address information
|
||||
* @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint
|
||||
* @param addr pointer to the local address information
|
||||
* @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output
|
||||
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
|
||||
* @see ks_socket_option
|
||||
* @see ks_addr_bind
|
||||
|
195
libs/libks/src/dht/ks_dht_search.c
Normal file
195
libs/libks/src/dht/ks_dht_search.c
Normal file
@ -0,0 +1,195 @@
|
||||
#include "ks_dht.h"
|
||||
#include "ks_dht-int.h"
|
||||
#include "sodium.h"
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool)
|
||||
{
|
||||
ks_dht_search_t *s;
|
||||
|
||||
ks_assert(search);
|
||||
ks_assert(pool);
|
||||
|
||||
*search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t));
|
||||
s->pool = pool;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool)
|
||||
{
|
||||
ks_assert(search);
|
||||
ks_assert(pool);
|
||||
|
||||
memset(search, 0, sizeof(ks_dht_search_t));
|
||||
|
||||
search->pool = pool;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search)
|
||||
{
|
||||
ks_status_t ret = KS_STATUS_SUCCESS;
|
||||
|
||||
ks_assert(search);
|
||||
ks_assert(*search);
|
||||
|
||||
if ((ret = ks_dht_search_deinit(*search)) != KS_STATUS_SUCCESS) return ret;
|
||||
if ((ret = ks_pool_free((*search)->pool, *search)) != KS_STATUS_SUCCESS) return ret;
|
||||
|
||||
*search = NULL;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback)
|
||||
{
|
||||
ks_status_t ret = KS_STATUS_SUCCESS;
|
||||
|
||||
ks_assert(search);
|
||||
ks_assert(search->pool);
|
||||
ks_assert(target);
|
||||
|
||||
if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret;
|
||||
memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE);
|
||||
|
||||
if (callback) ks_dht_search_callback_add(search, callback);
|
||||
|
||||
if ((ret = ks_hash_create(&search->pending,
|
||||
KS_HASH_MODE_ARBITRARY,
|
||||
KS_HASH_FLAG_RWLOCK,
|
||||
search->pool)) != KS_STATUS_SUCCESS) return ret;
|
||||
ks_hash_set_keysize(search->pending, KS_DHT_NODEID_SIZE);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search)
|
||||
{
|
||||
ks_hash_iterator_t *it;
|
||||
ks_status_t ret = KS_STATUS_SUCCESS;
|
||||
|
||||
ks_assert(search);
|
||||
|
||||
search->results_length = 0;
|
||||
if (search->pending) {
|
||||
for (it = ks_hash_first(search->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
||||
const void *key;
|
||||
ks_dht_search_pending_t *val;
|
||||
|
||||
ks_hash_this(it, &key, NULL, (void **)&val);
|
||||
if ((ret = ks_dht_search_pending_deinit(val)) != KS_STATUS_SUCCESS) return ret;
|
||||
if ((ret = ks_dht_search_pending_free(&val)) != KS_STATUS_SUCCESS) return ret;
|
||||
}
|
||||
ks_hash_destroy(&search->pending);
|
||||
}
|
||||
search->callbacks_size = 0;
|
||||
if (search->callbacks) {
|
||||
if ((ret = ks_pool_free(search->pool, search->callbacks)) != KS_STATUS_SUCCESS) return ret;
|
||||
search->callbacks = NULL;
|
||||
}
|
||||
if (search->mutex && (ret = ks_mutex_destroy(&search->mutex)) != KS_STATUS_SUCCESS) return ret;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback)
|
||||
{
|
||||
ks_assert(search);
|
||||
|
||||
if (callback) {
|
||||
int32_t index = search->callbacks_size++;
|
||||
search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
|
||||
(void *)search->callbacks,
|
||||
sizeof(ks_dht_search_callback_t) * search->callbacks_size);
|
||||
search->callbacks[index] = callback;
|
||||
}
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool)
|
||||
{
|
||||
ks_dht_search_pending_t *p;
|
||||
|
||||
ks_assert(pending);
|
||||
ks_assert(pool);
|
||||
|
||||
*pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t));
|
||||
p->pool = pool;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool)
|
||||
{
|
||||
ks_assert(pending);
|
||||
ks_assert(pool);
|
||||
|
||||
memset(pending, 0, sizeof(ks_dht_search_pending_t));
|
||||
|
||||
pending->pool = pool;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending)
|
||||
{
|
||||
ks_status_t ret = KS_STATUS_SUCCESS;
|
||||
|
||||
ks_assert(pending);
|
||||
ks_assert(*pending);
|
||||
|
||||
if ((ret = ks_dht_search_pending_deinit(*pending)) != KS_STATUS_SUCCESS) return ret;
|
||||
if ((ret = ks_pool_free((*pending)->pool, *pending)) != KS_STATUS_SUCCESS) return ret;
|
||||
|
||||
*pending = NULL;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration)
|
||||
{
|
||||
ks_assert(pending);
|
||||
ks_assert(pending->pool);
|
||||
ks_assert(node);
|
||||
|
||||
pending->node = node;
|
||||
pending->expiration = expiration;
|
||||
pending->finished = KS_FALSE;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending)
|
||||
{
|
||||
ks_assert(pending);
|
||||
|
||||
pending->node = NULL;
|
||||
pending->expiration = 0;
|
||||
pending->finished = KS_FALSE;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/* For Emacs:
|
||||
* Local Variables:
|
||||
* mode:c
|
||||
* indent-tabs-mode:t
|
||||
* tab-width:4
|
||||
* c-basic-offset:4
|
||||
* End:
|
||||
* For VIM:
|
||||
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
|
||||
*/
|
@ -143,15 +143,28 @@ int main() {
|
||||
|
||||
diag("Ping test\n");
|
||||
|
||||
ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue ping from dht2 to dht1
|
||||
ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
|
||||
|
||||
ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1
|
||||
|
||||
ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
|
||||
|
||||
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
|
||||
|
||||
ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1
|
||||
|
||||
// Test blind find_node from dht3 to dht1 to find dht2 nodeid
|
||||
ok(ks_dhtrt_find_node(dht2.rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
|
||||
|
||||
diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
diag("DHT 1\n");
|
||||
ks_dht_pulse(dht1, 100);
|
||||
diag("DHT 2\n");
|
||||
ks_dht_pulse(&dht2, 100);
|
||||
}
|
||||
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
|
||||
|
||||
// Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
|
||||
|
||||
diag("Find_Node test\n");
|
||||
|
||||
@ -163,8 +176,17 @@ int main() {
|
||||
|
||||
ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
|
||||
|
||||
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL);
|
||||
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
|
||||
|
||||
diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
diag("DHT 1\n");
|
||||
ks_dht_pulse(dht1, 100);
|
||||
diag("DHT 2\n");
|
||||
ks_dht_pulse(&dht2, 100);
|
||||
}
|
||||
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
|
||||
|
||||
diag("Cleanup\n");
|
||||
/* Cleanup and shutdown */
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user