mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-05-20 14:34:27 +00:00
FS-9775: Search for both v4 and v6 should now be implemented, untested currently
This commit is contained in:
parent
c2868dbf8a
commit
767326b047
@ -267,6 +267,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
|
|||||||
KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search);
|
KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search);
|
||||||
|
|
||||||
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
|
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
|
||||||
|
KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pending, int32_t *active);
|
||||||
|
|
||||||
KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid);
|
KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid);
|
||||||
KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending);
|
KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending);
|
||||||
|
@ -83,13 +83,6 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
|
|||||||
ks_assert(d->registry_error);
|
ks_assert(d->registry_error);
|
||||||
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
|
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
|
||||||
|
|
||||||
/**
|
|
||||||
* Default these to FALSE, binding will set them TRUE when a respective address is bound.
|
|
||||||
* @todo these may not be useful anymore they are from legacy code
|
|
||||||
*/
|
|
||||||
d->bind_ipv4 = KS_FALSE;
|
|
||||||
d->bind_ipv6 = KS_FALSE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the data used to track endpoints to NULL, binding will handle latent allocations.
|
* Initialize the data used to track endpoints to NULL, binding will handle latent allocations.
|
||||||
* The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
|
* The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
|
||||||
@ -154,13 +147,16 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
|
|||||||
/**
|
/**
|
||||||
* Create the hash to store searches.
|
* Create the hash to store searches.
|
||||||
*/
|
*/
|
||||||
ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
|
ks_hash_create(&d->searches4_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
|
||||||
ks_assert(d->search_hash);
|
ks_assert(d->searches4_hash);
|
||||||
|
ks_hash_create(&d->searches6_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
|
||||||
|
ks_assert(d->searches6_hash);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The search hash uses arbitrary key size, which requires the key size be provided.
|
* The searches hash uses arbitrary key size, which requires the key size be provided.
|
||||||
*/
|
*/
|
||||||
ks_hash_set_keysize(d->search_hash, KS_DHT_NODEID_SIZE);
|
ks_hash_set_keysize(d->searches4_hash, KS_DHT_NODEID_SIZE);
|
||||||
|
ks_hash_set_keysize(d->searches6_hash, KS_DHT_NODEID_SIZE);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
|
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
|
||||||
@ -225,14 +221,23 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
|
|||||||
/**
|
/**
|
||||||
* Cleanup the search hash and it's contents if it is allocated.
|
* Cleanup the search hash and it's contents if it is allocated.
|
||||||
*/
|
*/
|
||||||
if (d->search_hash) {
|
if (d->searches6_hash) {
|
||||||
for (it = ks_hash_first(d->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
||||||
ks_dht_search_t *val;
|
ks_dht_search_t *val;
|
||||||
|
|
||||||
ks_hash_this_val(it, (void **)&val);
|
ks_hash_this_val(it, (void **)&val);
|
||||||
ks_dht_search_destroy(&val);
|
ks_dht_search_destroy(&val);
|
||||||
}
|
}
|
||||||
ks_hash_destroy(&d->search_hash);
|
ks_hash_destroy(&d->searches6_hash);
|
||||||
|
}
|
||||||
|
if (d->searches4_hash) {
|
||||||
|
for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
||||||
|
ks_dht_search_t *val;
|
||||||
|
|
||||||
|
ks_hash_this_val(it, (void **)&val);
|
||||||
|
ks_dht_search_destroy(&val);
|
||||||
|
}
|
||||||
|
ks_hash_destroy(&d->searches4_hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -285,30 +290,18 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
|
|||||||
/**
|
/**
|
||||||
* Cleanup the array of endpoint pointers if it is allocated.
|
* Cleanup the array of endpoint pointers if it is allocated.
|
||||||
*/
|
*/
|
||||||
if (d->endpoints) {
|
if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
|
||||||
ks_pool_free(d->pool, &d->endpoints);
|
|
||||||
d->endpoints = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleanup the array of endpoint polling data if it is allocated.
|
* Cleanup the array of endpoint polling data if it is allocated.
|
||||||
*/
|
*/
|
||||||
if (d->endpoints_poll) {
|
if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
|
||||||
ks_pool_free(d->pool, &d->endpoints_poll);
|
|
||||||
d->endpoints_poll = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleanup the endpoints hash if it is allocated.
|
* Cleanup the endpoints hash if it is allocated.
|
||||||
*/
|
*/
|
||||||
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
|
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
|
||||||
|
|
||||||
/**
|
|
||||||
* Probably don't need this
|
|
||||||
*/
|
|
||||||
d->bind_ipv4 = KS_FALSE;
|
|
||||||
d->bind_ipv6 = KS_FALSE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleanup the type, query, and error registries if they have been allocated.
|
* Cleanup the type, query, and error registries if they have been allocated.
|
||||||
*/
|
*/
|
||||||
@ -474,12 +467,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
|
|||||||
return KS_STATUS_FAIL;
|
return KS_STATUS_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Legacy code, this can probably go away
|
|
||||||
*/
|
|
||||||
dht->bind_ipv4 |= addr->family == AF_INET;
|
|
||||||
dht->bind_ipv6 |= addr->family == AF_INET6;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempt to open a UDP datagram socket for the given address family.
|
* Attempt to open a UDP datagram socket for the given address family.
|
||||||
*/
|
*/
|
||||||
@ -494,8 +481,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
|
|||||||
/**
|
/**
|
||||||
* Attempt to bind the socket to the desired local address.
|
* Attempt to bind the socket to the desired local address.
|
||||||
*/
|
*/
|
||||||
// @todo shouldn't ks_addr_bind take a const addr *?
|
if ((ret = ks_addr_bind(sock, addr)) != KS_STATUS_SUCCESS) goto done;
|
||||||
if ((ret = ks_addr_bind(sock, (ks_sockaddr_t *)addr)) != KS_STATUS_SUCCESS) goto done;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate the endpoint to track the local socket.
|
* Allocate the endpoint to track the local socket.
|
||||||
@ -616,6 +602,54 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
|
|||||||
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
|
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *searches)
|
||||||
|
{
|
||||||
|
ks_hash_iterator_t *it = NULL;
|
||||||
|
ks_time_t now = ks_time_now();
|
||||||
|
|
||||||
|
ks_assert(dht);
|
||||||
|
ks_assert(searches);
|
||||||
|
|
||||||
|
ks_hash_write_lock(searches);
|
||||||
|
for (it = ks_hash_first(searches, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
||||||
|
const void *key = NULL;
|
||||||
|
ks_dht_search_t *value = NULL;
|
||||||
|
int32_t active = 0;
|
||||||
|
|
||||||
|
ks_hash_this(it, &key, NULL, (void **)&value);
|
||||||
|
|
||||||
|
ks_mutex_lock(value->mutex);
|
||||||
|
for (ks_hash_iterator_t *i = ks_hash_first(value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
|
||||||
|
const void *k = NULL;
|
||||||
|
ks_dht_search_pending_t *v = NULL;
|
||||||
|
|
||||||
|
ks_hash_this(i, &k, NULL, (void **)&v);
|
||||||
|
|
||||||
|
if (v->finished) continue;
|
||||||
|
|
||||||
|
if (v->expiration <= now) {
|
||||||
|
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||||
|
char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||||
|
ks_log(KS_LOG_DEBUG,
|
||||||
|
"Search for %s pending find_node to %s has expired without response\n",
|
||||||
|
ks_dht_hexid(&value->target, id_buf),
|
||||||
|
ks_dht_hexid(&v->nodeid, id2_buf));
|
||||||
|
v->finished = KS_TRUE;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
active++;
|
||||||
|
}
|
||||||
|
ks_mutex_unlock(value->mutex);
|
||||||
|
|
||||||
|
if (active == 0) {
|
||||||
|
for (int32_t index = 0; index < value->callbacks_size; ++index) value->callbacks[index](dht, value);
|
||||||
|
ks_hash_remove(searches, (void *)key);
|
||||||
|
ks_dht_search_destroy(&value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ks_hash_write_unlock(searches);
|
||||||
|
}
|
||||||
|
|
||||||
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
|
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
|
||||||
{
|
{
|
||||||
ks_hash_iterator_t *it = NULL;
|
ks_hash_iterator_t *it = NULL;
|
||||||
@ -645,44 +679,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
|
|||||||
}
|
}
|
||||||
ks_hash_write_unlock(dht->transactions_hash);
|
ks_hash_write_unlock(dht->transactions_hash);
|
||||||
|
|
||||||
ks_hash_write_lock(dht->search_hash);
|
if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
|
||||||
for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
|
||||||
const void *search_key = NULL;
|
|
||||||
ks_dht_search_t *search_value = NULL;
|
|
||||||
|
|
||||||
ks_hash_this(it, &search_key, NULL, (void **)&search_value);
|
|
||||||
|
|
||||||
ks_hash_write_lock(search_value->pending);
|
|
||||||
for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
|
|
||||||
const void *pending_key = NULL;
|
|
||||||
ks_dht_search_pending_t *pending_value = NULL;
|
|
||||||
ks_bool_t pending_remove = KS_FALSE;
|
|
||||||
|
|
||||||
ks_hash_this(i, &pending_key, NULL, (void **)&pending_value);
|
|
||||||
|
|
||||||
if (pending_value->finished) pending_remove = KS_TRUE;
|
|
||||||
else if (pending_value->expiration <= now) {
|
|
||||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
|
||||||
char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
|
||||||
ks_log(KS_LOG_DEBUG,
|
|
||||||
"Search for %s pending find_node to %s has expired without response\n",
|
|
||||||
ks_dht_hexid(&search_value->target, id_buf),
|
|
||||||
ks_dht_hexid(&pending_value->nodeid, id2_buf));
|
|
||||||
pending_remove = KS_TRUE;
|
|
||||||
}
|
|
||||||
if (pending_remove) {
|
|
||||||
ks_hash_remove(search_value->pending, (void *)pending_key);
|
|
||||||
ks_dht_search_pending_destroy(&pending_value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ks_hash_write_unlock(search_value->pending);
|
|
||||||
if (ks_hash_count(search_value->pending) == 0) {
|
|
||||||
for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value);
|
|
||||||
ks_hash_remove(dht->search_hash, (void *)search_key);
|
|
||||||
ks_dht_search_destroy(&search_value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ks_hash_write_unlock(dht->search_hash);
|
|
||||||
|
|
||||||
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
|
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
|
||||||
dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
|
dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
|
||||||
@ -771,7 +769,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *
|
|||||||
memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t));
|
memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t));
|
||||||
*buffer_length += addr_len;
|
*buffer_length += addr_len;
|
||||||
|
|
||||||
memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t));
|
memcpy(buffer + (*buffer_length), &port, sizeof(uint16_t));
|
||||||
*buffer_length += sizeof(uint16_t);
|
*buffer_length += sizeof(uint16_t);
|
||||||
|
|
||||||
return KS_STATUS_SUCCESS;
|
return KS_STATUS_SUCCESS;
|
||||||
@ -801,8 +799,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
|
|||||||
port = *((uint16_t *)(buffer + *buffer_length));
|
port = *((uint16_t *)(buffer + *buffer_length));
|
||||||
*buffer_length += sizeof(uint16_t);
|
*buffer_length += sizeof(uint16_t);
|
||||||
|
|
||||||
// @todo ks_addr_set_raw second parameter should be const?
|
return ks_addr_set_raw(address, paddr, port, address->family);
|
||||||
return ks_addr_set_raw(address, (void *)paddr, port, address->family);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
|
KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
|
||||||
@ -824,7 +821,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n
|
|||||||
return KS_STATUS_NO_MEM;
|
return KS_STATUS_NO_MEM;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE);
|
memcpy(buffer + (*buffer_length), nodeid->id, KS_DHT_NODEID_SIZE);
|
||||||
*buffer_length += KS_DHT_NODEID_SIZE;
|
*buffer_length += KS_DHT_NODEID_SIZE;
|
||||||
|
|
||||||
return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
|
return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
|
||||||
@ -953,7 +950,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k
|
|||||||
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
|
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
|
||||||
{
|
{
|
||||||
// @todo calculate max IPV6 payload size?
|
// @todo calculate max IPV6 payload size?
|
||||||
char buf[1000];
|
char buf[1001];
|
||||||
ks_size_t buf_len;
|
ks_size_t buf_len;
|
||||||
|
|
||||||
ks_assert(dht);
|
ks_assert(dht);
|
||||||
@ -963,8 +960,11 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
|
|||||||
|
|
||||||
// @todo blacklist check
|
// @todo blacklist check
|
||||||
|
|
||||||
// @todo use different encode function to check if all data was encoded, do not send large incomplete messages
|
|
||||||
buf_len = ben_encode2(buf, sizeof(buf), message->data);
|
buf_len = ben_encode2(buf, sizeof(buf), message->data);
|
||||||
|
if (buf_len >= sizeof(buf)) {
|
||||||
|
ks_log(KS_LOG_DEBUG, "Dropping message that is too large\n");
|
||||||
|
return KS_STATUS_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
|
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
|
||||||
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
|
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
|
||||||
@ -1192,13 +1192,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
|
|||||||
|
|
||||||
|
|
||||||
KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
|
KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
|
||||||
int family,
|
int32_t family,
|
||||||
ks_dht_nodeid_t *target,
|
ks_dht_nodeid_t *target,
|
||||||
ks_dht_search_callback_t callback,
|
ks_dht_search_callback_t callback,
|
||||||
ks_dht_search_t **search)
|
ks_dht_search_t **search)
|
||||||
{
|
{
|
||||||
|
ks_bool_t locked_searches = KS_FALSE;
|
||||||
ks_bool_t locked_search = KS_FALSE;
|
ks_bool_t locked_search = KS_FALSE;
|
||||||
ks_bool_t locked_pending = KS_FALSE;
|
ks_hash_t *searches = NULL;
|
||||||
|
ks_dhtrt_routetable_t *rt = NULL;
|
||||||
ks_dht_search_t *s = NULL;
|
ks_dht_search_t *s = NULL;
|
||||||
ks_bool_t inserted = KS_FALSE;
|
ks_bool_t inserted = KS_FALSE;
|
||||||
ks_bool_t allocated = KS_FALSE;
|
ks_bool_t allocated = KS_FALSE;
|
||||||
@ -1211,12 +1213,27 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
|
|||||||
|
|
||||||
if (search) *search = NULL;
|
if (search) *search = NULL;
|
||||||
|
|
||||||
// @todo start write lock on search_hash and hold until after inserting
|
if (family == AF_INET) {
|
||||||
// check hash for target to see if search already exists
|
if (!dht->rt_ipv4) {
|
||||||
ks_hash_write_lock(dht->search_hash);
|
ret = KS_STATUS_FAIL;
|
||||||
locked_search = KS_TRUE;
|
goto done;
|
||||||
|
}
|
||||||
|
searches = dht->searches4_hash;
|
||||||
|
rt = dht->rt_ipv4;
|
||||||
|
} else {
|
||||||
|
if (!dht->rt_ipv6) {
|
||||||
|
ret = KS_STATUS_FAIL;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
searches = dht->searches6_hash;
|
||||||
|
rt = dht->rt_ipv6;
|
||||||
|
}
|
||||||
|
|
||||||
s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED);
|
// check hash for target to see if search already exists
|
||||||
|
ks_hash_write_lock(searches);
|
||||||
|
locked_searches = KS_TRUE;
|
||||||
|
|
||||||
|
s = ks_hash_search(searches, target->id, KS_UNLOCKED);
|
||||||
|
|
||||||
// if search does not exist, create new search and store in hash by target
|
// if search does not exist, create new search and store in hash by target
|
||||||
if (!s) {
|
if (!s) {
|
||||||
@ -1230,16 +1247,18 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
|
|||||||
// if the search is old then bail out and return successfully
|
// if the search is old then bail out and return successfully
|
||||||
if (!allocated) goto done;
|
if (!allocated) goto done;
|
||||||
|
|
||||||
if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
|
// everything past this point until final cleanup is only for when a search of the target does not already exist
|
||||||
|
|
||||||
|
if ((ret = ks_hash_insert(searches, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
|
||||||
inserted = KS_TRUE;
|
inserted = KS_TRUE;
|
||||||
|
|
||||||
// lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up
|
// lock search before unlocking the searches_hash to prevent this search from being used before we finish setting it up
|
||||||
ks_hash_write_lock(s->pending);
|
ks_mutex_lock(s->mutex);
|
||||||
locked_pending = KS_TRUE;
|
locked_search = KS_TRUE;
|
||||||
|
|
||||||
// release search hash lock now, but pending is still locked
|
// release searches_hash lock now, but search is still locked
|
||||||
ks_hash_write_unlock(dht->search_hash);
|
ks_hash_write_unlock(searches);
|
||||||
locked_search = KS_FALSE;
|
locked_searches = KS_FALSE;
|
||||||
|
|
||||||
// find closest good nodes to target locally and store as the closest results
|
// find closest good nodes to target locally and store as the closest results
|
||||||
query.nodeid = *target;
|
query.nodeid = *target;
|
||||||
@ -1247,32 +1266,37 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
|
|||||||
query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE;
|
query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE;
|
||||||
query.family = family;
|
query.family = family;
|
||||||
query.count = 0;
|
query.count = 0;
|
||||||
ks_dhtrt_findclosest_nodes(family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, &query);
|
ks_dhtrt_findclosest_nodes(rt, &query);
|
||||||
for (int32_t i = 0; i < query.count; ++i) {
|
for (int32_t i = 0; i < query.count; ++i) {
|
||||||
ks_dht_node_t *n = query.nodes[i];
|
ks_dht_node_t *n = query.nodes[i];
|
||||||
ks_dht_search_pending_t *pending = NULL;
|
ks_dht_search_pending_t *pending = NULL;
|
||||||
|
|
||||||
s->results[i] = n->nodeid;
|
// always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet
|
||||||
ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target);
|
s->results[s->results_length] = n->nodeid;
|
||||||
// add to pending with expiration
|
ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target);
|
||||||
|
s->results_length++;
|
||||||
|
|
||||||
|
pending = ks_hash_search(s->pending, n->nodeid.id, KS_UNLOCKED);
|
||||||
|
if (pending) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
|
||||||
|
|
||||||
|
// add to pending with expiration, if any of this fails it's almost catastrophic so just bail out and fail the entire search attempt
|
||||||
|
// there are no probable causes for a failure but check them anyway
|
||||||
if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
|
if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
|
||||||
if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
|
if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
|
||||||
ks_dht_search_pending_destroy(&pending);
|
ks_dht_search_pending_destroy(&pending);
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
|
if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
|
||||||
// increment here in case we end up bailing out; execute with what it has or destroy the search?
|
|
||||||
s->results_length++;
|
|
||||||
}
|
}
|
||||||
// @todo release query nodes
|
// @todo release closest local query node locks
|
||||||
ks_hash_write_unlock(s->pending);
|
ks_mutex_unlock(s->mutex);
|
||||||
locked_pending = KS_FALSE;
|
locked_search = KS_FALSE;
|
||||||
|
|
||||||
if (search) *search = s;
|
if (search) *search = s;
|
||||||
|
|
||||||
done:
|
done:
|
||||||
if (locked_search) ks_hash_write_unlock(dht->search_hash);
|
if (locked_searches) ks_hash_write_unlock(searches);
|
||||||
if (locked_pending) ks_hash_write_unlock(s->pending);
|
if (locked_search) ks_mutex_unlock(s->mutex);
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (!inserted && s) ks_dht_search_destroy(&s);
|
if (!inserted && s) ks_dht_search_destroy(&s);
|
||||||
*search = NULL;
|
*search = NULL;
|
||||||
@ -1513,7 +1537,13 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
|
|||||||
|
|
||||||
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
|
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
|
||||||
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
|
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
|
||||||
// @todo produce "want" value if both families are bound
|
// Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
|
||||||
|
if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, targetid->id, KS_DHT_NODEID_SIZE)) {
|
||||||
|
struct bencode *want = ben_list();
|
||||||
|
ben_list_append_str(want, "n4");
|
||||||
|
ben_list_append_str(want, "n6");
|
||||||
|
ben_dict_set(a, ben_blob("want", 4), want);
|
||||||
|
}
|
||||||
|
|
||||||
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
|
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
|
||||||
ks_q_push(dht->send_q, (void *)message);
|
ks_q_push(dht->send_q, (void *)message);
|
||||||
@ -1579,7 +1609,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
|
|||||||
query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
|
query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
|
||||||
if (want4) {
|
if (want4) {
|
||||||
query.family = AF_INET;
|
query.family = AF_INET;
|
||||||
ks_dhtrt_findclosest_nodes(routetable, &query);
|
ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
|
||||||
|
|
||||||
for (int32_t i = 0; i < query.count; ++i) {
|
for (int32_t i = 0; i < query.count; ++i) {
|
||||||
ks_dht_node_t *qn = query.nodes[i];
|
ks_dht_node_t *qn = query.nodes[i];
|
||||||
@ -1592,10 +1622,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
|
|||||||
|
|
||||||
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
|
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
|
||||||
}
|
}
|
||||||
|
// @todo release query nodes
|
||||||
}
|
}
|
||||||
if (want6) {
|
if (want6) {
|
||||||
query.family = AF_INET6;
|
query.family = AF_INET6;
|
||||||
ks_dhtrt_findclosest_nodes(routetable, &query);
|
ks_dhtrt_findclosest_nodes(dht->rt_ipv6, &query);
|
||||||
|
|
||||||
for (int32_t i = 0; i < query.count; ++i) {
|
for (int32_t i = 0; i < query.count; ++i) {
|
||||||
ks_dht_node_t *qn = query.nodes[i];
|
ks_dht_node_t *qn = query.nodes[i];
|
||||||
@ -1633,6 +1664,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
{
|
{
|
||||||
ks_dht_nodeid_t *id;
|
ks_dht_nodeid_t *id;
|
||||||
struct bencode *n;
|
struct bencode *n;
|
||||||
|
//ks_bool_t n4 = KS_FALSE;
|
||||||
|
//ks_bool_t n6 = KS_FALSE;
|
||||||
const uint8_t *nodes = NULL;
|
const uint8_t *nodes = NULL;
|
||||||
const uint8_t *nodes6 = NULL;
|
const uint8_t *nodes6 = NULL;
|
||||||
size_t nodes_size = 0;
|
size_t nodes_size = 0;
|
||||||
@ -1642,6 +1675,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
ks_dhtrt_routetable_t *routetable = NULL;
|
ks_dhtrt_routetable_t *routetable = NULL;
|
||||||
ks_dht_node_t *node = NULL;
|
ks_dht_node_t *node = NULL;
|
||||||
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||||
|
ks_hash_t *searches = NULL;
|
||||||
ks_dht_search_t *search = NULL;
|
ks_dht_search_t *search = NULL;
|
||||||
ks_status_t ret = KS_STATUS_SUCCESS;
|
ks_status_t ret = KS_STATUS_SUCCESS;
|
||||||
|
|
||||||
@ -1653,11 +1687,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
|
|
||||||
n = ben_dict_get_by_str(message->args, "nodes");
|
n = ben_dict_get_by_str(message->args, "nodes");
|
||||||
if (n) {
|
if (n) {
|
||||||
|
//n4 = KS_TRUE;
|
||||||
nodes = (const uint8_t *)ben_str_val(n);
|
nodes = (const uint8_t *)ben_str_val(n);
|
||||||
nodes_size = ben_str_len(n);
|
nodes_size = ben_str_len(n);
|
||||||
}
|
}
|
||||||
n = ben_dict_get_by_str(message->args, "nodes6");
|
n = ben_dict_get_by_str(message->args, "nodes6");
|
||||||
if (n) {
|
if (n) {
|
||||||
|
//n6 = KS_TRUE;
|
||||||
nodes6 = (const uint8_t *)ben_str_val(n);
|
nodes6 = (const uint8_t *)ben_str_val(n);
|
||||||
nodes6_size = ben_str_len(n);
|
nodes6_size = ben_str_len(n);
|
||||||
}
|
}
|
||||||
@ -1671,14 +1707,18 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
|
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
|
||||||
if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
|
if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
|
||||||
|
|
||||||
ks_hash_read_lock(dht->search_hash);
|
searches = message->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
|
||||||
search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED);
|
|
||||||
ks_hash_read_unlock(dht->search_hash);
|
ks_hash_read_lock(searches);
|
||||||
|
search = ks_hash_search(searches, message->transaction->target.id, KS_UNLOCKED);
|
||||||
if (search) {
|
if (search) {
|
||||||
ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED);
|
ks_dht_search_pending_t *pending = NULL;
|
||||||
ks_hash_read_unlock(search->pending);
|
|
||||||
|
ks_mutex_lock(search->mutex);
|
||||||
|
pending = ks_hash_search(search->pending, id->id, KS_UNLOCKED);
|
||||||
if (pending) pending->finished = KS_TRUE;
|
if (pending) pending->finished = KS_TRUE;
|
||||||
}
|
}
|
||||||
|
ks_hash_read_unlock(searches);
|
||||||
|
|
||||||
while (nodes_len < nodes_size) {
|
while (nodes_len < nodes_size) {
|
||||||
ks_dht_nodeid_t nid;
|
ks_dht_nodeid_t nid;
|
||||||
@ -1697,7 +1737,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
||||||
ks_dhtrt_release_node(node);
|
ks_dhtrt_release_node(node);
|
||||||
|
|
||||||
if (search) {
|
if (search && message->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
|
||||||
ks_dht_nodeid_t distance;
|
ks_dht_nodeid_t distance;
|
||||||
int32_t results_index = -1;
|
int32_t results_index = -1;
|
||||||
|
|
||||||
@ -1707,10 +1747,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
search->results_length++;
|
search->results_length++;
|
||||||
} else {
|
} else {
|
||||||
for (int32_t index = 0; index < search->results_length; ++index) {
|
for (int32_t index = 0; index < search->results_length; ++index) {
|
||||||
// Check if new node is closer than this existing result
|
// Check if new node is closer than this previous result
|
||||||
if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
|
if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
|
||||||
// If this is the first node that is further then keep it
|
// If this is the first node that is further then keep it
|
||||||
// Else if two or more nodes are further, and this existing result is further than the previous one then keep it
|
// Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
|
||||||
if (results_index < 0) results_index = index;
|
if (results_index < 0) results_index = index;
|
||||||
else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
|
else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
|
||||||
}
|
}
|
||||||
@ -1757,12 +1797,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
|
|||||||
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
|
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
|
||||||
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
|
||||||
ks_dhtrt_release_node(node);
|
ks_dhtrt_release_node(node);
|
||||||
|
|
||||||
|
if (search && message->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
|
||||||
|
ks_dht_nodeid_t distance;
|
||||||
|
int32_t results_index = -1;
|
||||||
|
|
||||||
|
ks_dht_utility_nodeid_xor(&distance, &nid, &search->target);
|
||||||
|
if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
|
||||||
|
results_index = search->results_length;
|
||||||
|
search->results_length++;
|
||||||
|
} else {
|
||||||
|
for (int32_t index = 0; index < search->results_length; ++index) {
|
||||||
|
// Check if new node is closer than this previous result
|
||||||
|
if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
|
||||||
|
// If this is the first node that is further then keep it
|
||||||
|
// Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
|
||||||
|
if (results_index < 0) results_index = index;
|
||||||
|
else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (results_index >= 0) {
|
||||||
|
char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||||
|
char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
|
||||||
|
ks_dht_search_pending_t *pending = NULL;
|
||||||
|
|
||||||
|
ks_log(KS_LOG_DEBUG,
|
||||||
|
"Set closer node id %s (%s) in search of target id %s at results index %d\n",
|
||||||
|
ks_dht_hexid(&nid, id_buf),
|
||||||
|
ks_dht_hexid(&distance, id2_buf),
|
||||||
|
ks_dht_hexid(&search->target, id3_buf),
|
||||||
|
results_index);
|
||||||
|
search->results[results_index] = nid;
|
||||||
|
search->distances[results_index] = distance;
|
||||||
|
|
||||||
|
if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
|
||||||
|
if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
|
||||||
|
ks_dht_search_pending_destroy(&pending);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// @todo repeat above for ipv6 table
|
|
||||||
|
|
||||||
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
|
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
if(search) ks_mutex_unlock(search->mutex);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,17 +149,6 @@ struct ks_dht_transaction_s {
|
|||||||
ks_bool_t finished;
|
ks_bool_t finished;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check if search already exists for the target id, if so add another callback, must be a popular target id
|
|
||||||
// Otherwise create new search, set target id, add callback, and insert the search into the dht search_hash with target id key
|
|
||||||
// Get closest local nodes to target id, check against results, send_findnode for closer nodes and add to pending hash with queried node id
|
|
||||||
// Upon receiving find_node response, check target id against dht search_hash, check responding node id against pending hash, set finished for purging
|
|
||||||
// Update results if responding node id is closer than any current result, or the results are not full
|
|
||||||
// Check response nodes against results, send_findnode for closer nodes and add to pending hash with an expiration
|
|
||||||
// Pulse expirations purges expired and finished from pending hash, once hash is empty callbacks are called providing results array
|
|
||||||
// Note:
|
|
||||||
// During the lifetime of a search, the ks_dht_node_t's must be kept alive
|
|
||||||
// Do a query touch on nodes prior to being added to pending, this should reset timeout and keep the nodes alive long enough even if they are dubious
|
|
||||||
// Nodes which land in results are known good with recent response to find_nodes and should be around for a while before route table worries about cleanup
|
|
||||||
struct ks_dht_search_s {
|
struct ks_dht_search_s {
|
||||||
ks_pool_t *pool;
|
ks_pool_t *pool;
|
||||||
ks_mutex_t *mutex;
|
ks_mutex_t *mutex;
|
||||||
@ -208,9 +197,6 @@ struct ks_dht_s {
|
|||||||
ks_hash_t *registry_query;
|
ks_hash_t *registry_query;
|
||||||
ks_hash_t *registry_error;
|
ks_hash_t *registry_error;
|
||||||
|
|
||||||
ks_bool_t bind_ipv4;
|
|
||||||
ks_bool_t bind_ipv6;
|
|
||||||
|
|
||||||
ks_dht_endpoint_t **endpoints;
|
ks_dht_endpoint_t **endpoints;
|
||||||
int32_t endpoints_size;
|
int32_t endpoints_size;
|
||||||
ks_hash_t *endpoints_hash;
|
ks_hash_t *endpoints_hash;
|
||||||
@ -230,7 +216,8 @@ struct ks_dht_s {
|
|||||||
ks_dhtrt_routetable_t *rt_ipv4;
|
ks_dhtrt_routetable_t *rt_ipv4;
|
||||||
ks_dhtrt_routetable_t *rt_ipv6;
|
ks_dhtrt_routetable_t *rt_ipv6;
|
||||||
|
|
||||||
ks_hash_t *search_hash;
|
ks_hash_t *searches4_hash;
|
||||||
|
ks_hash_t *searches6_hash;
|
||||||
|
|
||||||
volatile uint32_t token_secret_current;
|
volatile uint32_t token_secret_current;
|
||||||
volatile uint32_t token_secret_previous;
|
volatile uint32_t token_secret_previous;
|
||||||
@ -300,7 +287,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
|
|||||||
* @param dht pointer to the dht instance
|
* @param dht pointer to the dht instance
|
||||||
* @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly
|
* @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly
|
||||||
* @param addr pointer to the local address information
|
* @param addr pointer to the local address information
|
||||||
* @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output
|
* @param endpoint dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output
|
||||||
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
|
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
|
||||||
* @see ks_socket_option
|
* @see ks_socket_option
|
||||||
* @see ks_addr_bind
|
* @see ks_addr_bind
|
||||||
@ -320,6 +307,25 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
|
|||||||
*/
|
*/
|
||||||
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
|
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a network search of the closest nodes to a target.
|
||||||
|
* @param dht pointer to the dht instance
|
||||||
|
* @param family either AF_INET or AF_INET6 for the appropriate network to search
|
||||||
|
* @param target pointer to the nodeid for the target to be searched
|
||||||
|
* @param callback an optional callback to add to the search when it is finished
|
||||||
|
* @param search dereferenced out pointer to the allocated search, may be NULL to ignore search output
|
||||||
|
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
|
||||||
|
* @see ks_dht_search_create
|
||||||
|
* @see ks_dht_search_callback_add
|
||||||
|
* @see ks_hash_insert
|
||||||
|
* @see ks_dht_search_pending_create
|
||||||
|
* @see ks_dht_send_findnode
|
||||||
|
*/
|
||||||
|
KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
|
||||||
|
int32_t family,
|
||||||
|
ks_dht_nodeid_t *target,
|
||||||
|
ks_dht_search_callback_t callback,
|
||||||
|
ks_dht_search_t **search);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -31,8 +31,7 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (dg) ks_dht_datagram_destroy(&dg);
|
ks_dht_datagram_destroy(datagram);
|
||||||
*datagram = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -46,9 +45,7 @@ KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram)
|
|||||||
|
|
||||||
dg = *datagram;
|
dg = *datagram;
|
||||||
|
|
||||||
ks_pool_free(dg->pool, &dg);
|
ks_pool_free(dg->pool, datagram);
|
||||||
|
|
||||||
*datagram = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
|
@ -30,8 +30,7 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (ep) ks_dht_endpoint_destroy(&ep);
|
if (ep) ks_dht_endpoint_destroy(endpoint);
|
||||||
*endpoint = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -49,9 +48,8 @@ KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint)
|
|||||||
ep = *endpoint;
|
ep = *endpoint;
|
||||||
|
|
||||||
if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock);
|
if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock);
|
||||||
ks_pool_free(ep->pool, &ep);
|
|
||||||
|
|
||||||
*endpoint = NULL;
|
ks_pool_free(ep->pool, endpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
|
@ -26,8 +26,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (m) ks_dht_message_destroy(&m);
|
ks_dht_message_destroy(message);
|
||||||
*message = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -45,9 +44,8 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message)
|
|||||||
ben_free(m->data);
|
ben_free(m->data);
|
||||||
m->data = NULL;
|
m->data = NULL;
|
||||||
}
|
}
|
||||||
ks_pool_free(m->pool, &(*message));
|
|
||||||
|
|
||||||
*message = NULL;
|
ks_pool_free(m->pool, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -27,8 +27,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (s) ks_dht_search_destroy(&s);
|
if (s) ks_dht_search_destroy(search);
|
||||||
*search = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -58,9 +57,7 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
|
|||||||
}
|
}
|
||||||
if (s->mutex) ks_mutex_destroy(&s->mutex);
|
if (s->mutex) ks_mutex_destroy(&s->mutex);
|
||||||
|
|
||||||
ks_pool_free(s->pool, &s);
|
ks_pool_free(s->pool, search);
|
||||||
|
|
||||||
*search = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback)
|
KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback)
|
||||||
@ -100,10 +97,9 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (p) ks_dht_search_pending_destroy(&p);
|
if (p) ks_dht_search_pending_destroy(pending);
|
||||||
*pending = NULL;
|
|
||||||
}
|
}
|
||||||
return KS_STATUS_SUCCESS;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending)
|
KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending)
|
||||||
@ -115,9 +111,7 @@ KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending
|
|||||||
|
|
||||||
p = *pending;
|
p = *pending;
|
||||||
|
|
||||||
ks_pool_free(p->pool, &p);
|
ks_pool_free(p->pool, pending);
|
||||||
|
|
||||||
*pending = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
|
@ -32,8 +32,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (si) ks_dht_storageitem_destroy(&si);
|
if (si) ks_dht_storageitem_destroy(item);
|
||||||
*item = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -83,8 +82,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (si) ks_dht_storageitem_destroy(&si);
|
if (si) ks_dht_storageitem_destroy(item);
|
||||||
*item = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -105,9 +103,8 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
|
|||||||
ben_free(si->v);
|
ben_free(si->v);
|
||||||
si->v = NULL;
|
si->v = NULL;
|
||||||
}
|
}
|
||||||
ks_pool_free(si->pool, &si);
|
|
||||||
|
|
||||||
*item = NULL;
|
ks_pool_free(si->pool, item);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
|
@ -25,8 +25,7 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac
|
|||||||
|
|
||||||
// done:
|
// done:
|
||||||
if (ret != KS_STATUS_SUCCESS) {
|
if (ret != KS_STATUS_SUCCESS) {
|
||||||
if (t) ks_dht_transaction_destroy(&t);
|
if (t) ks_dht_transaction_destroy(transaction);
|
||||||
*transaction = NULL;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -40,9 +39,7 @@ KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction)
|
|||||||
|
|
||||||
t = *transaction;
|
t = *transaction;
|
||||||
|
|
||||||
ks_pool_free(t->pool, &t);
|
ks_pool_free(t->pool, transaction);
|
||||||
|
|
||||||
*transaction = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
|
@ -149,9 +149,9 @@ int main() {
|
|||||||
|
|
||||||
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
|
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
diag("DHT 1\n");
|
//diag("DHT 1\n");
|
||||||
ks_dht_pulse(dht1, 100);
|
ks_dht_pulse(dht1, 100);
|
||||||
diag("DHT 2\n");
|
//diag("DHT 2\n");
|
||||||
ks_dht_pulse(dht2, 100);
|
ks_dht_pulse(dht2, 100);
|
||||||
}
|
}
|
||||||
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
|
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
|
||||||
@ -174,9 +174,9 @@ int main() {
|
|||||||
|
|
||||||
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
|
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
diag("DHT 1\n");
|
//diag("DHT 1\n");
|
||||||
ks_dht_pulse(dht1, 100);
|
ks_dht_pulse(dht1, 100);
|
||||||
diag("DHT 2\n");
|
//diag("DHT 2\n");
|
||||||
ks_dht_pulse(dht2, 100);
|
ks_dht_pulse(dht2, 100);
|
||||||
}
|
}
|
||||||
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
|
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
|
||||||
|
Loading…
x
Reference in New Issue
Block a user