diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 9128275ac3..a236f2ff8a 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -15,16 +15,16 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created. */ - if (pool_alloc && (ret = ks_pool_open(&pool)) != KS_STATUS_SUCCESS) goto done; + if (pool_alloc) { + ks_pool_open(&pool); + ks_assert(pool); + } /** * Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created. */ *dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t)); - if (!d) { - ret = KS_STATUS_NO_MEM; - goto done; - } + ks_assert(d); /** * Keep track of the pool used for future allocations and cleanup. @@ -39,12 +39,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread d->tpool = tpool; if (!tpool) { d->tpool_alloc = KS_TRUE; - if ((ret = ks_thread_pool_create(&d->tpool, - KS_DHT_TPOOL_MIN, - KS_DHT_TPOOL_MAX, - KS_DHT_TPOOL_STACK, - KS_PRI_NORMAL, - KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) goto done; + ks_thread_pool_create(&d->tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE); + ks_assert(d->tpool); } /** @@ -56,10 +52,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the message type registry. */ - if ((ret = ks_hash_create(&d->registry_type, - KS_HASH_MODE_DEFAULT, - KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->registry_type); /** * Register the message type callbacks for query (q), response (r), and error (e) @@ -71,10 +65,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the message query registry. */ - if ((ret = ks_hash_create(&d->registry_query, - KS_HASH_MODE_DEFAULT, - KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->registry_query); /** * Register the message query callbacks for ping, find_node, etc. @@ -87,10 +79,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the message error registry. */ - if ((ret = ks_hash_create(&d->registry_error, - KS_HASH_MODE_DEFAULT, - KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->registry_error); // @todo register 301 error for internal get/put CAS hash mismatch retry handler /** @@ -113,20 +103,19 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime. * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent */ - if ((ret = ks_hash_create(&d->endpoints_hash, - KS_HASH_MODE_DEFAULT, - KS_HASH_FLAG_RWLOCK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool); + ks_assert(d->endpoints_hash); /** * Default expirations to not be checked for one pulse. */ - d->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS; + d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000); /** * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full. */ - if ((ret = ks_q_create(&d->send_q, d->pool, 0)) != KS_STATUS_SUCCESS) goto done; + ks_q_create(&d->send_q, d->pool, 0); + ks_assert(d->send_q); /** * If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message. @@ -141,7 +130,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Initialize the transaction id mutex, should use atomic increment instead */ - if ((ret = ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); + ks_assert(d->tid_mutex); /** * Initialize the first transaction id randomly, this doesn't really matter. @@ -152,10 +142,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * Create the hash to track pending transactions on queries that are pending responses. * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred. */ - if ((ret = ks_hash_create(&d->transactions_hash, - KS_HASH_MODE_INT, - KS_HASH_FLAG_RWLOCK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool); + ks_assert(d->transactions_hash); /** * The internal route tables will be latent allocated when binding. @@ -166,10 +154,9 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the hash to store searches. */ - if ((ret = ks_hash_create(&d->search_hash, - KS_HASH_MODE_ARBITRARY, - KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->search_hash); + /** * The search hash uses arbitrary key size, which requires the key size be provided. */ @@ -179,21 +166,20 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. */ d->token_secret_current = d->token_secret_previous = rand(); - d->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; + d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000); /** * Create the hash to store arbitrary data for BEP44. */ - if ((ret = ks_hash_create(&d->storage_hash, - KS_HASH_MODE_ARBITRARY, - KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - d->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->storage_hash); + /** * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's. */ ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE); - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (d) ks_dht_destroy(&d); else if (pool_alloc && pool) ks_pool_close(&pool); @@ -348,7 +334,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) */ pool = d->pool; pool_alloc = d->pool_alloc; - + /** * Free the dht instance from the pool, after this the dht instance memory is invalid. */ @@ -442,7 +428,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; + return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback); } KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) @@ -451,7 +437,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; + return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback); } KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) @@ -460,7 +446,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; + return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback); } @@ -482,7 +468,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid if (endpoint) *endpoint = NULL; ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED); - if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret; + ks_hash_read_unlock(dht->endpoints_hash); if (ep) { ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host); return KS_STATUS_FAIL; @@ -514,7 +500,8 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid /** * Allocate the endpoint to track the local socket. */ - if ((ret = ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done; + ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock); + ks_assert(ep); /** * Resize the endpoints array to take another endpoint pointer. @@ -523,15 +510,14 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, (void *)dht->endpoints, sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); + ks_assert(dht->endpoints); dht->endpoints[epindex] = ep; /** * Add the new endpoint into the endpoints hash for quick lookups. + * @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed */ - if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) { - ret = KS_STATUS_FAIL; - goto done; - } + if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done; /** * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data. @@ -539,6 +525,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, (void *)dht->endpoints_poll, sizeof(struct pollfd) * dht->endpoints_size); + ks_assert(dht->endpoints_poll); dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].events = POLLIN | POLLERR; @@ -553,9 +540,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.host, ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; - /** - * Do not release the ep->node, keep it alive until cleanup - */ } else { if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, @@ -564,18 +548,16 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.host, ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; - /** - * Do not release the ep->node, keep it alive until cleanup - */ } + /** + * Do not release the ep->node, keep it alive until cleanup + */ /** * If the endpoint output is being captured, assign it and return successfully. */ if (endpoint) *endpoint = ep; - ret = KS_STATUS_SUCCESS; - done: if (ret != KS_STATUS_SUCCESS) { /** @@ -583,7 +565,10 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid * This will be done in ks_dht_endpoint_destroy only if the socket was assigned during a successful ks_dht_endpoint_create. * Then return whatever failure condition resulted in landed here. */ - if (ep) ks_dht_endpoint_destroy(&ep); + if (ep) { + ks_hash_remove(dht->endpoints_hash, ep->addr.host); + ks_dht_endpoint_destroy(&ep); + } else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock); if (endpoint) *endpoint = NULL; @@ -594,31 +579,32 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) { ks_dht_datagram_t *datagram = NULL; - int32_t result; ks_sockaddr_t raddr; ks_assert(dht); - ks_assert (timeout > 0); + ks_assert(timeout > 0); if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0; - result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout); - if (result > 0) { + // @todo confirm how poll/wsapoll react to zero size and NULL array + if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) { for (int32_t i = 0; i < dht->endpoints_size; ++i) { if (!(dht->endpoints_poll[i].revents & POLLIN)) continue; - + raddr = (const ks_sockaddr_t){ 0 }; dht->recv_buffer_length = sizeof(dht->recv_buffer); raddr.family = dht->endpoints[i]->addr.family; if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue; - + if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) { ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port); continue; } - - if (ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr) == KS_STATUS_SUCCESS && - ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram); + + ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr); + ks_assert(datagram); + + if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram); } } @@ -633,13 +619,12 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) { ks_hash_iterator_t *it = NULL; - ks_time_t now = ks_time_now_sec(); + ks_time_t now = ks_time_now(); ks_assert(dht); - if (dht->pulse_expirations <= now) { - dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS; - } + if (dht->pulse_expirations > now) return; + dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000); ks_hash_write_lock(dht->transactions_hash); for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { @@ -660,8 +645,47 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) } ks_hash_write_unlock(dht->transactions_hash); + ks_hash_write_lock(dht->search_hash); + for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *search_key = NULL; + ks_dht_search_t *search_value = NULL; + + ks_hash_this(it, &search_key, NULL, (void **)&search_value); + + ks_hash_write_lock(search_value->pending); + for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) { + const void *pending_key = NULL; + ks_dht_search_pending_t *pending_value = NULL; + ks_bool_t pending_remove = KS_FALSE; + + ks_hash_this(i, &pending_key, NULL, (void **)&pending_value); + + if (pending_value->finished) pending_remove = KS_TRUE; + else if (pending_value->expiration <= now) { + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_log(KS_LOG_DEBUG, + "Search for %s pending find_node to %s has expired without response\n", + ks_dht_hexid(&search_value->target, id_buf), + ks_dht_hexid(&pending_value->nodeid, id2_buf)); + pending_remove = KS_TRUE; + } + if (pending_remove) { + ks_hash_remove(search_value->pending, (void *)pending_key); + ks_dht_search_pending_destroy(&pending_value); + } + } + ks_hash_write_unlock(search_value->pending); + if (ks_hash_count(search_value->pending) == 0) { + for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value); + ks_hash_remove(dht->search_hash, (void *)search_key); + ks_dht_search_destroy(&search_value); + } + } + ks_hash_write_unlock(dht->search_hash); + if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { - dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; + dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000); dht->token_secret_previous = dht->token_secret_current; dht->token_secret_current = rand(); } @@ -987,10 +1011,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, *message = msg; - if (!ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) { - ret = KS_STATUS_FAIL; - goto done; - } + if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done; if (transaction) *transaction = trans; @@ -1176,11 +1197,13 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_search_callback_t callback, ks_dht_search_t **search) { + ks_bool_t locked_search = KS_FALSE; + ks_bool_t locked_pending = KS_FALSE; ks_dht_search_t *s = NULL; - ks_status_t ret = KS_STATUS_SUCCESS; ks_bool_t inserted = KS_FALSE; ks_bool_t allocated = KS_FALSE; ks_dhtrt_querynodes_t query; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(family == AF_INET || family == AF_INET6); @@ -1188,9 +1211,12 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, if (search) *search = NULL; + // @todo start write lock on search_hash and hold until after inserting // check hash for target to see if search already exists - s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED); - ks_hash_read_unlock(dht->search_hash); // @todo hold lock until finished adding new entry? + ks_hash_write_lock(dht->search_hash); + locked_search = KS_TRUE; + + s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED); // if search does not exist, create new search and store in hash by target if (!s) { @@ -1204,6 +1230,17 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, // if the search is old then bail out and return successfully if (!allocated) goto done; + if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done; + inserted = KS_TRUE; + + // lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up + ks_hash_write_lock(s->pending); + locked_pending = KS_TRUE; + + // release search hash lock now, but pending is still locked + ks_hash_write_unlock(dht->search_hash); + locked_search = KS_FALSE; + // find closest good nodes to target locally and store as the closest results query.nodeid = *target; query.type = KS_DHT_REMOTE; @@ -1219,27 +1256,27 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target); // add to pending with expiration if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done; - if (!ks_hash_insert(s->pending, n->nodeid.id, pending)) { + if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) { ks_dht_search_pending_destroy(&pending); - ret = KS_STATUS_FAIL; goto done; } if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done; + // increment here in case we end up bailing out; execute with what it has or destroy the search? + s->results_length++; } - s->results_length = query.count; // @todo release query nodes - - // @todo if entry has been added since we checked above this may fail, try adding callback instead of failing? or retain lock from earlier - if (!ks_hash_insert(dht->search_hash, s->target.id, s)) { - ret = KS_STATUS_FAIL; - goto done; - } - inserted = KS_TRUE; + ks_hash_write_unlock(s->pending); + locked_pending = KS_FALSE; if (search) *search = s; done: - if (ret != KS_STATUS_SUCCESS && !inserted && s) ks_dht_search_destroy(&s); + if (locked_search) ks_hash_write_unlock(dht->search_hash); + if (locked_pending) ks_hash_write_unlock(s->pending); + if (ret != KS_STATUS_SUCCESS) { + if (!inserted && s) ks_dht_search_destroy(&s); + *search = NULL; + } return ret; } @@ -1254,18 +1291,18 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, { ks_dht_message_t *error = NULL; struct bencode *e = NULL; - ks_status_t ret = KS_STATUS_FAIL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); ks_assert(transactionid); ks_assert(errorstr); - if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done; ben_list_append(e, ben_int(errorcode)); ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); @@ -1273,8 +1310,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); ks_q_push(dht->send_q, (void *)error); - ret = KS_STATUS_SUCCESS; - done: if (ret != KS_STATUS_SUCCESS && error) ks_dht_message_destroy(&error); return ret; @@ -1292,7 +1327,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ks_dht_transaction_t *transaction; uint32_t *tid; uint32_t transactionid; - ks_status_t ret = KS_STATUS_FAIL; + ks_dht_message_callback_t callback; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); @@ -1308,7 +1344,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me es_len = ben_str_len(es); if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) { ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len); - return KS_STATUS_FAIL; + ret = KS_STATUS_FAIL; + goto done; } errorcode = ben_int_val(ec); et = ben_str_val(es); @@ -1327,27 +1364,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me if (!transaction) { ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); - } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { + ret = KS_STATUS_FAIL; + goto done; + } + + if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { ks_log(KS_LOG_DEBUG, "Message error rejected due to spoofing from %s %d, expected %s %d\n", message->raddr.host, message->raddr.port, transaction->raddr.host, transaction->raddr.port); - } else { - ks_dht_message_callback_t callback; - transaction->finished = KS_TRUE; - - callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED); - ks_hash_read_unlock(dht->registry_error); - - if (callback) ret = callback(dht, message); - else { - ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); - ret = KS_STATUS_SUCCESS; - } + ret = KS_STATUS_FAIL; + goto done; } + transaction->finished = KS_TRUE; + + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED); + ks_hash_read_unlock(dht->registry_error); + + if (callback) ret = callback(dht, message); + else ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); + + done: return ret; } @@ -1356,25 +1396,27 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k { ks_dht_message_t *message = NULL; struct bencode *a = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); - if (ks_dht_setup_query(dht, - ep, - raddr, - "ping", - ks_dht_process_response_ping, - NULL, - &message, - &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_setup_query(dht, + ep, + raddr, + "ping", + ks_dht_process_response_ping, + NULL, + &message, + &a)) != KS_STATUS_SUCCESS) goto done; ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); ks_q_push(dht->send_q, (void *)message); - return KS_STATUS_SUCCESS; + done: + return ret; } KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message) @@ -1385,37 +1427,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); - if (ks_dht_setup_response(dht, - message->endpoint, - &message->raddr, - message->transactionid, - message->transactionid_length, - &response, - &r) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if ((ret = ks_dht_setup_response(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + &response, + &r)) != KS_STATUS_SUCCESS) goto done; ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); ks_q_push(dht->send_q, (void *)response); - return KS_STATUS_SUCCESS; + done: + return ret; } KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) @@ -1424,24 +1466,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); - return KS_STATUS_SUCCESS; + done: + return ret; } @@ -1450,19 +1494,20 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e ks_dht_transaction_t *transaction = NULL; ks_dht_message_t *message = NULL; struct bencode *a = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); ks_assert(targetid); - if (ks_dht_setup_query(dht, - ep, - raddr, - "find_node", - ks_dht_process_response_findnode, - &transaction, - &message, - &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_setup_query(dht, + ep, + raddr, + "find_node", + ks_dht_process_response_findnode, + &transaction, + &message, + &a)) != KS_STATUS_SUCCESS) goto done; memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE); @@ -1473,7 +1518,8 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); ks_q_push(dht->send_q, (void *)message); - return KS_STATUS_SUCCESS; + done: + return ret; } KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message) @@ -1493,14 +1539,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_dht_node_t *node = NULL; ks_dhtrt_querynodes_t query; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done; want = ben_dict_get_by_str(message->args, "want"); if (want) { @@ -1521,8 +1568,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); @@ -1537,11 +1584,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *qn = query.nodes[i]; - if (ks_dht_utility_compact_nodeinfo(&qn->nodeid, - &qn->addr, - buffer4, - &buffer4_length, - sizeof(buffer4)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid, + &qn->addr, + buffer4, + &buffer4_length, + sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } @@ -1553,23 +1600,23 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *qn = query.nodes[i]; - if (ks_dht_utility_compact_nodeinfo(&qn->nodeid, - &qn->addr, - buffer6, - &buffer6_length, - sizeof(buffer6)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid, + &qn->addr, + buffer6, + &buffer6_length, + sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } } - if (ks_dht_setup_response(dht, - message->endpoint, - &message->raddr, - message->transactionid, - message->transactionid_length, - &response, - &r) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_setup_response(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + &response, + &r)) != KS_STATUS_SUCCESS) goto done; ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); @@ -1578,7 +1625,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); ks_q_push(dht->send_q, (void *)response); - return KS_STATUS_SUCCESS; + done: + return ret; } KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message) @@ -1595,12 +1643,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_dht_search_t *search = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->transaction); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; n = ben_dict_get_by_str(message->args, "nodes"); if (n) { @@ -1616,13 +1665,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; - search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_READLOCKED); + ks_hash_read_lock(dht->search_hash); + search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED); ks_hash_read_unlock(dht->search_hash); if (search) { ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED); @@ -1635,7 +1685,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_sockaddr_t addr; addr.family = AF_INET; - if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Expanded ipv4 nodeinfo for %s (%s %d)\n", @@ -1681,12 +1731,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m search->results[results_index] = nid; search->distances[results_index] = distance; - if (ks_dht_search_pending_create(&pending, search->pool, &nid) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (!ks_hash_insert(search->pending, nid.id, pending)) { + if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) { ks_dht_search_pending_destroy(&pending); - return KS_STATUS_FAIL; + goto done; } - if (ks_dht_send_findnode(dht, NULL, &addr, &search->target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done; } } } @@ -1696,7 +1746,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_sockaddr_t addr; addr.family = AF_INET6; - if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Expanded ipv6 nodeinfo for %s (%s %d)\n", @@ -1712,7 +1762,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); - return KS_STATUS_SUCCESS; + done: + return ret; } @@ -1758,14 +1809,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done; seq = ben_dict_get_by_str(message->args, "seq"); if (seq) sequence = ben_int_val(seq); @@ -1773,8 +1825,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); @@ -1790,15 +1842,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t // @todo compact ipv4 and ipv6 nodes into separate buffers - if (ks_dht_setup_response(dht, - message->endpoint, - &message->raddr, - message->transactionid, - message->transactionid_length, - &response, - &r) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if ((ret = ks_dht_setup_response(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + &response, + &r)) != KS_STATUS_SUCCESS) goto done; ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE)); @@ -1817,7 +1867,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_log(KS_LOG_DEBUG, "Sending message response get\n"); ks_q_push(dht->send_q, (void *)response); - return KS_STATUS_SUCCESS; + done: + return ret; } KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message) @@ -1827,14 +1878,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; // @todo add extract function for mutable ks_dht_storageitem_key_t // @todo add extract function for mutable ks_dht_storageitem_signature_t @@ -1842,16 +1894,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; // @todo add/touch bucket entries for other nodes/nodes6 returned ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); - return KS_STATUS_SUCCESS; + done: + return ret; } @@ -1865,37 +1918,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); - if (ks_dht_setup_response(dht, - message->endpoint, - &message->raddr, - message->transactionid, - message->transactionid_length, - &response, - &r) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if ((ret = ks_dht_setup_response(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + &response, + &r)) != KS_STATUS_SUCCESS) goto done; //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message response put\n"); ks_q_push(dht->send_q, (void *)response); - return KS_STATUS_SUCCESS; + done: + return ret; } KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message) @@ -1904,24 +1957,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; routetable = message->endpoint->node->table; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); - return KS_STATUS_SUCCESS; + done: + return ret; } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 0b4007b5c1..59311de801 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -27,7 +27,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 -#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 +#define KS_DHT_TRANSACTION_EXPIRATION 30 #define KS_DHT_SEARCH_EXPIRATION 10 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE diff --git a/libs/libks/src/dht/ks_dht_datagram.c b/libs/libks/src/dht/ks_dht_datagram.c index 8b6140f2b2..83927ddb9b 100644 --- a/libs/libks/src/dht/ks_dht_datagram.c +++ b/libs/libks/src/dht/ks_dht_datagram.c @@ -17,14 +17,11 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram, ks_assert(endpoint); ks_assert(raddr); ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6); - - *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); - if (!dg) { - ret = KS_STATUS_NO_MEM; - goto done; - } - dg->pool = pool; + *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); + ks_assert(dg); + + dg->pool = pool; dg->dht = dht; dg->endpoint = endpoint; dg->raddr = *raddr; @@ -32,7 +29,7 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram, memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length); dg->buffer_length = dht->recv_buffer_length; - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (dg) ks_dht_datagram_destroy(&dg); *datagram = NULL; diff --git a/libs/libks/src/dht/ks_dht_endpoint.c b/libs/libks/src/dht/ks_dht_endpoint.c index 61184bfcbf..ad9a44ec5e 100644 --- a/libs/libks/src/dht/ks_dht_endpoint.c +++ b/libs/libks/src/dht/ks_dht_endpoint.c @@ -18,19 +18,17 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint, ks_assert(pool); ks_assert(addr); ks_assert(addr->family == AF_INET || addr->family == AF_INET6); - + *endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t)); - if (!ep) { - ret = KS_STATUS_NO_MEM; - goto done; - } + ks_assert(ep); + ep->pool = pool; if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE); else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE); ep->addr = *addr; ep->sock = sock; - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (ep) ks_dht_endpoint_destroy(&ep); *endpoint = NULL; @@ -50,9 +48,6 @@ KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint) ep = *endpoint; - if (ep->node) { - // @todo release the node? - } if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock); ks_pool_free(ep->pool, ep); diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 63ea519fc0..1b2284decb 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -1,9 +1,6 @@ #include "ks_dht.h" #include "ks_dht-int.h" -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, ks_pool_t *pool, ks_dht_endpoint_t *endpoint, @@ -17,17 +14,17 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, ks_assert(pool); *message = m = ks_pool_alloc(pool, sizeof(ks_dht_message_t)); - if (!m) { - ret = KS_STATUS_NO_MEM; - goto done; - } - m->pool = pool; + ks_assert(m); + m->pool = pool; m->endpoint = endpoint; m->raddr = *raddr; - if (alloc_data) m->data = ben_dict(); + if (alloc_data) { + m->data = ben_dict(); + ks_assert(m->data); + } - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (m) ks_dht_message_destroy(&m); *message = NULL; @@ -35,9 +32,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, return ret; } -/** - * - */ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message) { ks_dht_message_t *m; @@ -57,9 +51,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message) } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length) { struct bencode *t; @@ -121,9 +112,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, uint32_t transactionid, const char *query, @@ -143,6 +131,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, // @note r joins message->data and will be freed with it a = ben_dict(); + ks_assert(a); ben_dict_set(message->data, ben_blob("a", 1), a); if (args) *args = a; @@ -150,9 +139,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, uint8_t *transactionid, ks_size_t transactionid_length, @@ -168,6 +154,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, // @note r joins message->data and will be freed with it r = ben_dict(); + ks_assert(r); ben_dict_set(message->data, ben_blob("r", 1), r); if (args) *args = r; @@ -193,6 +180,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, // @note r joins message->data and will be freed with it e = ben_list(); + ks_assert(e); ben_dict_set(message->data, ben_blob("e", 1), e); if (args) *args = e; diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index ce314e6155..1974a3d0cf 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -2,9 +2,6 @@ #include "ks_dht-int.h" #include "sodium.h" -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target) { ks_dht_search_t *s; @@ -15,32 +12,27 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t ks_assert(target); *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t)); - if (!s) { - ret = KS_STATUS_NO_MEM; - goto done; - } + ks_assert(s); + s->pool = pool; - if ((ret = ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool)) != KS_STATUS_SUCCESS) goto done; + ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool); + ks_assert(s->mutex); + memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE); - if ((ret = ks_hash_create(&s->pending, - KS_HASH_MODE_ARBITRARY, - KS_HASH_FLAG_RWLOCK, - s->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_create(&s->pending, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK, s->pool); + ks_assert(s->pending); ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE); - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (s) ks_dht_search_destroy(&s); *search = NULL; } - return KS_STATUS_SUCCESS; + return ret; } -/** - * - */ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) { ks_dht_search_t *s; @@ -83,7 +75,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, (void *)search->callbacks, sizeof(ks_dht_search_callback_t) * search->callbacks_size); - if (!search->callbacks) return KS_STATUS_NO_MEM; + ks_assert(search->callbacks); search->callbacks[index] = callback; ks_mutex_unlock(search->mutex); } @@ -97,19 +89,16 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p ks_assert(pending); ks_assert(pool); - - *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); - if (!p) { - ret = KS_STATUS_NO_MEM; - goto done; - } - p->pool = pool; + *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); + ks_assert(p); + + p->pool = pool; p->nodeid = *nodeid; - p->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION; + p->expiration = ks_time_now() + (KS_DHT_SEARCH_EXPIRATION * 1000); p->finished = KS_FALSE; - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (p) ks_dht_search_pending_destroy(&p); *pending = NULL; diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index 58c3018a6a..d395e49540 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -2,9 +2,6 @@ #include "ks_dht-int.h" #include "sodium.h" -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v) { ks_dht_storageitem_t *si; @@ -19,27 +16,21 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); - if (!si) { - ret = KS_STATUS_NO_MEM; - goto done; - } - si->pool = pool; + ks_assert(si); + si->pool = pool; si->mutable = KS_FALSE; - si->v = ben_clone(v); - if (!si->v) { - ret = KS_STATUS_NO_MEM; - goto done; - } + ks_assert(si->v); enc = ben_encode(&enc_len, si->v); + ks_assert(enc); SHA1_Init(&sha); SHA1_Update(&sha, enc, enc_len); SHA1_Final(si->id.id, &sha); free(enc); - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (si) ks_dht_storageitem_destroy(&si); *item = NULL; @@ -70,15 +61,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t * ks_assert(signature); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); - if (!si) { - ret = KS_STATUS_NO_MEM; - goto done; - } + ks_assert(si); + si->pool = pool; - - si->v = ben_clone(v); - si->mutable = KS_TRUE; + si->v = ben_clone(v); + ks_assert(si->v); memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE); if (salt && salt_length > 0) { @@ -93,7 +81,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t * if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length); SHA1_Final(si->id.id, &sha); - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (si) ks_dht_storageitem_destroy(&si); *item = NULL; diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 18978bf689..0912fa7589 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -15,18 +15,15 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac ks_assert(raddr); *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t)); - if (!t) { - ret = KS_STATUS_NO_MEM; - goto done; - } - t->pool = pool; + ks_assert(t); + t->pool = pool; t->raddr = *raddr; t->transactionid = transactionid; t->callback = callback; - t->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY; + t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000); - done: + // done: if (ret != KS_STATUS_SUCCESS) { if (t) ks_dht_transaction_destroy(&t); *transaction = NULL;