FS-10167 fix issues in the thread code

This commit is contained in:
Anthony Minessale 2017-03-27 13:01:38 -05:00
parent e1d596d0c9
commit 52f1451ece
5 changed files with 49 additions and 24 deletions

View File

@ -53,6 +53,16 @@ typedef
#endif #endif
ks_pid_t; ks_pid_t;
typedef enum {
KS_THREAD_INIT,
KS_THREAD_RUNNING,
KS_THREAD_FAIL,
KS_THREAD_SHUTDOWN,
KS_THREAD_STOPPED
} ks_thread_state_t;
#define KS_THREAD_IS_RUNNING(_thread) _thread->state == KS_THREAD_RUNNING
struct ks_thread { struct ks_thread {
ks_pool_t *pool; ks_pool_t *pool;
#ifdef WIN32 #ifdef WIN32
@ -65,7 +75,7 @@ struct ks_thread {
ks_thread_function_t function; ks_thread_function_t function;
size_t stack_size; size_t stack_size;
uint32_t flags; uint32_t flags;
uint8_t running; ks_thread_state_t state;
uint8_t priority; uint8_t priority;
void *return_data; void *return_data;
}; };
@ -79,7 +89,7 @@ struct ks_thread {
typedef enum { typedef enum {
KS_THREAD_FLAG_DEFAULT = 0, KS_THREAD_FLAG_DEFAULT = 0,
KS_THREAD_FLAG_DETATCHED = (1 << 0) KS_THREAD_FLAG_DETACHED = (1 << 0)
} ks_thread_flags_t; } ks_thread_flags_t;
KS_DECLARE(int) ks_thread_set_priority(int nice_val); KS_DECLARE(int) ks_thread_set_priority(int nice_val);

View File

@ -85,18 +85,23 @@ static void ks_thread_cleanup(ks_pool_t *mpool, void *ptr, void *arg, int type,
switch(action) { switch(action) {
case KS_MPCL_ANNOUNCE: case KS_MPCL_ANNOUNCE:
thread->running = 0; if (thread->state == KS_THREAD_RUNNING) {
thread->state = KS_THREAD_SHUTDOWN;
}
break; break;
case KS_MPCL_TEARDOWN: case KS_MPCL_TEARDOWN:
if (!(thread->flags & KS_THREAD_FLAG_DETATCHED)) { while(thread->state == KS_THREAD_SHUTDOWN) {
ks_sleep(10000);
}
if (!(thread->flags & KS_THREAD_FLAG_DETACHED)) {
ks_thread_join(thread); ks_thread_join(thread);
} }
break; break;
case KS_MPCL_DESTROY: case KS_MPCL_DESTROY:
#ifdef WIN32 #ifdef WIN32
//if (!(thread->flags & KS_THREAD_FLAG_DETATCHED)) {
CloseHandle(thread->handle); CloseHandle(thread->handle);
//}
#endif #endif
break; break;
} }
@ -118,8 +123,9 @@ static void *KS_THREAD_CALLING_CONVENTION thread_launch(void *args)
pthread_setschedparam(tt, policy, &param); pthread_setschedparam(tt, policy, &param);
} }
#endif #endif
thread->state = KS_THREAD_RUNNING;
thread->return_data = thread->function(thread, thread->private_data); thread->return_data = thread->function(thread, thread->private_data);
thread->state = KS_THREAD_STOPPED;
#ifndef WIN32 #ifndef WIN32
pthread_attr_destroy(&thread->attribute); pthread_attr_destroy(&thread->attribute);
#endif #endif
@ -205,6 +211,7 @@ KS_DECLARE(ks_status_t) ks_thread_create_ex(ks_thread_t **rthread, ks_thread_fun
{ {
ks_thread_t *thread = NULL; ks_thread_t *thread = NULL;
ks_status_t status = KS_STATUS_FAIL; ks_status_t status = KS_STATUS_FAIL;
int sanity = 1000;
if (!rthread) goto done; if (!rthread) goto done;
@ -219,7 +226,6 @@ KS_DECLARE(ks_status_t) ks_thread_create_ex(ks_thread_t **rthread, ks_thread_fun
thread->private_data = data; thread->private_data = data;
thread->function = func; thread->function = func;
thread->stack_size = stack_size; thread->stack_size = stack_size;
thread->running = 1;
thread->flags = flags; thread->flags = flags;
thread->priority = priority; thread->priority = priority;
thread->pool = pool; thread->pool = pool;
@ -241,7 +247,7 @@ KS_DECLARE(ks_status_t) ks_thread_create_ex(ks_thread_t **rthread, ks_thread_fun
SetThreadPriority(thread->handle, THREAD_PRIORITY_LOWEST); SetThreadPriority(thread->handle, THREAD_PRIORITY_LOWEST);
} }
if (flags & KS_THREAD_FLAG_DETATCHED) { if (flags & KS_THREAD_FLAG_DETACHED) {
//CloseHandle(thread->handle); //CloseHandle(thread->handle);
} }
@ -252,7 +258,7 @@ KS_DECLARE(ks_status_t) ks_thread_create_ex(ks_thread_t **rthread, ks_thread_fun
if (pthread_attr_init(&thread->attribute) != 0) if (pthread_attr_init(&thread->attribute) != 0)
goto fail; goto fail;
if ((flags & KS_THREAD_FLAG_DETATCHED) && pthread_attr_setdetachstate(&thread->attribute, PTHREAD_CREATE_DETACHED) != 0) if ((flags & KS_THREAD_FLAG_DETACHED) && pthread_attr_setdetachstate(&thread->attribute, PTHREAD_CREATE_DETACHED) != 0)
goto failpthread; goto failpthread;
if (thread->stack_size && pthread_attr_setstacksize(&thread->attribute, thread->stack_size) != 0) if (thread->stack_size && pthread_attr_setstacksize(&thread->attribute, thread->stack_size) != 0)
@ -265,19 +271,28 @@ KS_DECLARE(ks_status_t) ks_thread_create_ex(ks_thread_t **rthread, ks_thread_fun
goto done; goto done;
failpthread: failpthread:
thread->state = KS_THREAD_FAIL;
pthread_attr_destroy(&thread->attribute); pthread_attr_destroy(&thread->attribute);
#endif #endif
fail: fail:
if (thread) { if (thread) {
thread->running = 0; thread->state = KS_THREAD_FAIL;
if (pool) { if (pool) {
ks_pool_free(pool, &thread); ks_pool_free(pool, &thread);
} }
} }
done: done:
if (status == KS_STATUS_SUCCESS) { if (status == KS_STATUS_SUCCESS) {
while(thread->state < KS_THREAD_RUNNING && --sanity > 0) {
ks_sleep(1000);
}
if (!sanity) {
status = KS_STATUS_FAIL;
goto fail;
}
*rthread = thread; *rthread = thread;
ks_pool_set_cleanup(pool, thread, NULL, 0, ks_thread_cleanup); ks_pool_set_cleanup(pool, thread, NULL, 0, ks_thread_cleanup);
} }

View File

@ -94,7 +94,7 @@ static int check_queue(ks_thread_pool_t *tp, ks_bool_t adding)
ks_mutex_unlock(tp->mutex); ks_mutex_unlock(tp->mutex);
while(need > 0) { while(need > 0) {
if (ks_thread_create_ex(&thread, worker_thread, tp, KS_THREAD_FLAG_DETATCHED, tp->stack_size, tp->priority, tp->pool) != KS_STATUS_SUCCESS) { if (ks_thread_create_ex(&thread, worker_thread, tp, KS_THREAD_FLAG_DETACHED, tp->stack_size, tp->priority, tp->pool) != KS_STATUS_SUCCESS) {
ks_mutex_lock(tp->mutex); ks_mutex_lock(tp->mutex);
tp->thread_count--; tp->thread_count--;
ks_mutex_unlock(tp->mutex); ks_mutex_unlock(tp->mutex);

View File

@ -49,14 +49,14 @@ static void *test2_thread(ks_thread_t *thread, void *data)
ks_hash_iterator_t *itt; ks_hash_iterator_t *itt;
ks_hash_t *hash = (ks_hash_t *) data; ks_hash_t *hash = (ks_hash_t *) data;
while(thread->running) { while(KS_THREAD_IS_RUNNING(thread)) {
for (itt = ks_hash_first(hash, KS_READLOCKED); itt; itt = ks_hash_next(&itt)) { for (itt = ks_hash_first(hash, KS_READLOCKED); itt; itt = ks_hash_next(&itt)) {
const void *key; const void *key;
void *val; void *val;
ks_hash_this(itt, &key, NULL, &val); ks_hash_this(itt, &key, NULL, &val);
printf("%d ITT %s=%s\n", (int)ks_thread_self_id(), (char *)key, (char *)val); printf("%u ITT %s=%s\n", (int)ks_thread_self_id(), (char *)key, (char *)val);
} }
ks_sleep(100000); ks_sleep(100000);
} }
@ -109,7 +109,7 @@ int test2(void)
} }
for (i = 0; i < ttl; i++) { for (i = 0; i < ttl; i++) {
threads[i]->running = 0; threads[i]->state = KS_THREAD_SHUTDOWN;
ks_thread_join(threads[i]); ks_thread_join(threads[i]);
} }

View File

@ -23,8 +23,8 @@ static int cpu_count = 0;
static void *thread_priority(ks_thread_t *thread, void *data) static void *thread_priority(ks_thread_t *thread, void *data)
{ {
while (thread->running) { while (KS_THREAD_IS_RUNNING(thread)) {
ks_sleep(1000000); ks_sleep(100000);
} }
return NULL; return NULL;
@ -145,8 +145,8 @@ static void *thread_test_function_cleanup(ks_thread_t *thread, void *data)
{ {
int d = (int)(intptr_t)data; int d = (int)(intptr_t)data;
while (thread->running) { while (KS_THREAD_IS_RUNNING(thread)) {
ks_sleep(1000000); ks_sleep(100000);
} }
if ( d == 1 ) { if ( d == 1 ) {
@ -229,7 +229,7 @@ static void create_threads_detatched(void)
int i; int i;
for(i = 0; i < cpu_count; i++) { for(i = 0; i < cpu_count; i++) {
status = ks_thread_create_ex(&threads[i], thread_test_function_detatched, d, KS_THREAD_FLAG_DETATCHED, KS_THREAD_DEFAULT_STACK, KS_PRI_NORMAL, pool); status = ks_thread_create_ex(&threads[i], thread_test_function_detatched, d, KS_THREAD_FLAG_DETACHED, KS_THREAD_DEFAULT_STACK, KS_PRI_NORMAL, pool);
ok( status == KS_STATUS_SUCCESS ); ok( status == KS_STATUS_SUCCESS );
} }
} }
@ -239,9 +239,9 @@ static void check_thread_priority(void)
ks_status_t status; ks_status_t status;
void *d = (void *)(intptr_t)1; void *d = (void *)(intptr_t)1;
status = ks_thread_create_ex(&thread_p, thread_priority, d, KS_THREAD_FLAG_DETATCHED, KS_THREAD_DEFAULT_STACK, KS_PRI_IMPORTANT, pool); status = ks_thread_create_ex(&thread_p, thread_priority, d, KS_THREAD_FLAG_DETACHED, KS_THREAD_DEFAULT_STACK, KS_PRI_IMPORTANT, pool);
ok( status == KS_STATUS_SUCCESS ); ok( status == KS_STATUS_SUCCESS );
ks_sleep(1000000); ks_sleep(100000);
todo("Add check to see if has permission to set thread priority\n"); todo("Add check to see if has permission to set thread priority\n");
ok( ks_thread_priority(thread_p) == KS_PRI_IMPORTANT ); ok( ks_thread_priority(thread_p) == KS_PRI_IMPORTANT );
end_todo; end_todo;