diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index d757845b6b..a3743aa363 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -302,6 +302,12 @@ KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connec if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc); } +KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc) +{ + ks_assert(bc); + return bc->state; +} + KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) { ks_assert(bc); @@ -393,12 +399,10 @@ ks_status_t blade_connection_state_on_disconnect(blade_connection_t *bc) ks_assert(bc); - blade_handle_connections_remove(bc); - callback = blade_connection_state_callback_lookup(bc, BLADE_CONNECTION_STATE_DISCONNECT); if (callback) callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST); - blade_connection_destroy(&bc); + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CLEANUP); return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 536d52ef26..9af67c4a22 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -47,6 +47,9 @@ struct blade_handle_s { config_setting_t *config_directory; config_setting_t *config_datastore; + ks_thread_t *worker_thread; + ks_bool_t shutdown; + ks_hash_t *transports; // registered transports exposed by modules, NOT active connections ks_hash_t *spaces; // registered method spaces exposed by modules // registered event callback registry @@ -68,6 +71,7 @@ struct blade_handle_s { ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id }; +void *blade_handle_worker_thread(ks_thread_t *thread, void *data); typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t; struct blade_handle_transport_registration_s { @@ -300,6 +304,17 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ // @todo call onload and onstartup callbacks for modules from DSOs + if (ks_thread_create_ex(&bh->worker_thread, + blade_handle_worker_thread, + bh, + KS_THREAD_FLAG_DEFAULT, + KS_THREAD_DEFAULT_STACK, + KS_PRI_NORMAL, + bh->pool) != KS_STATUS_SUCCESS) { + // @todo error logging + return KS_STATUS_FAIL; + } + return KS_STATUS_SUCCESS; } @@ -309,6 +324,13 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) ks_assert(bh); + if (bh->worker_thread) { + bh->shutdown = KS_TRUE; + ks_thread_join(bh->worker_thread); + ks_pool_free(bh->pool, &bh->worker_thread); + bh->shutdown = KS_FALSE; + } + while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) { void *key = NULL; blade_request_t *value = NULL; @@ -863,6 +885,43 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata); } +void *blade_handle_worker_thread(ks_thread_t *thread, void *data) +{ + blade_handle_t *bh = NULL; + blade_connection_t *bc = NULL; + ks_hash_iterator_t *it = NULL; + ks_q_t *cleanup = NULL; + + ks_assert(thread); + ks_assert(data); + + bh = (blade_handle_t *)data; + + ks_q_create(&cleanup, bh->pool, 0); + ks_assert(cleanup); + + while (!bh->shutdown) { + ks_hash_write_lock(bh->connections); + for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + void *key = NULL; + blade_connection_t *value = NULL; + + ks_hash_this(it, (const void **)&key, NULL, (void **)&value); + + if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value); + } + ks_hash_write_unlock(bh->connections); + + while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) { + blade_handle_connections_remove(bc); + blade_connection_destroy(&bc); + } + + ks_sleep_ms(500); + } + + return NULL; +} /* For Emacs: * Local Variables: diff --git a/libs/libblade/src/include/blade_connection.h b/libs/libblade/src/include/blade_connection.h index 8b8660f9cb..6f580828d1 100644 --- a/libs/libblade/src/include/blade_connection.h +++ b/libs/libblade/src/include/blade_connection.h @@ -54,6 +54,7 @@ KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc); KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data); KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state); +KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc); KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target); KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json); diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index e78c6dad0f..71326f0b3a 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -65,6 +65,7 @@ typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, co typedef enum { BLADE_CONNECTION_STATE_NONE, + BLADE_CONNECTION_STATE_CLEANUP, BLADE_CONNECTION_STATE_DISCONNECT, BLADE_CONNECTION_STATE_NEW, BLADE_CONNECTION_STATE_CONNECT,