FS-9775: Implement deinit dht routetable

This commit is contained in:
colm 2016-12-21 18:43:22 -05:00 committed by Mike Jerris
parent 59608400d1
commit eac02b764b
2 changed files with 120 additions and 27 deletions

View File

@ -44,7 +44,7 @@
#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 0
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
/* peer flags */
#define DHTPEER_DUBIOUS 0
@ -157,7 +157,7 @@ int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
static
void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node);
static
void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all);
static
ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
@ -195,7 +195,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
/* very verbose */
/* # define KS_DHT_DEBUGPRINTFX_ */
/* debug locking */
#define KS_DHT_DEBUGLOCKPRINTF_
/* #define KS_DHT_DEBUGLOCKPRINTF_ */
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
@ -231,13 +231,52 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table)
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **tableP)
{
/* @todo*/
ks_dhtrt_routetable_t* table = *tableP;
ks_pool_t *pool = (*table)->pool;
if (!table || !table->internal) {
return;
}
ks_pool_free(pool, &(*table));
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_write_lock(internal->lock); /* grab write lock */
ks_dhtrt_process_deleted(table, 1);
table->internal = NULL; /* make sure no other threads get in */
ks_pool_t *pool = table->pool;
ks_dhtrt_bucket_header_t *header = internal->buckets;
ks_dhtrt_bucket_header_t *last_header;
ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
int stackix=0;
while (header) {
stack[stackix++] = header;
if (header->bucket) {
ks_pool_free(pool, &header->bucket);
header->bucket = NULL;
}
last_header = header;
header = header->left;
if (header == 0 && stackix > 1) {
stackix -= 2;
header = stack[stackix];
header = header->right;
#ifdef KS_DHT_DEBUGLOCKPRINTFX_
char buf[100];
ks_log(KS_LOG_DEBUG,"deinit: freeing bucket header %s\n", ks_dhtrt_printableid(last_header->mask, buf));
#endif
ks_pool_free(pool, &last_header);
}
}
ks_pool_free(pool, &internal);
ks_pool_free(pool, &table);
*tableP = NULL;
return;
}
@ -249,8 +288,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
unsigned short port,
ks_dht_node_t **node)
{
if (!table || !table->internal) {
return KS_STATUS_FAIL;
}
ks_dht_node_t *tnode;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab write lock and insert */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@ -306,8 +350,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
if (!table || !table->internal) {
return KS_STATUS_FAIL;
}
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
@ -339,8 +388,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
if (!table || !table->internal) {
return KS_STATUS_FAIL;
}
ks_dhtrt_internal_t* internal = table->internal;
ks_dhtrt_bucket_t *bucket = 0;
int insanity = 0;
ks_rwl_write_lock(internal->lock);
@ -465,10 +519,13 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
if (!table || !table->internal) {
return NULL;
}
ks_dht_node_t* node = NULL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@ -504,8 +561,13 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
if (!table || !table->internal) {
return KS_STATUS_FAIL;
}
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@ -542,8 +604,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
if (!table || !table->internal) {
return KS_STATUS_FAIL;
}
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@ -563,8 +630,14 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dh
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
{
if (!table || !table->internal) {
return KS_STATUS_FAIL;
query->count = 0;
}
uint8_t count = 0;
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
count = ks_dhtrt_findclosest_locked_nodes(table, query);
ks_rwl_read_unlock(internal->lock); /* release read lock */
@ -822,6 +895,10 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
/* inactive again it is considered inactive */
/* */
if (!table || !table->internal) {
return;
}
ks_dhtrt_internal_t *internal = table->internal;
int ping_count = 0;
@ -875,8 +952,9 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
if ( e->flags != DHTPEER_EXPIRED &&
e->outstanding_pings >= KS_DHTRT_MAXPING ) {
#ifdef KS_DHT_DEBUGPRINTF_
char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
ks_dhtrt_printableid(e->id, buf));
ks_dhtrt_printableid(e->id, buf1));
#endif
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
@ -912,8 +990,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
} /* end for each bucket_entry */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
char buf[100];
ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
#endif
ks_rwl_write_unlock(b->lock);
@ -921,8 +999,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
} /* end of if trywrite_lock successful */
else {
#ifdef KS_DHT_DEBUGPRINTF_
char buf2[100];
ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
#endif
}
}
@ -937,7 +1015,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
ks_dhtrt_process_deleted(table);
ks_dhtrt_process_deleted(table, 0);
if (ping_count == 0) {
internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
@ -950,7 +1028,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
return;
}
void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all)
{
ks_dhtrt_internal_t* internal = table->internal;
ks_mutex_lock(internal->deleted_node_lock);
@ -958,18 +1036,22 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
#ifdef KS_DHT_DEBUGPRINTF_
#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count);
#endif
/* reclaim excess memory */
printf("%d %d %p\n", internal->deleted_count, KS_DHTRT_RECYCLE_NODE_THRESHOLD, (void*)deleted); fflush(stdout);
uint32_t threshold = KS_DHTRT_RECYCLE_NODE_THRESHOLD;
while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
if (all) {
threshold = 1;
}
while(internal->deleted_count > threshold && deleted) {
ks_dht_node_t* node = deleted->node;
#ifdef KS_DHT_DEBUGPRINTF_
#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
#endif
@ -992,7 +1074,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
}
else {
#ifdef KS_DHT_DEBUGPRINTF_
#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
#endif
prev = deleted;

View File

@ -357,6 +357,8 @@ ks_dht_node_t* g_peer;
static void *testnodelocking_ex1(ks_thread_t *thread, void *data)
{
//lock=3 on entry
ks_dhtrt_release_node(g_peer); //lock=2
ks_dhtrt_release_node(g_peer); //lock=1
ks_dhtrt_release_node(g_peer); //lock=0
return NULL;
@ -364,9 +366,17 @@ static void *testnodelocking_ex1(ks_thread_t *thread, void *data)
static void *testnodelocking_ex2(ks_thread_t *thread, void *data)
{
ks_dht_node_t* peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
// lock=4 on entry
ks_dht_node_t* peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=5
ks_dhtrt_release_node(peer2); //lock=4
ks_dhtrt_sharelock_node(peer2); //lock=5
ks_dhtrt_release_node(peer2); //lock=4
ks_dhtrt_sharelock_node(peer2); //lock=5
ks_dhtrt_release_node(peer2); //lock=4
ks_dhtrt_release_node(peer2); //lock=3
ks_dhtrt_release_node(peer2); //lock=2
ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
ks_dhtrt_release_node(peer2); //lock=3
return NULL;
}
@ -391,6 +401,7 @@ void test06()
peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
ks_dhtrt_release_node(peer2); //lock=3
ks_dhtrt_sharelock_node(peer2); //lock=4
g_peer = peer2;