FS-9775: Add sharelock api for dht nodes

This commit is contained in:
colm 2016-12-21 12:44:22 -05:00 committed by Mike Jerris
parent 64a44ed3a5
commit 59608400d1
3 changed files with 109 additions and 16 deletions

View File

@ -443,6 +443,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t* table
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t* table, ks_dhtrt_querynodes_t* query);
KS_DECLARE(ks_dht_node_t*) ks_dhtrt_find_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t id);
KS_DECLARE(ks_status_t) ks_dhtrt_sharelock_node(ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node);
KS_DECLARE(ks_status_t) ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t* query);

View File

@ -44,7 +44,7 @@
#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 0
/* peer flags */
#define DHTPEER_DUBIOUS 0
@ -795,6 +795,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node)
return ks_rwl_read_unlock(node->reflock);
}
KS_DECLARE(ks_status_t) ks_dhtrt_sharelock_node(ks_dht_node_t* node)
{
return ks_rwl_read_lock(node->reflock);
}
KS_DECLARE(ks_status_t) ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t *query)
{
for(int ix=0; ix<query->count; ++ix) {
@ -959,10 +964,12 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
/* reclaim excess memory */
printf("%d %d %p\n", internal->deleted_count, KS_DHTRT_RECYCLE_NODE_THRESHOLD, (void*)deleted); fflush(stdout);
while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
ks_dht_node_t* node = deleted->node;
#ifdef KS_DHT_DEBUGPRINTFX__
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
#endif
@ -973,7 +980,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
deleted = deleted->next;
ks_pool_free(table->pool, &temp);
--internal->deleted_count;
#ifdef KS_DHT_DEBUGPRINTFX_
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count);
#endif
if (prev != NULL) {
@ -985,7 +992,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
}
else {
#ifdef KS_DHT_DEBUGPRINTFX__
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
#endif
prev = deleted;

View File

@ -348,14 +348,82 @@ void test05()
return;
}
/* test06 */
/* ------- */
ks_dht_nodeid_t g_nodeid1;
ks_dht_nodeid_t g_nodeid2;
ks_dht_node_t* g_peer;
static void *testnodelocking_ex1(ks_thread_t *thread, void *data)
{
ks_dhtrt_release_node(g_peer); //lock=1
ks_dhtrt_release_node(g_peer); //lock=0
return NULL;
}
static void *testnodelocking_ex2(ks_thread_t *thread, void *data)
{
ks_dht_node_t* peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
ks_dhtrt_release_node(peer2); //lock=3
ks_dhtrt_release_node(peer2); //lock=2
return NULL;
}
void test06()
{
printf("*** testbuckets - test06 start\n"); fflush(stdout);
ks_dht_node_t* peer;
memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
memset(g_nodeid2.id, 0x1f, KS_DHT_NODEID_SIZE);
char ipv6[] = "1234:1234:1234:1234";
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
ks_dhtrt_create_node(rt, g_nodeid1, KS_DHT_REMOTE, ipv4, port, &peer); // lock=1
ks_dhtrt_touch_node(rt, g_nodeid1);
ks_dht_node_t* peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=2
peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=3
peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
ks_dhtrt_release_node(peer2); //lock=3
g_peer = peer2;
ks_thread_t* t0;
ks_thread_create(&t0, testnodelocking_ex1, NULL, pool);
ks_thread_t* t1;
ks_thread_create(&t1, testnodelocking_ex2, NULL, pool);
ks_thread_join(t1);
ks_thread_join(t0);
ks_dhtrt_delete_node(rt, peer2);
printf("\n\n*** testbuckets - test06 -- check if the node gets deleted\n\n\n\n"); fflush(stdout);
ks_dhtrt_process_table(rt);
printf("*** testbuckets - test06 start\n"); fflush(stdout);
return;
}
static int gindex = 1;
static ks_mutex_t *glock;
static int gstop = 0;
static int test06loops = 1000;
static int test06nodes = 200; /* max at 255 */
static int test60loops = 1000;
static int test60nodes = 200; /* max at 255 */
static void *test06ex1(ks_thread_t *thread, void *data)
static void *test60ex1(ks_thread_t *thread, void *data)
{
while(!gstop) {
ks_dhtrt_process_table(rt);
@ -365,7 +433,7 @@ static void *test06ex1(ks_thread_t *thread, void *data)
}
static void *test06ex2(ks_thread_t *thread, void *data)
static void *test60ex2(ks_thread_t *thread, void *data)
{
ks_dht_nodeid_t nodeid;
ks_dhtrt_querynodes_t query;
@ -393,7 +461,7 @@ static void *test06ex2(ks_thread_t *thread, void *data)
return NULL;
}
static void *test06ex(ks_thread_t *thread, void *data)
static void *test60ex(ks_thread_t *thread, void *data)
{
ks_dht_node_t* peer;
ks_dht_nodeid_t nodeid;
@ -411,16 +479,16 @@ static void *test06ex(ks_thread_t *thread, void *data)
printf("starting thread with i of %d\n", gindex); fflush(stdout);
for(int loop=0; loop<test06loops; ++loop) {
for(int loop=0; loop<test60loops; ++loop) {
for (int i=0; i<test06nodes; ++i) {
for (int i=0; i<test60nodes; ++i) {
++nodeid.id[19];
ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
ks_sleep(1000);
ks_dhtrt_touch_node(rt, nodeid);
}
for (int i=0; i<test06nodes; ++i) {
for (int i=0; i<test60nodes; ++i) {
peer = ks_dhtrt_find_node(rt, nodeid);
if (peer) {
ks_dhtrt_delete_node(rt, peer);
@ -435,19 +503,20 @@ static void *test06ex(ks_thread_t *thread, void *data)
}
void test06()
void test60()
{
printf("**** test60: starting\n"); fflush(stdout);
int i;
ks_mutex_create(&glock, KS_MUTEX_FLAG_DEFAULT, pool);
ks_thread_t* t0;
ks_thread_create(&t0, test06ex1, NULL, pool);
ks_thread_create(&t0, test60ex1, NULL, pool);
ks_thread_t* t1;
ks_thread_create(&t1, test06ex2, NULL, pool);
ks_thread_create(&t1, test60ex2, NULL, pool);
for(i = 0; i < 10; i++) {
ks_thread_create(&threads[i], test06ex, &i, pool);
ks_thread_create(&threads[i], test60ex, &i, pool);
}
printf("all threads started\n"); fflush(stdout);
@ -463,6 +532,8 @@ void test06()
printf("all threads completed\n"); fflush(stdout);
ks_dhtrt_dump(rt, 7);
printf("**** test60: completed\n"); fflush(stdout);
return;
}
@ -683,6 +754,7 @@ void test51()
}
int main(int argc, char* argv[]) {
printf("testdhtbuckets - start\n");
@ -799,6 +871,19 @@ int main(int argc, char* argv[]) {
continue;
}
if (tests[tix] == 60) {
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test60();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 99) {
ks_dhtrt_initroute(&rt, dht, pool, tpool);
//testnodelocking();
ks_dhtrt_deinitroute(&rt);
continue;
}
}