FS-10167: Fixed an issue with connection cleanup, the same approach should be taken with sessions to avoid any potential thread deadlock due to cleanup from the same thread which is running the session.
This commit is contained in:
parent
a5dc3e8f1f
commit
dd6031544a
|
@ -302,6 +302,12 @@ KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connec
|
||||||
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
|
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc)
|
||||||
|
{
|
||||||
|
ks_assert(bc);
|
||||||
|
return bc->state;
|
||||||
|
}
|
||||||
|
|
||||||
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
|
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
|
||||||
{
|
{
|
||||||
ks_assert(bc);
|
ks_assert(bc);
|
||||||
|
@ -393,12 +399,10 @@ ks_status_t blade_connection_state_on_disconnect(blade_connection_t *bc)
|
||||||
|
|
||||||
ks_assert(bc);
|
ks_assert(bc);
|
||||||
|
|
||||||
blade_handle_connections_remove(bc);
|
|
||||||
|
|
||||||
callback = blade_connection_state_callback_lookup(bc, BLADE_CONNECTION_STATE_DISCONNECT);
|
callback = blade_connection_state_callback_lookup(bc, BLADE_CONNECTION_STATE_DISCONNECT);
|
||||||
if (callback) callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
|
if (callback) callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
|
||||||
|
|
||||||
blade_connection_destroy(&bc);
|
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CLEANUP);
|
||||||
|
|
||||||
return KS_STATUS_SUCCESS;
|
return KS_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,9 @@ struct blade_handle_s {
|
||||||
config_setting_t *config_directory;
|
config_setting_t *config_directory;
|
||||||
config_setting_t *config_datastore;
|
config_setting_t *config_datastore;
|
||||||
|
|
||||||
|
ks_thread_t *worker_thread;
|
||||||
|
ks_bool_t shutdown;
|
||||||
|
|
||||||
ks_hash_t *transports; // registered transports exposed by modules, NOT active connections
|
ks_hash_t *transports; // registered transports exposed by modules, NOT active connections
|
||||||
ks_hash_t *spaces; // registered method spaces exposed by modules
|
ks_hash_t *spaces; // registered method spaces exposed by modules
|
||||||
// registered event callback registry
|
// registered event callback registry
|
||||||
|
@ -68,6 +71,7 @@ struct blade_handle_s {
|
||||||
ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id
|
ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void *blade_handle_worker_thread(ks_thread_t *thread, void *data);
|
||||||
|
|
||||||
typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t;
|
typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t;
|
||||||
struct blade_handle_transport_registration_s {
|
struct blade_handle_transport_registration_s {
|
||||||
|
@ -300,6 +304,17 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
|
||||||
|
|
||||||
// @todo call onload and onstartup callbacks for modules from DSOs
|
// @todo call onload and onstartup callbacks for modules from DSOs
|
||||||
|
|
||||||
|
if (ks_thread_create_ex(&bh->worker_thread,
|
||||||
|
blade_handle_worker_thread,
|
||||||
|
bh,
|
||||||
|
KS_THREAD_FLAG_DEFAULT,
|
||||||
|
KS_THREAD_DEFAULT_STACK,
|
||||||
|
KS_PRI_NORMAL,
|
||||||
|
bh->pool) != KS_STATUS_SUCCESS) {
|
||||||
|
// @todo error logging
|
||||||
|
return KS_STATUS_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
return KS_STATUS_SUCCESS;
|
return KS_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,6 +324,13 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
|
||||||
|
|
||||||
ks_assert(bh);
|
ks_assert(bh);
|
||||||
|
|
||||||
|
if (bh->worker_thread) {
|
||||||
|
bh->shutdown = KS_TRUE;
|
||||||
|
ks_thread_join(bh->worker_thread);
|
||||||
|
ks_pool_free(bh->pool, &bh->worker_thread);
|
||||||
|
bh->shutdown = KS_FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
|
while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
|
||||||
void *key = NULL;
|
void *key = NULL;
|
||||||
blade_request_t *value = NULL;
|
blade_request_t *value = NULL;
|
||||||
|
@ -863,6 +885,43 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
|
||||||
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
|
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *blade_handle_worker_thread(ks_thread_t *thread, void *data)
|
||||||
|
{
|
||||||
|
blade_handle_t *bh = NULL;
|
||||||
|
blade_connection_t *bc = NULL;
|
||||||
|
ks_hash_iterator_t *it = NULL;
|
||||||
|
ks_q_t *cleanup = NULL;
|
||||||
|
|
||||||
|
ks_assert(thread);
|
||||||
|
ks_assert(data);
|
||||||
|
|
||||||
|
bh = (blade_handle_t *)data;
|
||||||
|
|
||||||
|
ks_q_create(&cleanup, bh->pool, 0);
|
||||||
|
ks_assert(cleanup);
|
||||||
|
|
||||||
|
while (!bh->shutdown) {
|
||||||
|
ks_hash_write_lock(bh->connections);
|
||||||
|
for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
|
||||||
|
void *key = NULL;
|
||||||
|
blade_connection_t *value = NULL;
|
||||||
|
|
||||||
|
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
|
||||||
|
|
||||||
|
if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value);
|
||||||
|
}
|
||||||
|
ks_hash_write_unlock(bh->connections);
|
||||||
|
|
||||||
|
while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) {
|
||||||
|
blade_handle_connections_remove(bc);
|
||||||
|
blade_connection_destroy(&bc);
|
||||||
|
}
|
||||||
|
|
||||||
|
ks_sleep_ms(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* For Emacs:
|
/* For Emacs:
|
||||||
* Local Variables:
|
* Local Variables:
|
||||||
|
|
|
@ -54,6 +54,7 @@ KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc);
|
||||||
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
|
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
|
||||||
KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data);
|
KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data);
|
||||||
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
|
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
|
||||||
|
KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc);
|
||||||
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
|
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
|
||||||
KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
|
KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
|
||||||
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json);
|
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json);
|
||||||
|
|
|
@ -65,6 +65,7 @@ typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, co
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
BLADE_CONNECTION_STATE_NONE,
|
BLADE_CONNECTION_STATE_NONE,
|
||||||
|
BLADE_CONNECTION_STATE_CLEANUP,
|
||||||
BLADE_CONNECTION_STATE_DISCONNECT,
|
BLADE_CONNECTION_STATE_DISCONNECT,
|
||||||
BLADE_CONNECTION_STATE_NEW,
|
BLADE_CONNECTION_STATE_NEW,
|
||||||
BLADE_CONNECTION_STATE_CONNECT,
|
BLADE_CONNECTION_STATE_CONNECT,
|
||||||
|
|
Loading…
Reference in New Issue