FS-9775: First round of integration of DHT into libblade, requires ongoing changes to DHT for proper exposure to blade level

This commit is contained in:
Shane Bryldt 2017-01-09 13:19:14 +00:00 committed by Mike Jerris
parent 42bfcc3d46
commit 743a768a3f
17 changed files with 991 additions and 4864 deletions

View File

@ -33,14 +33,22 @@
#include "blade.h"
#define KS_DHT_TPOOL_MIN 2
#define KS_DHT_TPOOL_MAX 8
#define KS_DHT_TPOOL_STACK (1024 * 256)
#define KS_DHT_TPOOL_IDLE 10
typedef enum {
BP_NONE = 0,
BP_MYPOOL = (1 << 0)
BP_MYPOOL = (1 << 0),
BP_MYTPOOL = (1 << 1)
} bppvt_flag_t;
struct blade_peer_s {
bppvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
ks_dht_t *dht;
};
@ -60,33 +68,75 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
flags = bp->flags;
pool = bp->pool;
if (bp->dht) ks_dht_destroy(&bp->dht);
if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool);
ks_pool_free(bp->pool, &bp);
if (pool && (flags & BP_MYPOOL)) {
ks_pool_close(&pool);
}
if (pool && (flags & BP_MYPOOL)) ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool)
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid)
{
bppvt_flag_t newflags = BP_NONE;
blade_peer_t *bp = NULL;
ks_dht_t *dht = NULL;
if (!pool) {
newflags |= BP_MYPOOL;
ks_pool_open(&pool);
ks_assert(pool);
}
if (!tpool) {
newflags |= BP_MYTPOOL;
ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE);
ks_assert(tpool);
}
ks_dht_create(&dht, pool, tpool, nodeid);
ks_assert(dht);
bp = ks_pool_alloc(pool, sizeof(*bp));
bp->pool = pool;
bp->flags = newflags;
bp->pool = pool;
bp->tpool = tpool;
bp->dht = dht;
*bpP = bp;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp)
{
ks_assert(bp);
ks_assert(bp->dht);
return &bp->dht->nodeid;
}
KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port)
{
ks_assert(bp);
ks_dht_autoroute(bp->dht, autoroute, port);
}
KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
{
ks_assert(bp);
ks_assert(addr);
return ks_dht_bind(bp->dht, addr, endpoint);
}
KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout)
{
ks_assert(bp);
ks_assert(timeout >= 0);
ks_dht_pulse(bp->dht, timeout);
}
/* For Emacs:
* Local Variables:

View File

@ -35,12 +35,14 @@
typedef enum {
BH_NONE = 0,
BH_MYPOOL = (1 << 0)
BH_MYPOOL = (1 << 0),
BH_MYTPOOL = (1 << 1)
} bhpvt_flag_t;
struct blade_handle_s {
ks_pool_t *pool;
bhpvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
blade_peer_t *peer;
};
@ -62,6 +64,7 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
pool = bh->pool;
blade_peer_destroy(&bh->peer);
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
@ -72,26 +75,78 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool)
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid)
{
bhpvt_flag_t newflags = BH_NONE;
blade_handle_t *bh = NULL;
ks_dht_nodeid_t nid;
ks_assert(nodeid);
ks_assert(strlen(nodeid) == (KS_DHT_NODEID_SIZE * 2));
if (!pool) {
newflags |= BH_MYPOOL;
ks_pool_open(&pool);
}
if (!tpool) {
newflags |= BH_MYTPOOL;
ks_thread_pool_create(&tpool, BLADE_HANDLE_TPOOL_MIN, BLADE_HANDLE_TPOOL_MAX, BLADE_HANDLE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_HANDLE_TPOOL_IDLE);
ks_assert(tpool);
}
bh = ks_pool_alloc(pool, sizeof(*bh));
bh->pool = pool;
bh->flags = newflags;
blade_peer_create(&bh->peer, bh->pool);
bh->pool = pool;
bh->tpool = tpool;
ks_dht_dehex(nid.id, nodeid, KS_DHT_NODEID_SIZE);
blade_peer_create(&bh->peer, bh->pool, bh->tpool, &nid);
*bhP = bh;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer)
{
ks_dht_nodeid_t *nodeid = NULL;
ks_assert(bh);
ks_assert(bh->peer);
nodeid = blade_peer_myid(bh->peer);
ks_dht_hex(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
}
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port)
{
ks_assert(bh);
ks_assert(bh->peer);
blade_peer_autoroute(bh->peer, autoroute, port);
}
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint)
{
ks_sockaddr_t addr;
int family = AF_INET;
ks_assert(bh);
ks_assert(ip);
ks_assert(port);
if (ip[1] != '.' && ip[2] != '.' && ip[3] != '.') family = AF_INET6;
ks_addr_set(&addr, ip, port, family);
return blade_peer_bind(bh->peer, &addr, endpoint);
}
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout)
{
ks_assert(bh);
ks_assert(timeout >= 0);
blade_peer_pulse(bh->peer, timeout);
}
/* For Emacs:

View File

@ -34,6 +34,7 @@
#ifndef _BLADE_H_
#define _BLADE_H_
#include <ks.h>
#include <ks_dht.h>
#include <sodium.h>
#include "blade_types.h"
#include "blade_stack.h"

View File

@ -35,9 +35,18 @@
#define _BPCP_H_
#include <blade.h>
#define BLADE_PEER_TPOOL_MIN 2
#define BLADE_PEER_TPOOL_MAX 8
#define BLADE_PEER_TPOOL_STACK (1024 * 256)
#define BLADE_PEER_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool);
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp);
KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout);
KS_END_EXTERN_C
#endif

View File

@ -35,9 +35,18 @@
#define _BLADE_STACK_H_
#include <blade.h>
#define BLADE_HANDLE_TPOOL_MIN 2
#define BLADE_HANDLE_TPOOL_MAX 8
#define BLADE_HANDLE_TPOOL_STACK (1024 * 256)
#define BLADE_HANDLE_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP);
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool);
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid);
KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer);
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout);
KS_END_EXTERN_C
#endif

View File

@ -7,6 +7,11 @@ testbuild_SOURCES = testbuild.c tap.c
testbuild_CFLAGS = $(AM_CFLAGS)
testbuild_LDADD = $(TEST_LDADD)
check_PROGRAMS += bladec
bladec_SOURCES = bladec.c tap.c
bladec_CFLAGS = $(AM_CFLAGS)
bladec_LDADD = $(TEST_LDADD)
TESTS=$(check_PROGRAMS)

204
libs/libblade/test/bladec.c Normal file
View File

@ -0,0 +1,204 @@
#include "blade.h"
#include "tap.h"
#ifdef _WIN32
#define STDIO_FD(_fs) _fileno(_fs)
#define READ(_fd, _buffer, _count) _read(_fd, _buffer, _count)
#else
#define STDIO_FD(_fs) fileno(_fs)
#define READ(_fd, _buffer, _count) read(_fd, _buffer, _count)
#endif
#define CONSOLE_INPUT_MAX 512
ks_bool_t g_shutdown = KS_FALSE;
char g_console_input[CONSOLE_INPUT_MAX];
size_t g_console_input_length = 0;
size_t g_console_input_eol = 0;
void loop(blade_handle_t *bh);
void process_console_input(blade_handle_t *bh, char *line);
typedef void (*command_callback)(blade_handle_t *bh, char *args);
struct command_def_s {
const char *cmd;
command_callback callback;
};
void command_test(blade_handle_t *bh, char *args);
void command_quit(blade_handle_t *bh, char *args);
void command_myid(blade_handle_t *bh, char *args);
void command_bind(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "test", command_test },
{ "quit", command_quit },
{ "myid", command_myid },
{ "bind", command_bind },
{ NULL, NULL }
};
int main(int argc, char **argv)
{
blade_handle_t *bh = NULL;
const char *nodeid;
ks_assert(argc >= 2);
nodeid = argv[1];
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
blade_init();
blade_handle_create(&bh, NULL, NULL, nodeid);
blade_handle_autoroute(bh, KS_TRUE, KS_DHT_DEFAULT_PORT);
loop(bh);
blade_handle_destroy(&bh);
blade_shutdown();
return 0;
}
void buffer_console_input(void)
{
ssize_t bytes = 0;
struct pollfd poll[1];
poll[0].fd = STDIO_FD(stdin);
poll[0].events = POLLIN | POLLERR;
if (ks_poll(poll, 1, 1) > 0) {
if (poll[0].revents & POLLIN) {
if ((bytes = READ(poll[0].fd, g_console_input + g_console_input_length, CONSOLE_INPUT_MAX - g_console_input_length)) <= 0) {
// @todo error
return;
}
g_console_input_length += bytes;
}
}
}
void loop(blade_handle_t *bh)
{
while (!g_shutdown) {
ks_bool_t eol = KS_FALSE;
buffer_console_input();
for (; g_console_input_eol < g_console_input_length; ++g_console_input_eol) {
char c = g_console_input[g_console_input_eol];
if (c == '\r' || c == '\n') {
eol = KS_TRUE;
break;
}
}
if (eol) {
g_console_input[g_console_input_eol] = '\0';
process_console_input(bh, g_console_input);
g_console_input_eol++;
for (; g_console_input_eol < g_console_input_length; ++g_console_input_eol) {
char c = g_console_input[g_console_input_eol];
if (c != '\r' && c != '\n') break;
}
if (g_console_input_eol == g_console_input_length) g_console_input_eol = g_console_input_length = 0;
else {
memcpy(g_console_input, g_console_input + g_console_input_eol, g_console_input_length - g_console_input_eol);
g_console_input_length -= g_console_input_eol;
g_console_input_eol = 0;
}
}
if (g_console_input_length == CONSOLE_INPUT_MAX) {
// @todo lines must not exceed 512 bytes, treat as error and ignore buffer until next new line?
ks_assert(0);
}
blade_handle_pulse(bh, 1);
}
}
void parse_argument(char **input, char **arg, char terminator)
{
char *tmp;
ks_assert(input);
ks_assert(*input);
ks_assert(arg);
tmp = *input;
*arg = tmp;
while (*tmp && *tmp != terminator) ++tmp;
if (*tmp == terminator) {
*tmp = '\0';
++tmp;
}
*input = tmp;
}
void process_console_input(blade_handle_t *bh, char *line)
{
char *args = line;
char *cmd = NULL;
ks_bool_t found = KS_FALSE;
ks_log(KS_LOG_DEBUG, "Output: %s\n", line);
parse_argument(&args, &cmd, ' ');
ks_log(KS_LOG_DEBUG, "Command: %s, Args: %s\n", cmd, args);
for (int32_t index = 0; command_defs[index].cmd; ++index) {
if (!strcmp(command_defs[index].cmd, cmd)) {
found = KS_TRUE;
command_defs[index].callback(bh, args);
}
}
if (!found) ks_log(KS_LOG_INFO, "Command '%s' unknown.\n", cmd);
}
void command_test(blade_handle_t *bh, char *args)
{
ks_log(KS_LOG_DEBUG, "Hello World!\n");
}
void command_quit(blade_handle_t *bh, char *args)
{
ks_assert(bh);
ks_assert(args);
ks_log(KS_LOG_DEBUG, "Shutting down\n");
g_shutdown = KS_TRUE;
}
void command_myid(blade_handle_t *bh, char *args)
{
char buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(bh);
ks_assert(args);
blade_handle_myid(bh, buf);
ks_log(KS_LOG_INFO, "%s\n", buf);
}
void command_bind(blade_handle_t *bh, char *args)
{
char *ip = NULL;
char *port = NULL;
ks_port_t p;
ks_assert(args);
parse_argument(&args, &ip, ' ');
parse_argument(&args, &port, ' ');
p = atoi(port); // @todo use strtol for error handling
blade_handle_bind(bh, ip, p, NULL);
}

View File

@ -11,7 +11,7 @@ int main(void)
plan(1);
status = blade_handle_create(&bh, NULL);
status = blade_handle_create(&bh, NULL, NULL);
status = blade_handle_destroy(&bh);
ok(status == KS_STATUS_SUCCESS);

View File

@ -8,13 +8,14 @@ void ks_dht_transaction_destructor(void *ptr) { ks_dht_transaction_destroy((ks_d
void ks_dht_storageitem_destructor(void *ptr) { ks_dht_storageitem_destroy((ks_dht_storageitem_t **)&ptr); }
KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool)
KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid)
{
ks_bool_t pool_alloc = !pool;
ks_dht_t *d = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(nodeid);
*dht = NULL;
@ -49,6 +50,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
ks_assert(d->tpool);
}
d->nodeid = *nodeid;
/**
* Default autorouting to disabled.
*/
@ -366,7 +369,7 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_
if (!ep && dht->autoroute) {
ks_sockaddr_t addr;
if ((ret = ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_bind(dht, NULL, &addr, &ep)) != KS_STATUS_SUCCESS) return ret;
if ((ret = ks_dht_bind(dht, &addr, &ep)) != KS_STATUS_SUCCESS) return ret;
}
/**
@ -419,7 +422,7 @@ KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_
}
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
{
ks_socket_t sock = KS_SOCK_INVALID;
ks_dht_endpoint_t *ep = NULL;
@ -466,7 +469,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
/**
* Allocate the endpoint to track the local socket.
*/
ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock);
ks_dht_endpoint_create(&ep, dht->pool, addr, sock);
ks_assert(ep);
/**
@ -506,24 +509,24 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
if (ep->addr.family == AF_INET) {
if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv4,
ep->nodeid,
dht->nodeid,
KS_DHT_LOCAL,
ep->addr.host,
ep->addr.port,
KS_DHTRT_CREATE_DEFAULT,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
&dht->node)) != KS_STATUS_SUCCESS) goto done;
} else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
ep->nodeid,
dht->nodeid,
KS_DHT_LOCAL,
ep->addr.host,
ep->addr.port,
KS_DHTRT_CREATE_DEFAULT,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
&dht->node)) != KS_STATUS_SUCCESS) goto done;
}
/**
* Do not release the ep->node, keep it alive until cleanup
* Do not release the dht->node, keep it alive until cleanup
*/
/**
@ -788,6 +791,19 @@ KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
return buffer;
}
KS_DECLARE(uint8_t *) ks_dht_dehex(uint8_t *data, const char *buffer, ks_size_t len)
{
const char *t = buffer;
ks_assert(data);
ks_assert(buffer);
ks_assert(!(len & 1));
for (int i = 0; i < len; ++i, t += 2) sscanf(t, "%2hhx", &data[i]);
return data;
}
KS_DECLARE(void) ks_dht_utility_nodeid_xor(ks_dht_nodeid_t *dest, ks_dht_nodeid_t *src1, ks_dht_nodeid_t *src2)
{
ks_assert(dest);
@ -1361,7 +1377,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
if (args) *args = a;
ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_SIZE));
*message = msg;
@ -1414,7 +1430,7 @@ KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
if (args) *args = r;
ben_dict_set(r, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_SIZE));
*message = msg;
@ -1546,7 +1562,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
message->args_id = *id;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
if ((ret = ks_dhtrt_create_node(message->endpoint->addr.family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6,
*id,
KS_DHT_REMOTE,
message->raddr.host,
@ -1620,7 +1636,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
message->args_id = *id;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
if ((ret = ks_dhtrt_create_node(message->endpoint->addr.family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6,
*id,
KS_DHT_REMOTE,
message->raddr.host,
@ -1629,7 +1645,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
&node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_touch_node(message->endpoint->addr.family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, *id)) != KS_STATUS_SUCCESS) goto done;
tid = (uint32_t *)message->transactionid;
@ -2292,7 +2308,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
// Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->query_target.id, KS_DHT_NODEID_SIZE)) {
if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(dht->nodeid.id, job->query_target.id, KS_DHT_NODEID_SIZE)) {
struct bencode *want = ben_list();
ben_list_append_str(want, "n4");
ben_list_append_str(want, "n6");

View File

@ -1,593 +0,0 @@
#ifndef KS_DHT_H
#define KS_DHT_H
#include "ks.h"
#include "ks_bencode.h"
#include "sodium.h"
KS_BEGIN_EXTERN_C
#define KS_DHT_DEFAULT_PORT 5309
#define KS_DHT_TPOOL_MIN 2
#define KS_DHT_TPOOL_MAX 8
#define KS_DHT_TPOOL_STACK (1024 * 256)
#define KS_DHT_TPOOL_IDLE 10
#define KS_DHT_DATAGRAM_BUFFER_SIZE 1000
//#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
#define KS_DHT_NODEID_SIZE 20
#define KS_DHT_RESPONSE_NODES_MAX_SIZE 8
#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION 10
#define KS_DHT_TRANSACTIONS_PULSE 1
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
#define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_SKEY_SIZE crypto_sign_SECRETKEYBYTES
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
#define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
#define KS_DHT_STORAGEITEM_EXPIRATION 7200
#define KS_DHT_STORAGEITEM_KEEPALIVE 300
#define KS_DHT_STORAGEITEMS_PULSE 10
#define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
#define KS_DHT_TOKEN_EXPIRATION 300
#define KS_DHT_TOKENS_PULSE 1
#define KS_DHTRT_MAXQUERYSIZE 20
typedef struct ks_dht_s ks_dht_t;
typedef struct ks_dht_datagram_s ks_dht_datagram_t;
typedef struct ks_dht_job_s ks_dht_job_t;
typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
typedef struct ks_dht_token_s ks_dht_token_t;
typedef struct ks_dht_storageitem_pkey_s ks_dht_storageitem_pkey_t;
typedef struct ks_dht_storageitem_skey_s ks_dht_storageitem_skey_t;
typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t;
typedef struct ks_dht_message_s ks_dht_message_t;
typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
typedef struct ks_dht_transaction_s ks_dht_transaction_t;
typedef struct ks_dht_search_s ks_dht_search_t;
typedef struct ks_dht_publish_s ks_dht_publish_t;
typedef struct ks_dht_distribute_s ks_dht_distribute_t;
typedef struct ks_dht_node_s ks_dht_node_t;
typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t;
typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job);
typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
//typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
typedef ks_status_t (*ks_dht_storageitem_callback_t)(ks_dht_t *dht, ks_dht_storageitem_t *item);
struct ks_dht_datagram_s {
ks_pool_t *pool;
ks_dht_t *dht;
ks_dht_endpoint_t *endpoint;
ks_sockaddr_t raddr;
uint8_t buffer[KS_DHT_DATAGRAM_BUFFER_SIZE];
ks_size_t buffer_length;
};
/**
* Note: This must remain a structure for casting from raw data
*/
struct ks_dht_nodeid_s {
uint8_t id[KS_DHT_NODEID_SIZE];
};
enum ks_afflags_t { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6};
enum ks_dht_nodetype_t { KS_DHT_REMOTE=0x01,
KS_DHT_LOCAL=0x02,
KS_DHT_BOTH=KS_DHT_REMOTE+KS_DHT_LOCAL };
enum ks_create_node_flags_t {
KS_DHTRT_CREATE_DEFAULT=0,
KS_DHTRT_CREATE_PING,
KS_DHTRT_CREATE_TOUCH
};
struct ks_dht_node_s {
ks_dht_nodeid_t nodeid;
ks_sockaddr_t addr;
enum ks_dht_nodetype_t type; /* local or remote */
ks_dhtrt_routetable_t* table;
ks_rwl_t *reflock;
};
struct ks_dht_token_s {
uint8_t token[KS_DHT_TOKEN_SIZE];
};
enum ks_dht_job_state_t {
KS_DHT_JOB_STATE_QUERYING,
KS_DHT_JOB_STATE_RESPONDING,
KS_DHT_JOB_STATE_EXPIRING,
KS_DHT_JOB_STATE_COMPLETING,
};
enum ks_dht_job_result_t {
KS_DHT_JOB_RESULT_SUCCESS = 0,
KS_DHT_JOB_RESULT_EXPIRED,
KS_DHT_JOB_RESULT_ERROR,
KS_DHT_JOB_RESULT_FAILURE,
};
struct ks_dht_job_s {
ks_pool_t *pool;
ks_dht_t *dht;
ks_dht_job_t *next;
enum ks_dht_job_state_t state;
enum ks_dht_job_result_t result;
ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
int32_t attempts;
//enum ks_dht_job_type_t type;
ks_dht_job_callback_t query_callback;
ks_dht_job_callback_t finish_callback;
void *data;
ks_dht_message_t *response;
// job specific query parameters
ks_dht_nodeid_t query_target;
struct bencode *query_salt;
int64_t query_cas;
ks_dht_token_t query_token;
ks_dht_storageitem_t *query_storageitem;
// error response parameters
int64_t error_code;
struct bencode *error_description;
// job specific response parameters
ks_dht_node_t *response_id;
ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes_count;
ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes6_count;
ks_dht_token_t response_token;
int64_t response_seq;
ks_bool_t response_hasitem;
ks_dht_storageitem_t *response_storageitem;
};
struct ks_dhtrt_routetable_s {
void* internal;
ks_pool_t* pool;
ks_logger_t logger;
};
struct ks_dhtrt_querynodes_s {
ks_dht_nodeid_t nodeid; /* in: id to query */
enum ks_afflags_t family; /* in: AF_INET or AF_INET6 or both */
enum ks_dht_nodetype_t type; /* remote, local, or both */
uint8_t max; /* in: maximum to return */
uint8_t count; /* out: number returned */
ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
};
struct ks_dht_storageitem_pkey_s {
uint8_t key[KS_DHT_STORAGEITEM_PKEY_SIZE];
};
struct ks_dht_storageitem_skey_s {
uint8_t key[KS_DHT_STORAGEITEM_SKEY_SIZE];
};
struct ks_dht_storageitem_signature_s {
uint8_t sig[KS_DHT_STORAGEITEM_SIGNATURE_SIZE];
};
struct ks_dht_message_s {
ks_pool_t *pool;
ks_dht_endpoint_t *endpoint;
ks_sockaddr_t raddr;
struct bencode *data;
uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
ks_size_t transactionid_length;
ks_dht_transaction_t *transaction;
char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
struct bencode *args;
ks_dht_nodeid_t args_id;
};
struct ks_dht_endpoint_s {
ks_pool_t *pool;
ks_dht_nodeid_t nodeid;
ks_sockaddr_t addr;
ks_socket_t sock;
// @todo make sure this node is unlocked, and never gets destroyed, should also never use local nodes in search results as they can be internal
// network addresses, not what others have contacted through
ks_dht_node_t *node;
};
struct ks_dht_transaction_s {
ks_pool_t *pool;
ks_dht_job_t *job;
uint32_t transactionid;
//ks_dht_nodeid_t target; // @todo look at moving this into job now
ks_dht_job_callback_t callback;
ks_time_t expiration;
ks_bool_t finished;
};
struct ks_dht_search_s {
ks_pool_t *pool;
ks_dhtrt_routetable_t *table;
ks_dht_nodeid_t target;
ks_dht_job_callback_t callback;
void *data;
ks_mutex_t *mutex;
ks_hash_t *searched;
int32_t searching;
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_size_t results_length;
};
struct ks_dht_publish_s {
ks_pool_t *pool;
ks_dht_job_callback_t callback;
void *data;
int64_t cas;
ks_dht_storageitem_t *item;
};
struct ks_dht_distribute_s {
ks_pool_t *pool;
ks_dht_storageitem_callback_t callback;
void *data;
ks_mutex_t *mutex;
int32_t publishing;
int64_t cas;
ks_dht_storageitem_t *item;
};
struct ks_dht_storageitem_s {
ks_pool_t *pool;
ks_dht_nodeid_t id;
ks_time_t expiration;
ks_time_t keepalive;
struct bencode *v;
ks_mutex_t *mutex;
volatile int32_t refc;
ks_dht_storageitem_callback_t callback;
ks_bool_t mutable;
ks_dht_storageitem_pkey_t pk;
ks_dht_storageitem_skey_t sk;
struct bencode *salt;
int64_t seq;
ks_dht_storageitem_signature_t sig;
};
struct ks_dht_s {
ks_pool_t *pool;
ks_bool_t pool_alloc;
ks_thread_pool_t *tpool;
ks_bool_t tpool_alloc;
ks_bool_t autoroute;
ks_port_t autoroute_port;
ks_hash_t *registry_type;
ks_hash_t *registry_query;
ks_hash_t *registry_error;
ks_dht_endpoint_t **endpoints;
int32_t endpoints_length;
int32_t endpoints_size;
ks_hash_t *endpoints_hash;
struct pollfd *endpoints_poll;
ks_q_t *send_q;
ks_dht_message_t *send_q_unsent;
uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
ks_size_t recv_buffer_length;
ks_mutex_t *jobs_mutex;
ks_dht_job_t *jobs_first;
ks_dht_job_t *jobs_last;
ks_time_t transactions_pulse;
ks_mutex_t *transactionid_mutex;
volatile uint32_t transactionid_next;
ks_hash_t *transactions_hash;
ks_dhtrt_routetable_t *rt_ipv4;
ks_dhtrt_routetable_t *rt_ipv6;
ks_time_t tokens_pulse;
volatile uint32_t token_secret_current;
volatile uint32_t token_secret_previous;
ks_time_t token_secret_expiration;
ks_time_t storageitems_pulse;
ks_hash_t *storageitems_hash;
};
/**
* Constructor function for ks_dht_t.
* Will allocate and initialize internal state including registration of message handlers.
* @param dht dereferenced out pointer to the allocated dht instance
* @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new memory pool internally
* @param tpool pointer to a thread pool used by the dht instance, may be NULL to create a new thread pool internally
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM
*/
KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool);
/**
* Destructor function for ks_dht_t.
* Will deinitialize and deallocate internal state.
* @param dht dereferenced in/out pointer to the dht instance, NULL upon return
*/
KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht);
/**
* Enable or disable (default) autorouting support.
* When enabled, autorouting will allow sending to remote addresses on interfaces which are not yet bound.
* The address will be bound with the provided autoroute port when this occurs.
* @param dht pointer to the dht instance
* @param autoroute enable or disable autorouting
* @param port when enabling autorouting this port will be used to bind new addresses, may be 0 to use the default DHT port
*/
KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port);
/**
* Register a callback for a specific message type.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the type text under the 'y' key of a message
* @param callback the callback to be called when a message matches
*/
KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message query.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the type text under the 'q' key of a message
* @param callback the callback to be called when a message matches
*/
KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message error.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the errorcode under the first item of the 'e' key of a message
* @param callback the callback to be called when a message matches
*/
KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Bind a local address and port for receiving UDP datagrams.
* @param dht pointer to the dht instance
* @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly
* @param addr pointer to the local address information
* @param endpoint dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
* @see ks_socket_option
* @see ks_addr_bind
* @see ks_dht_endpoint_alloc
* @see ks_dht_endpoint_init
* @see ks_hash_insert
* @see ks_dhtrt_initroute
* @see ks_dhtrt_create_node
*/
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
/**
* Pulse the internals of dht.
* Handles receiving UDP datagrams, dispatching processing, handles expirations, throttled message sending, route table pulsing, etc.
* @param dht pointer to the dht instance
* @param timeout timeout value used when polling sockets for new UDP datagrams
*/
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
ks_dht_storageitem_skey_t *sk,
const uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
const uint8_t *value,
ks_size_t value_length);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data);
/**
*
*/
KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(void) ks_dht_get(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
ks_dht_nodeid_t *target,
const uint8_t *salt,
ks_size_t salt_length);
/**
*
*/
KS_DECLARE(void) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item);
/**
* Create a network search of the closest nodes to a target.
* @param dht pointer to the dht instance
* @param family either AF_INET or AF_INET6 for the appropriate network to search
* @param target pointer to the nodeid for the target to be searched
* @param callback an optional callback to add to the search when it is finished
* @param search dereferenced out pointer to the allocated search, may be NULL to ignore search output
* @see ks_dht_search_create
* @see ks_hash_insert
* @see ks_dht_findnode
*/
KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
ks_dht_job_callback_t callback,
void *data,
ks_dhtrt_routetable_t *table,
ks_dht_nodeid_t *target);
KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
int64_t cas,
ks_dht_storageitem_t *item);
KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
ks_dht_storageitem_callback_t callback,
void *data,
ks_dhtrt_routetable_t *table,
int64_t cas,
ks_dht_storageitem_t *item);
/**
* route table methods
*
*/
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
ks_pool_t *pool);
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,
ks_dht_nodeid_t nodeid,
enum ks_dht_nodetype_t type,
char* ip, unsigned short port,
enum ks_create_node_flags_t flags,
ks_dht_node_t** node);
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid);
KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid);
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t* table, ks_dhtrt_querynodes_t* query);
KS_DECLARE(ks_dht_node_t*) ks_dhtrt_find_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t id);
KS_DECLARE(ks_status_t) ks_dhtrt_sharelock_node(ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t* query);
KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t* table);
KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t* table, void** ptr);
KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t* table, void* ptr);
/* debugging aids */
KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level);
KS_END_EXTERN_C
#endif /* KS_DHT_H */
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -7,7 +7,6 @@
*/
KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_pool_t *pool,
const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *addr,
ks_socket_t sock)
{
@ -23,8 +22,6 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_assert(ep);
ep->pool = pool;
if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE);
else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
ep->addr = *addr;
ep->sock = sock;

