diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 5cf4413cc2..eb9db01561 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -229,6 +229,40 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_ return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint) +{ + // @todo lookup standard def for IPV6 max size + char ip[48]; + ks_dht_endpoint_t *ep = NULL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(endpoint); + + *endpoint = NULL; + + ks_ip_route(ip, sizeof(ip), raddr->host); + + // @todo readlock hash + if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { + ks_sockaddr_t addr; + ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); + if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + } + + if (!ep) { + ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); + return KS_STATUS_FAIL; + } + + return KS_STATUS_SUCCESS; +} + /** * */ @@ -237,7 +271,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k ks_assert(dht); ks_assert(value); ks_assert(callback); - + // @todo writelock registry return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } @@ -249,7 +283,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_assert(dht); ks_assert(value); ks_assert(callback); - + // @todo writelock registry return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } @@ -261,7 +295,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_assert(dht); ks_assert(value); ks_assert(callback); - + // @todo writelock registry return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } @@ -286,7 +320,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid dht->bind_ipv4 |= addr->family == AF_INET; dht->bind_ipv6 |= addr->family == AF_INET6; - // @todo start of ks_dht_endpoint_bind if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) { return KS_STATUS_FAIL; } @@ -311,7 +344,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE); ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE); - // @todo end of ks_dht_endpoint_bind epindex = dht->endpoints_size++; dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, @@ -329,11 +361,11 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid // @todo initialize or add local nodeid to appropriate route table if (ep->addr.family == AF_INET) { if (!dht->rt_ipv4) { - //ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, (ks_dhtrt_nodeid_t)); + //ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, &ep->nodeid); } } else { if (!dht->rt_ipv6) { - //ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool); + //ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, &ep->nodeid); } } @@ -369,6 +401,7 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) raddr.family = dht->endpoints[i]->addr.family; if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { + // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool ks_dht_process(dht, dht->endpoints[i], &raddr); } } @@ -538,40 +571,23 @@ KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht) */ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) { - // @todo lookup standard def for IPV6 max size - char ip[48]; - ks_dht_endpoint_t *ep; // @todo calculate max IPV6 payload size? char buf[1000]; ks_size_t buf_len; ks_assert(dht); ks_assert(message); + ks_assert(message->endpoint); ks_assert(message->data); // @todo blacklist check - ks_ip_route(ip, sizeof(ip), message->raddr.host); - - if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { - ks_sockaddr_t addr; - ks_addr_set(&addr, ip, dht->autoroute_port, message->raddr.family); - if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - } - - if (!ep) { - ks_log(KS_LOG_DEBUG, "No route available to %s\n", message->raddr.host); - return KS_STATUS_FAIL; - } - buf_len = ben_encode2(buf, sizeof(buf), message->data); ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port); ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); - return ks_socket_sendto(ep->sock, (void *)buf, &buf_len, &message->raddr); + return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr); } /** @@ -594,6 +610,10 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, ks_assert(transactionid); ks_assert(errorstr); + if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } @@ -639,14 +659,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); - ks_assert(ep); ks_assert(raddr); ks_assert(query); ks_assert(callback); ks_assert(message); *message = NULL; - + + if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + // @todo atomic increment or mutex transactionid = dht->transactionid_next++; @@ -706,13 +729,16 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); - ks_assert(ep); ks_assert(raddr); ks_assert(transactionid); ks_assert(message); *message = NULL; - + + if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) { goto done; } @@ -753,7 +779,7 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k return KS_STATUS_FAIL; } - ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); ks_q_push(dht->send_q, (void *)message); @@ -777,12 +803,11 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e return KS_STATUS_FAIL; } - ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); ks_q_push(dht->send_q, (void *)message); - //ks_dht_send(dht, raddr, message); return KS_STATUS_SUCCESS; } @@ -818,7 +843,8 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_ if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { goto done; } - + + // @todo readlocking registry for calling from threadpool if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); } else { @@ -874,6 +900,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me message->args = a; + // @todo readlocking registry for calling from threadpool if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); } else { @@ -903,7 +930,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n"); return KS_STATUS_FAIL; } - // todo end of ks_dht_message_parse_response + // @todo end of ks_dht_message_parse_response message->args = r; @@ -923,7 +950,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->raddr.host, transaction->raddr.port); } else { - // @todo mark transaction for later removal transaction->finished = KS_TRUE; ret = transaction->callback(dht, message); } @@ -969,7 +995,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me memcpy(error, et, es_len); error[es_len] = '\0'; - // todo end of ks_dht_message_parse_error + // @todo end of ks_dht_message_parse_error message->args = e; @@ -989,10 +1015,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me transaction->raddr.host, transaction->raddr.port); } else { - // @todo mark transaction for later removal ks_dht_message_callback_t callback; transaction->finished = KS_TRUE; + // @todo readlock on registry if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) { ret = callback(dht, message); } else { @@ -1045,20 +1071,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ &r) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } - - //if (ks_dht_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) { - // goto done; - //} - //if (ks_dht_message_init(response, message->endpoint, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) { - // goto done; - //} - - //if (ks_dht_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) { - // goto done; - //} - - ben_dict_set(r, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); ks_q_push(dht->send_q, (void *)response); @@ -1082,8 +1096,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_bool_t want6 = KS_FALSE; ks_dht_message_t *response = NULL; struct bencode *r = NULL; - uint8_t buffer[1000]; - ks_size_t buffer_length = 0; + uint8_t buffer4[1000]; + uint8_t buffer6[1000]; + ks_size_t buffer4_length = 0; + ks_size_t buffer6_length = 0; ks_assert(dht); ks_assert(message); @@ -1141,10 +1157,22 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); - // @todo get closest nodes to target from route table - // @todo compact into buffer - if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv, &message->raddr, buffer, &buffer_length, sizeof(buffer)) != KS_STATUS_SUCCESS) { + if (want4) { + // @todo get closest nodes to target from ipv4 route table + // @todo compact nodes into buffer4 + } + if (want6) { + // @todo get closest nodes to target from ipv6 route table + // @todo compact nodes into buffer6 + } + + // @todo remove this, testing only + if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv, + &message->raddr, + message->raddr.family == AF_INET ? buffer4 : buffer6, + message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length, + message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } @@ -1158,21 +1186,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess return KS_STATUS_FAIL; } - //if (ks_dht_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) { - // goto done; - //} - - //if (ks_dht_message_init(response, message->endpoint, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) { - // goto done; - //} - - //if (ks_dht_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) { - // goto done; - //} - - ben_dict_set(r, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - // @todo populate nodes/nodes6 - ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer, buffer_length)); + ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + if (want4) { + ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); + } + if (want6) { + ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); + } ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); ks_q_push(dht->send_q, (void *)response);