View File

@ -319,7 +319,6 @@ KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
*/
KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_pool_t *pool,
const ks_dht_nodeid_t *nodeid,
const ks_sockaddr_t *addr,
ks_socket_t sock);
KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint);

View File

@ -1,101 +1,588 @@
/*
Copyright (c) 2009-2011 by Juliusz Chroboczek
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef _KS_DHT_H
#define _KS_DHT_H
#ifndef KS_DHT_H
#define KS_DHT_H
#include "ks.h"
#include "ks_bencode.h"
#include "sodium.h"
KS_BEGIN_EXTERN_C
typedef enum {
KS_DHT_EVENT_NONE = 0,
KS_DHT_EVENT_VALUES = 1,
KS_DHT_EVENT_VALUES6 = 2,
KS_DHT_EVENT_SEARCH_DONE = 3,
KS_DHT_EVENT_SEARCH_DONE6 = 4
} ks_dht_event_t;
#define KS_DHT_DEFAULT_PORT 5309
#define KS_DHT_TPOOL_MIN 2
#define KS_DHT_TPOOL_MAX 8
#define KS_DHT_TPOOL_STACK (1024 * 256)
#define KS_DHT_TPOOL_IDLE 10
#define KS_DHT_DATAGRAM_BUFFER_SIZE 1000
//#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
#define KS_DHT_NODEID_SIZE 20
#define KS_DHT_RESPONSE_NODES_MAX_SIZE 8
#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION 10
#define KS_DHT_TRANSACTIONS_PULSE 1
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
#define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_SKEY_SIZE crypto_sign_SECRETKEYBYTES
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
#define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
#define KS_DHT_STORAGEITEM_EXPIRATION 7200
#define KS_DHT_STORAGEITEM_KEEPALIVE 300
#define KS_DHT_STORAGEITEMS_PULSE 10
#define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
#define KS_DHT_TOKEN_EXPIRATION 300
#define KS_DHT_TOKENS_PULSE 1
#define KS_DHTRT_MAXQUERYSIZE 20
typedef struct ks_dht_s ks_dht_t;
typedef struct ks_dht_datagram_s ks_dht_datagram_t;
typedef struct ks_dht_job_s ks_dht_job_t;
typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
typedef struct ks_dht_token_s ks_dht_token_t;
typedef struct ks_dht_storageitem_pkey_s ks_dht_storageitem_pkey_t;
typedef struct ks_dht_storageitem_skey_s ks_dht_storageitem_skey_t;
typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t;
typedef struct ks_dht_message_s ks_dht_message_t;
typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
typedef struct ks_dht_transaction_s ks_dht_transaction_t;
typedef struct ks_dht_search_s ks_dht_search_t;
typedef struct ks_dht_publish_s ks_dht_publish_t;
typedef struct ks_dht_distribute_s ks_dht_distribute_t;
typedef struct ks_dht_node_s ks_dht_node_t;
typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t;
typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
typedef enum {
DHT_PARAM_AUTOROUTE = 1
} ks_dht_param_t;
typedef enum {
KS_DHT_AF_INET4 = (1 << 0),
KS_DHT_AF_INET6 = (1 << 1)
} ks_dht_af_flag_t;
typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job);
typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
//typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
typedef ks_status_t (*ks_dht_storageitem_callback_t)(ks_dht_t *dht, ks_dht_storageitem_t *item);
typedef void (*dht_callback_t)(void *closure, ks_dht_event_t event, const unsigned char *info_hash, const void *data, size_t data_len);
struct ks_dht_datagram_s {
ks_pool_t *pool;
ks_dht_t *dht;
ks_dht_endpoint_t *endpoint;
ks_sockaddr_t raddr;
uint8_t buffer[KS_DHT_DATAGRAM_BUFFER_SIZE];
ks_size_t buffer_length;
};
typedef struct dht_handle_s dht_handle_t;
/**
* Note: This must remain a structure for casting from raw data
*/
struct ks_dht_nodeid_s {
uint8_t id[KS_DHT_NODEID_SIZE];
};
KS_DECLARE(int) dht_periodic(dht_handle_t *h, const void *buf, size_t buflen, ks_sockaddr_t *from);
KS_DECLARE(ks_status_t) ks_dht_init(dht_handle_t **handle, ks_dht_af_flag_t af_flags, const unsigned char *id, unsigned int port);
KS_DECLARE(void) ks_dht_set_param(dht_handle_t *h, ks_dht_param_t param, ks_bool_t val);
KS_DECLARE(ks_status_t) ks_dht_add_ip(dht_handle_t *h, char *ip, int port);
KS_DECLARE(void) ks_dht_start(dht_handle_t *h);
KS_DECLARE(int) dht_insert_node(dht_handle_t *h, const unsigned char *id, ks_sockaddr_t *sa);
KS_DECLARE(int) dht_ping_node(dht_handle_t *h, ks_sockaddr_t *sa);
KS_DECLARE(int) dht_search(dht_handle_t *h, const unsigned char *id, int port, int af, dht_callback_t callback, void *closure);
KS_DECLARE(int) dht_nodes(dht_handle_t *h, int af, int *good_return, int *dubious_return, int *cached_return, int *incoming_return);
KS_DECLARE(ks_status_t) ks_dht_one_loop(dht_handle_t *h, int timeout);
KS_DECLARE(ks_status_t) ks_dht_get_bind_addrs(dht_handle_t *h, const ks_sockaddr_t ***addrs, ks_size_t *addrlen);
KS_DECLARE(void) ks_dht_set_callback(dht_handle_t *h, dht_callback_t callback, void *closure);
KS_DECLARE(void) ks_dht_set_port(dht_handle_t *h, unsigned int port);
KS_DECLARE(void) dht_dump_tables(dht_handle_t *h, FILE *f);
KS_DECLARE(int) dht_get_nodes(dht_handle_t *h, struct sockaddr_in *sin, int *num, struct sockaddr_in6 *sin6, int *num6);
KS_DECLARE(int) dht_uninit(dht_handle_t **h);
KS_DECLARE(void) ks_dht_set_v(dht_handle_t *h, const unsigned char *v);
KS_DECLARE(int) ks_dht_calculate_mutable_storage_target(unsigned char *pk, unsigned char *salt, int salt_length, unsigned char *target, int target_length);
KS_DECLARE(int) ks_dht_generate_mutable_storage_args(struct bencode *data, int64_t sequence, int cas,
unsigned char *id, int id_len, /* querying nodes id */
const unsigned char *sk, const unsigned char *pk,
unsigned char *salt, unsigned long long salt_length,
unsigned char *token, unsigned long long token_length,
unsigned char *signature, unsigned long long *signature_length,
struct bencode **arguments);
enum ks_afflags_t { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6};
enum ks_dht_nodetype_t { KS_DHT_REMOTE=0x01,
KS_DHT_LOCAL=0x02,
KS_DHT_BOTH=KS_DHT_REMOTE+KS_DHT_LOCAL };
/* This must be provided by the user. */
KS_DECLARE(int) dht_blacklisted(const ks_sockaddr_t *sa);
KS_DECLARE(void) dht_hash(void *hash_return, int hash_size, const void *v1, int len1, const void *v2, int len2, const void *v3, int len3);
KS_DECLARE(int) dht_random_bytes(void *buf, size_t size);
enum ks_create_node_flags_t {
KS_DHTRT_CREATE_DEFAULT=0,
KS_DHTRT_CREATE_PING,
KS_DHTRT_CREATE_TOUCH
};
struct ks_dht_node_s {
ks_dht_nodeid_t nodeid;
ks_sockaddr_t addr;
enum ks_dht_nodetype_t type; /* local or remote */
ks_dhtrt_routetable_t* table;
ks_rwl_t *reflock;
};
KS_DECLARE(int) ks_dht_send_message_mutable(dht_handle_t *h, unsigned char *sk, unsigned char *pk, char **node_id,
char *message_id, int sequence, char *message, ks_time_t life);
struct ks_dht_token_s {
uint8_t token[KS_DHT_TOKEN_SIZE];
};
KS_DECLARE(int) ks_dht_send_message_mutable_cjson(dht_handle_t *h, unsigned char *sk, unsigned char *pk, char **node_id,
char *message_id, int sequence, cJSON *message, ks_time_t life);
enum ks_dht_job_state_t {
KS_DHT_JOB_STATE_QUERYING,
KS_DHT_JOB_STATE_RESPONDING,
KS_DHT_JOB_STATE_EXPIRING,
KS_DHT_JOB_STATE_COMPLETING,
};
typedef void (ks_dht_store_entry_json_cb)(struct dht_handle_s *h, const cJSON *msg, void *obj);
KS_DECLARE(void) ks_dht_store_entry_json_cb_set(struct dht_handle_s *h, ks_dht_store_entry_json_cb *store_json_cb, void *arg);
enum ks_dht_job_result_t {
KS_DHT_JOB_RESULT_SUCCESS = 0,
KS_DHT_JOB_RESULT_EXPIRED,
KS_DHT_JOB_RESULT_ERROR,
KS_DHT_JOB_RESULT_FAILURE,
};
KS_DECLARE(int) ks_dht_api_find_node(dht_handle_t *h, char *node_id_hex, char *target_hex, ks_bool_t ipv6);
struct ks_dht_job_s {
ks_pool_t *pool;
ks_dht_t *dht;
ks_dht_job_t *next;
enum ks_dht_job_state_t state;
enum ks_dht_job_result_t result;
ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
int32_t attempts;
//enum ks_dht_job_type_t type;
ks_dht_job_callback_t query_callback;
ks_dht_job_callback_t finish_callback;
void *data;
ks_dht_message_t *response;
// job specific query parameters
ks_dht_nodeid_t query_target;
struct bencode *query_salt;
int64_t query_cas;
ks_dht_token_t query_token;
ks_dht_storageitem_t *query_storageitem;
// error response parameters
int64_t error_code;
struct bencode *error_description;
// job specific response parameters
ks_dht_node_t *response_id;
ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes_count;
ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes6_count;
ks_dht_token_t response_token;
int64_t response_seq;
ks_bool_t response_hasitem;
ks_dht_storageitem_t *response_storageitem;
};
struct ks_dhtrt_routetable_s {
void* internal;
ks_pool_t* pool;
ks_logger_t logger;
};
struct ks_dhtrt_querynodes_s {
ks_dht_nodeid_t nodeid; /* in: id to query */
enum ks_afflags_t family; /* in: AF_INET or AF_INET6 or both */
enum ks_dht_nodetype_t type; /* remote, local, or both */
uint8_t max; /* in: maximum to return */
uint8_t count; /* out: number returned */
ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
};
struct ks_dht_storageitem_pkey_s {
uint8_t key[KS_DHT_STORAGEITEM_PKEY_SIZE];
};
struct ks_dht_storageitem_skey_s {
uint8_t key[KS_DHT_STORAGEITEM_SKEY_SIZE];
};
struct ks_dht_storageitem_signature_s {
uint8_t sig[KS_DHT_STORAGEITEM_SIGNATURE_SIZE];
};
struct ks_dht_message_s {
ks_pool_t *pool;
ks_dht_endpoint_t *endpoint;
ks_sockaddr_t raddr;
struct bencode *data;
uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
ks_size_t transactionid_length;
ks_dht_transaction_t *transaction;
char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
struct bencode *args;
ks_dht_nodeid_t args_id;
};
struct ks_dht_endpoint_s {
ks_pool_t *pool;
ks_sockaddr_t addr;
ks_socket_t sock;
};
struct ks_dht_transaction_s {
ks_pool_t *pool;
ks_dht_job_t *job;
uint32_t transactionid;
//ks_dht_nodeid_t target; // @todo look at moving this into job now
ks_dht_job_callback_t callback;
ks_time_t expiration;
ks_bool_t finished;
};
struct ks_dht_search_s {
ks_pool_t *pool;
ks_dhtrt_routetable_t *table;
ks_dht_nodeid_t target;
ks_dht_job_callback_t callback;
void *data;
ks_mutex_t *mutex;
ks_hash_t *searched;
int32_t searching;
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_size_t results_length;
};
struct ks_dht_publish_s {
ks_pool_t *pool;
ks_dht_job_callback_t callback;
void *data;
int64_t cas;
ks_dht_storageitem_t *item;
};
struct ks_dht_distribute_s {
ks_pool_t *pool;
ks_dht_storageitem_callback_t callback;
void *data;
ks_mutex_t *mutex;
int32_t publishing;
int64_t cas;
ks_dht_storageitem_t *item;
};
struct ks_dht_storageitem_s {
ks_pool_t *pool;
ks_dht_nodeid_t id;
ks_time_t expiration;
ks_time_t keepalive;
struct bencode *v;
ks_mutex_t *mutex;
volatile int32_t refc;
ks_dht_storageitem_callback_t callback;
ks_bool_t mutable;
ks_dht_storageitem_pkey_t pk;
ks_dht_storageitem_skey_t sk;
struct bencode *salt;
int64_t seq;
ks_dht_storageitem_signature_t sig;
};
struct ks_dht_s {
ks_pool_t *pool;
ks_bool_t pool_alloc;
ks_thread_pool_t *tpool;
ks_bool_t tpool_alloc;
ks_dht_nodeid_t nodeid;
// @todo make sure this node is unlocked, and never gets destroyed, should also never use local nodes in search results as they can be internal
// network addresses, not what others have contacted through
ks_dht_node_t *node;
ks_bool_t autoroute;
ks_port_t autoroute_port;
ks_hash_t *registry_type;
ks_hash_t *registry_query;
ks_hash_t *registry_error;
ks_dht_endpoint_t **endpoints;
int32_t endpoints_length;
int32_t endpoints_size;
ks_hash_t *endpoints_hash;
struct pollfd *endpoints_poll;
ks_q_t *send_q;
ks_dht_message_t *send_q_unsent;
uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
ks_size_t recv_buffer_length;
ks_mutex_t *jobs_mutex;
ks_dht_job_t *jobs_first;
ks_dht_job_t *jobs_last;
ks_time_t transactions_pulse;
ks_mutex_t *transactionid_mutex;
volatile uint32_t transactionid_next;
ks_hash_t *transactions_hash;
ks_dhtrt_routetable_t *rt_ipv4;
ks_dhtrt_routetable_t *rt_ipv6;
ks_time_t tokens_pulse;
volatile uint32_t token_secret_current;
volatile uint32_t token_secret_previous;
ks_time_t token_secret_expiration;
ks_time_t storageitems_pulse;
ks_hash_t *storageitems_hash;
};
/**
* Constructor function for ks_dht_t.
* Will allocate and initialize internal state including registration of message handlers.
* @param dht dereferenced out pointer to the allocated dht instance
* @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new memory pool internally
* @param tpool pointer to a thread pool used by the dht instance, may be NULL to create a new thread pool internally
* @param nodeid pointer to the nodeid for this dht instance
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM
*/
KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid);
/**
* Destructor function for ks_dht_t.
* Will deinitialize and deallocate internal state.
* @param dht dereferenced in/out pointer to the dht instance, NULL upon return
*/
KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht);
/**
* Enable or disable (default) autorouting support.
* When enabled, autorouting will allow sending to remote addresses on interfaces which are not yet bound.
* The address will be bound with the provided autoroute port when this occurs.
* @param dht pointer to the dht instance
* @param autoroute enable or disable autorouting
* @param port when enabling autorouting this port will be used to bind new addresses, may be 0 to use the default DHT port
*/
KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port);
/**
* Register a callback for a specific message type.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the type text under the 'y' key of a message
* @param callback the callback to be called when a message matches
*/
KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message query.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the type text under the 'q' key of a message
* @param callback the callback to be called when a message matches
*/
KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Register a callback for a specific message error.
* Will overwrite any duplicate handlers.
* @param dht pointer to the dht instance
* @param value string of the errorcode under the first item of the 'e' key of a message
* @param callback the callback to be called when a message matches
*/
KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
/**
* Bind a local address and port for receiving UDP datagrams.
* @param dht pointer to the dht instance
* @param addr pointer to the local address information
* @param endpoint dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
* @see ks_socket_option
* @see ks_addr_bind
* @see ks_dht_endpoint_alloc
* @see ks_dht_endpoint_init
* @see ks_hash_insert
* @see ks_dhtrt_initroute
* @see ks_dhtrt_create_node
*/
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
/**
* Pulse the internals of dht.
* Handles receiving UDP datagrams, dispatching processing, handles expirations, throttled message sending, route table pulsing, etc.
* @param dht pointer to the dht instance
* @param timeout timeout value used when polling sockets for new UDP datagrams
*/
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len);
KS_DECLARE(uint8_t *) ks_dht_dehex(uint8_t *data, const char *buffer, ks_size_t len);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
ks_dht_storageitem_skey_t *sk,
const uint8_t *salt,
ks_size_t salt_length,
int64_t sequence,
const uint8_t *value,
ks_size_t value_length);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht);
/**
*
*/
KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item);
/**
*
*/
KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data);
/**
*
*/
KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
ks_dht_nodeid_t *target);
/**
*
*/
KS_DECLARE(void) ks_dht_get(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
ks_dht_nodeid_t *target,
const uint8_t *salt,
ks_size_t salt_length);
/**
*
*/
KS_DECLARE(void) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item);
/**
* Create a network search of the closest nodes to a target.
* @param dht pointer to the dht instance
* @param family either AF_INET or AF_INET6 for the appropriate network to search
* @param target pointer to the nodeid for the target to be searched
* @param callback an optional callback to add to the search when it is finished
* @param search dereferenced out pointer to the allocated search, may be NULL to ignore search output
* @see ks_dht_search_create
* @see ks_hash_insert
* @see ks_dht_findnode
*/
KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
ks_dht_job_callback_t callback,
void *data,
ks_dhtrt_routetable_t *table,
ks_dht_nodeid_t *target);
KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
void *data,
int64_t cas,
ks_dht_storageitem_t *item);
KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
ks_dht_storageitem_callback_t callback,
void *data,
ks_dhtrt_routetable_t *table,
int64_t cas,
ks_dht_storageitem_t *item);
/**
* route table methods
*
*/
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
ks_pool_t *pool);
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,
ks_dht_nodeid_t nodeid,
enum ks_dht_nodetype_t type,
char* ip, unsigned short port,
enum ks_create_node_flags_t flags,
ks_dht_node_t** node);
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid);
KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid);
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t* table, ks_dhtrt_querynodes_t* query);
KS_DECLARE(ks_dht_node_t*) ks_dhtrt_find_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t id);
KS_DECLARE(ks_status_t) ks_dhtrt_sharelock_node(ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t* query);
KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t* table);
KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t* table, void** ptr);
KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t* table, void* ptr);
/* debugging aids */
KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level);
KS_END_EXTERN_C
#endif /* _KS_DHT_H */
#endif /* KS_DHT_H */
/* For Emacs:
* Local Variables:

File diff suppressed because it is too large Load Diff

View File

@ -64,6 +64,11 @@ testdht2_SOURCES = testdht2.c tap.c
testdht2_CFLAGS = $(AM_CFLAGS)
testdht2_LDADD = $(TEST_LDADD)
check_PROGRAMS += nodeidgen
nodeidgen_SOURCES = nodeidgen.c tap.c
nodeidgen_CFLAGS = $(AM_CFLAGS)
nodeidgen_LDADD = $(TEST_LDADD)
#check_PROGRAMS += testdht_net
#testdht_net_SOURCES = testdht-net.c tap.c
#testdht_net_CFLAGS = $(AM_CFLAGS)

View File

@ -0,0 +1,16 @@
#include <ks.h>
#include <ks_dht.h>
#include <ks_dht-int.h>
#include <tap.h>
int main() {
uint8_t nodeid[KS_DHT_NODEID_SIZE];
char buf[KS_DHT_NODEID_SIZE * 2 + 1];
randombytes_buf(nodeid, KS_DHT_NODEID_SIZE);
ks_dht_hex(nodeid, buf, KS_DHT_NODEID_SIZE);
printf(buf);
return 0;
}

View File

@ -1,6 +1,6 @@
#include <ks.h>
#include <../dht/ks_dht.h>
#include <../dht/ks_dht-int.h>
#include <ks_dht.h>
#include <ks_dht-int.h>
#include <tap.h>
ks_dht_storageitem_skey_t sk;
@ -55,6 +55,7 @@ int main() {
//ks_size_t buflen;
ks_status_t err;
int mask = 0;
ks_dht_nodeid_t nodeid;
ks_dht_t *dht1 = NULL;
ks_dht_t *dht2 = NULL;
ks_dht_t *dht3 = NULL;
@ -112,20 +113,23 @@ int main() {
diag("Binding to %s on ipv6\n", v6);
}
err = ks_dht_create(&dht1, NULL, NULL);
ks_dht_dehex(nodeid.id, "0000000000000000000000000000000000000001", KS_DHT_NODEID_SIZE);
err = ks_dht_create(&dht1, NULL, NULL, &nodeid);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_create(&dht2, NULL, NULL);
ks_dht_dehex(nodeid.id, "0000000000000000000000000000000000000002", KS_DHT_NODEID_SIZE);
err = ks_dht_create(&dht2, NULL, NULL, &nodeid);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_create(&dht3, NULL, NULL);
ks_dht_dehex(nodeid.id, "0000000000000000000000000000000000000003", KS_DHT_NODEID_SIZE);
err = ks_dht_create(&dht3, NULL, NULL, &nodeid);
ok(err == KS_STATUS_SUCCESS);
if (have_v4) {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht1, NULL, &addr, &ep1);
err = ks_dht_bind(dht1, &addr, &ep1);
ok(err == KS_STATUS_SUCCESS);
raddr1 = addr;
@ -133,7 +137,7 @@ int main() {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht2, NULL, &addr, &ep2);
err = ks_dht_bind(dht2, &addr, &ep2);
ok(err == KS_STATUS_SUCCESS);
//raddr2 = addr;
@ -141,7 +145,7 @@ int main() {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 2, AF_INET);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht3, NULL, &addr, &ep3);
err = ks_dht_bind(dht3, &addr, &ep3);
ok(err == KS_STATUS_SUCCESS);
//raddr3 = addr;
@ -151,19 +155,19 @@ int main() {
err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT, AF_INET6);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht1, NULL, &addr, NULL);
err = ks_dht_bind(dht1, &addr, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 1, AF_INET6);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht2, NULL, &addr, NULL);
err = ks_dht_bind(dht2, &addr, NULL);
ok(err == KS_STATUS_SUCCESS);
err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht_bind(dht3, NULL, &addr, NULL);
err = ks_dht_bind(dht3, &addr, NULL);
ok(err == KS_STATUS_SUCCESS);
}
@ -175,11 +179,11 @@ int main() {
ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ok(ks_dhtrt_find_node(dht1->rt_ipv4, dht2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING)
ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
ok(ks_dhtrt_find_node(dht2->rt_ipv4, dht1->nodeid) != NULL); // The node should be good, and thus be returned as good
ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
@ -189,7 +193,7 @@ int main() {
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
ok(ks_dhtrt_find_node(dht1->rt_ipv4, dht2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
ks_dht_ping(dht3, &raddr1, NULL, NULL); // (QUERYING)
@ -198,11 +202,11 @@ int main() {
ks_dht_pulse(dht1, 100); // Receive and process ping query from dht3, queue and send ping response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ok(ks_dhtrt_find_node(dht1->rt_ipv4, dht3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(dht3, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING)
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
ok(ks_dhtrt_find_node(dht3->rt_ipv4, dht1->nodeid) != NULL); // The node should be good, and thus be returned as good
ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
@ -212,26 +216,26 @@ int main() {
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
ok(ks_dhtrt_find_node(dht1->rt_ipv4, dht2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
// Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
/*
diag("Find_Node test\n");
ks_dht_findnode(dht3, NULL, &raddr1, NULL, NULL, &ep2->nodeid);
ks_dht_findnode(dht3, NULL, &raddr1, NULL, NULL, &dht2->nodeid);
ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ok(ks_dhtrt_find_node(dht1->rt_ipv4, dht3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ok(ks_dhtrt_find_node(dht3->rt_ipv4, dht2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) {
@ -239,12 +243,12 @@ int main() {
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
ok(ks_dhtrt_find_node(dht3->rt_ipv4, dht2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
*/
diag("Search test\n");
ks_dht_search(dht3, dht2_search_callback, NULL, dht3->rt_ipv4, &ep2->nodeid);
ks_dht_search(dht3, dht2_search_callback, NULL, dht3->rt_ipv4, &dht2->nodeid);
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 20; ++i) {
ks_dht_pulse(dht1, 100);