diff --git a/libs/libblade/Makefile.am b/libs/libblade/Makefile.am index fe4c96b108..f55b1266bb 100644 --- a/libs/libblade/Makefile.am +++ b/libs/libblade/Makefile.am @@ -11,13 +11,13 @@ libunqlite_la_CFLAGS = -DUNQLITE_ENABLE_THREADS libunqlite_la_LIBADD = -lpthread lib_LTLIBRARIES = libblade.la -libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/bpcp.c src/blade_datastore.c +libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS) -libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm $(AM_LDFLAGS) +libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS) libblade_la_LIBADD = libunqlite.la library_includedir = $(prefix)/include -library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/bpcp.h -library_include_HEADERS += src/include/blade_datastore.h +library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h +library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h library_include_HEADERS += src/include/unqlite.h test/tap.h tests: libblade.la diff --git a/libs/libblade/src/blade_datastore.c b/libs/libblade/src/blade_datastore.c index d97cd9840d..49c3cbddb6 100644 --- a/libs/libblade/src/blade_datastore.c +++ b/libs/libblade/src/blade_datastore.c @@ -37,12 +37,19 @@ typedef enum { BDS_NONE = 0, BDS_MYPOOL = (1 << 0), + BDS_MYTPOOL = (1 << 1), } bdspvt_flag_t; struct blade_datastore_s { bdspvt_flag_t flags; ks_pool_t *pool; + ks_thread_pool_t *tpool; + + const char *config_database_path; + //config_setting_t *config_service; + unqlite *db; + //blade_service_t *service; }; struct blade_datastore_fetch_userdata_s @@ -71,11 +78,10 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP) flags = bds->flags; pool = bds->pool; - if (bds->db) { - unqlite_close(bds->db); - bds->db = NULL; - } + blade_datastore_shutdown(bds); + if (bds->tpool && (flags & BDS_MYTPOOL)) ks_thread_pool_destroy(&bds->tpool); + ks_pool_free(bds->pool, &bds); if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool); @@ -83,7 +89,7 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP) return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool) +KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool) { bdspvt_flag_t newflags = BDS_NONE; blade_datastore_t *bds = NULL; @@ -93,16 +99,67 @@ KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool ks_pool_open(&pool); ks_assert(pool); } - + // @todo: move thread pool creation to startup which allows thread pool to be configurable + if (!tpool) { + newflags |= BDS_MYTPOOL; + ks_thread_pool_create(&tpool, + BLADE_DATASTORE_TPOOL_MIN, + BLADE_DATASTORE_TPOOL_MAX, + BLADE_DATASTORE_TPOOL_STACK, + KS_PRI_NORMAL, + BLADE_DATASTORE_TPOOL_IDLE); + ks_assert(tpool); + } + bds = ks_pool_alloc(pool, sizeof(*bds)); bds->flags = newflags; bds->pool = pool; + bds->tpool = tpool; *bdsP = bds; - if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) { + return KS_STATUS_SUCCESS; +} + +ks_status_t blade_datastore_config(blade_datastore_t *bds, config_setting_t *config) +{ + config_setting_t *tmp; + config_setting_t *database = NULL; + //config_setting_t *service = NULL; + const char *config_database_path = NULL; + + ks_assert(bds); + + if (!config) return KS_STATUS_FAIL; + if (!config_setting_is_group(config)) return KS_STATUS_FAIL; + + database = config_setting_get_member(config, "database"); + if (!database) return KS_STATUS_FAIL; + tmp = config_lookup_from(database, "path"); + if (!tmp) return KS_STATUS_FAIL; + if (config_setting_type(tmp) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL; + config_database_path = config_setting_get_string(tmp); + //service = config_setting_get_member(config, "service"); + + if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path); + bds->config_database_path = ks_pstrdup(bds->pool, config_database_path); + //bds->config_service = service; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config) +{ + ks_assert(bds); + + // @todo check if already started + + if (blade_datastore_config(bds, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + //if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) { + if (unqlite_open(&bds->db, bds->config_database_path, UNQLITE_OPEN_CREATE) != UNQLITE_OK) { const char *errbuf = NULL; blade_datastore_error(bds, &errbuf, NULL); - ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf); + ks_log(KS_LOG_ERROR, "BDS Open Error: %s\n", errbuf); return KS_STATUS_FAIL; } @@ -110,15 +167,31 @@ KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool // @todo VM init if document store is used (and output consumer callback) + //blade_service_create(&bds->service, bds->pool, bds->tpool); + //ks_assert(bds->service); + //blade_service_startup(bds->service, bds->config_service); + return KS_STATUS_SUCCESS; } -KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout) +KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds) { ks_assert(bds); - ks_assert(timeout >= 0); + + //if (bds->service) blade_service_destroy(&bds->service); + + if (bds->db) { + unqlite_close(bds->db); + bds->db = NULL; + } + + if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path); + //bds->config_service = NULL; + + return KS_STATUS_SUCCESS; } + KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length) { ks_assert(bds); @@ -147,7 +220,7 @@ KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void else { const char *errbuf; blade_datastore_error(bds, &errbuf, NULL); - ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf); + ks_log(KS_LOG_ERROR, "BDS Store Error: %s\n", errbuf); ret = KS_STATUS_FAIL; } @@ -196,10 +269,11 @@ KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds, if (rc != UNQLITE_OK) { if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT; + else if(rc == UNQLITE_NOTFOUND) ret = KS_STATUS_NOT_FOUND; else { const char *errbuf; blade_datastore_error(bds, &errbuf, NULL); - ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf); + ks_log(KS_LOG_ERROR, "BDS Fetch Error: %s\n", errbuf); ret = KS_STATUS_FAIL; } diff --git a/libs/libblade/src/blade_directory.c b/libs/libblade/src/blade_directory.c new file mode 100644 index 0000000000..ab24942dfe --- /dev/null +++ b/libs/libblade/src/blade_directory.c @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "blade.h" + + +typedef enum { + BD_NONE = 0, + BD_MYPOOL = (1 << 0), + BD_MYTPOOL = (1 << 1), +} bdpvt_flag_t; + +struct blade_directory_s { + bdpvt_flag_t flags; + ks_pool_t *pool; + ks_thread_pool_t *tpool; + + config_setting_t *config_service; + + blade_service_t *service; +}; + + + + +KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP) +{ + blade_directory_t *bd = NULL; + bdpvt_flag_t flags; + ks_pool_t *pool; + + ks_assert(bdP); + + bd = *bdP; + *bdP = NULL; + + ks_assert(bd); + + flags = bd->flags; + pool = bd->pool; + + blade_directory_shutdown(bd); + + if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool); + + ks_pool_free(bd->pool, &bd); + + if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool) +{ + bdpvt_flag_t newflags = BD_NONE; + blade_directory_t *bd = NULL; + + if (!pool) { + newflags |= BD_MYPOOL; + ks_pool_open(&pool); + ks_assert(pool); + } + // @todo: move thread pool creation to startup which allows thread pool to be configurable + if (!tpool) { + newflags |= BD_MYTPOOL; + ks_thread_pool_create(&tpool, + BLADE_DIRECTORY_TPOOL_MIN, + BLADE_DIRECTORY_TPOOL_MAX, + BLADE_DIRECTORY_TPOOL_STACK, + KS_PRI_NORMAL, + BLADE_DIRECTORY_TPOOL_IDLE); + ks_assert(tpool); + } + + bd = ks_pool_alloc(pool, sizeof(*bd)); + bd->flags = newflags; + bd->pool = pool; + bd->tpool = tpool; + *bdP = bd; + + return KS_STATUS_SUCCESS; +} + +ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config) +{ + config_setting_t *service = NULL; + + ks_assert(bd); + + if (!config) return KS_STATUS_FAIL; + if (!config_setting_is_group(config)) return KS_STATUS_FAIL; + + service = config_setting_get_member(config, "service"); + if (!service) return KS_STATUS_FAIL; + + bd->config_service = service; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config) +{ + ks_assert(bd); + + blade_directory_shutdown(bd); + + if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + blade_service_create(&bd->service, bd->pool, bd->tpool); + ks_assert(bd->service); + if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd) +{ + ks_assert(bd); + + if (bd->service) blade_service_destroy(&bd->service); + + return KS_STATUS_SUCCESS; +} + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/blade_peer.c b/libs/libblade/src/blade_peer.c index 8bb96ae676..2e794a629b 100644 --- a/libs/libblade/src/blade_peer.c +++ b/libs/libblade/src/blade_peer.c @@ -33,11 +33,6 @@ #include "blade.h" -#define KS_DHT_TPOOL_MIN 2 -#define KS_DHT_TPOOL_MAX 8 -#define KS_DHT_TPOOL_STACK (1024 * 256) -#define KS_DHT_TPOOL_IDLE 10 - typedef enum { BP_NONE = 0, BP_MYPOOL = (1 << 0), @@ -48,10 +43,20 @@ struct blade_peer_s { bppvt_flag_t flags; ks_pool_t *pool; ks_thread_pool_t *tpool; - ks_dht_t *dht; + blade_service_t *service; + + ks_bool_t shutdown; + kws_t *kws; + ks_thread_t *kws_thread; + + ks_q_t *messages_sending; + ks_q_t *messages_receiving; }; +void *blade_peer_kws_thread(ks_thread_t *thread, void *data); + + KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP) { blade_peer_t *bp = NULL; @@ -68,7 +73,11 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP) flags = bp->flags; pool = bp->pool; - if (bp->dht) ks_dht_destroy(&bp->dht); + blade_peer_shutdown(bp); + + ks_q_destroy(&bp->messages_sending); + ks_q_destroy(&bp->messages_receiving); + if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool); ks_pool_free(bp->pool, &bp); @@ -78,11 +87,13 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP) return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid) +KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service) { bppvt_flag_t newflags = BP_NONE; blade_peer_t *bp = NULL; - ks_dht_t *dht = NULL; + + ks_assert(bpP); + ks_assert(service); if (!pool) { newflags |= BP_MYPOOL; @@ -94,50 +105,112 @@ KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, k ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE); ks_assert(tpool); } - ks_dht_create(&dht, pool, tpool, nodeid); - ks_assert(dht); bp = ks_pool_alloc(pool, sizeof(*bp)); bp->flags = newflags; bp->pool = pool; bp->tpool = tpool; - bp->dht = dht; + bp->service = service; + ks_q_create(&bp->messages_sending, pool, 0); + ks_q_create(&bp->messages_receiving, pool, 0); *bpP = bp; return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp) +KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws) { ks_assert(bp); - ks_assert(bp->dht); + ks_assert(kws); - return &bp->dht->nodeid; + // @todo: consider using a recycle queue for blade_peer_t in blade_service_t, just need to call startup then + + blade_peer_shutdown(bp); + + bp->kws = kws; + + if (ks_thread_create_ex(&bp->kws_thread, + blade_peer_kws_thread, + bp, + KS_THREAD_FLAG_DEFAULT, + KS_THREAD_DEFAULT_STACK, + KS_PRI_NORMAL, + bp->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + return KS_STATUS_SUCCESS; } -KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port) +KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp) { ks_assert(bp); - ks_dht_autoroute(bp->dht, autoroute, port); + bp->shutdown = KS_TRUE; + + if (bp->kws_thread) { + ks_thread_join(bp->kws_thread); + ks_pool_free(bp->pool, &bp->kws_thread); + } + + if (bp->kws) kws_destroy(&bp->kws); + + bp->shutdown = KS_FALSE; + return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint) +void *blade_peer_kws_thread(ks_thread_t *thread, void *data) { - ks_assert(bp); - ks_assert(addr); + blade_peer_t *peer; + kws_opcode_t opcode; + uint8_t *data; + ks_size_t data_len; + blade_message_t *message; - return ks_dht_bind(bp->dht, addr, endpoint); + ks_assert(thread); + ks_assert(data); + + peer = (blade_peer_t *)data; + + while (!peer->shutdown) { + // @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again + // or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or + // kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout)) + data_len = kws_read_frame(peer->kws, &opcode, &data); + + if (data_len <= 0) { + // @todo error handling, strerror(ks_errno()) + // 0 means socket closed with WS_NONE, which closes websocket with no additional reason + // -1 means socket closed with a general failure + // -2 means nonblocking wait + // other values are based on WS_XXX reasons + // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and + + // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting, + // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have, + // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback + // at the service level + peer->disconnecting = KS_TRUE; + break; + } + + // @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data + if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) { + // @todo error handling + // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails + peer->disconnecting = KS_TRUE; + break; + } + + ks_q_push(peer->messages_receiving, message); + // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)? + + + if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) { + } + } + + return NULL; } - -KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout) -{ - ks_assert(bp); - ks_assert(timeout >= 0); - - ks_dht_pulse(bp->dht, timeout); -} - + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/blade_service.c b/libs/libblade/src/blade_service.c new file mode 100644 index 0000000000..e1361bf9df --- /dev/null +++ b/libs/libblade/src/blade_service.c @@ -0,0 +1,401 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "blade.h" + +typedef enum { + BS_NONE = 0, + BS_MYPOOL = (1 << 0), + BS_MYTPOOL = (1 << 1) +} bspvt_flag_t; + +#define BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX 16 + +struct blade_service_s { + bspvt_flag_t flags; + ks_pool_t *pool; + ks_thread_pool_t *tpool; + + ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX]; + ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX]; + int32_t config_websockets_endpoints_ipv4_length; + int32_t config_websockets_endpoints_ipv6_length; + int32_t config_websockets_endpoints_backlog; + + ks_bool_t shutdown; + + struct pollfd *listeners_poll; + int32_t *listeners_families; + int32_t listeners_size; + int32_t listeners_length; + ks_thread_t *listeners_thread; + + list_t connected; +}; + + +void *blade_service_listeners_thread(ks_thread_t *thread, void *data); +ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr); + + +KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP) +{ + blade_service_t *bs = NULL; + bspvt_flag_t flags; + ks_pool_t *pool; + + ks_assert(bsP); + + bs = *bsP; + *bsP = NULL; + + ks_assert(bs); + + flags = bs->flags; + pool = bs->pool; + + blade_service_shutdown(bs); + + list_destroy(&bs->connected); + + if (bs->tpool && (flags & BS_MYTPOOL)) ks_thread_pool_destroy(&bs->tpool); + + ks_pool_free(bs->pool, &bs); + + if (pool && (flags & BS_MYPOOL)) ks_pool_close(&pool); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool) +{ + bspvt_flag_t newflags = BS_NONE; + blade_service_t *bs = NULL; + + if (!pool) { + newflags |= BS_MYPOOL; + ks_pool_open(&pool); + ks_assert(pool); + } + if (!tpool) { + newflags |= BS_MYTPOOL; + ks_thread_pool_create(&tpool, BLADE_SERVICE_TPOOL_MIN, BLADE_SERVICE_TPOOL_MAX, BLADE_SERVICE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_SERVICE_TPOOL_IDLE); + ks_assert(tpool); + } + + bs = ks_pool_alloc(pool, sizeof(*bs)); + bs->flags = newflags; + bs->pool = pool; + bs->tpool = tpool; + list_init(&bs->connected); + *bsP = bs; + + return KS_STATUS_SUCCESS; +} + +ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config) +{ + config_setting_t *websockets = NULL; + config_setting_t *websockets_endpoints = NULL; + config_setting_t *websockets_endpoints_ipv4 = NULL; + config_setting_t *websockets_endpoints_ipv6 = NULL; + config_setting_t *websockets_ssl = NULL; + config_setting_t *element; + config_setting_t *tmp1; + config_setting_t *tmp2; + ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX]; + ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX]; + int32_t config_websockets_endpoints_ipv4_length = 0; + int32_t config_websockets_endpoints_ipv6_length = 0; + int32_t config_websockets_endpoints_backlog = 8; + + ks_assert(bs); + + if (!config) return KS_STATUS_FAIL; + if (!config_setting_is_group(config)) return KS_STATUS_FAIL; + + websockets = config_setting_get_member(config, "websockets"); + if (!websockets) return KS_STATUS_FAIL; + websockets_endpoints = config_setting_get_member(config, "endpoints"); + if (!websockets_endpoints) return KS_STATUS_FAIL; + websockets_endpoints_ipv4 = config_lookup_from(websockets_endpoints, "ipv4"); + websockets_endpoints_ipv6 = config_lookup_from(websockets_endpoints, "ipv6"); + if (websockets_endpoints_ipv4) { + if (config_setting_type(websockets_endpoints_ipv4) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL; + if ((config_websockets_endpoints_ipv4_length = config_setting_length(websockets_endpoints_ipv4)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX) + return KS_STATUS_FAIL; + + for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index) { + element = config_setting_get_elem(websockets_endpoints_ipv4, index); + tmp1 = config_lookup_from(element, "address"); + tmp2 = config_lookup_from(element, "port"); + if (!tmp1 || !tmp2) return KS_STATUS_FAIL; + if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL; + if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL; + + if (ks_addr_set(&config_websockets_endpoints_ipv4[index], + config_setting_get_string(tmp1), + config_setting_get_int(tmp2), + AF_INET) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + } + } + if (websockets_endpoints_ipv6) { + if (config_setting_type(websockets_endpoints_ipv6) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL; + if ((config_websockets_endpoints_ipv6_length = config_setting_length(websockets_endpoints_ipv6)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX) + return KS_STATUS_FAIL; + + for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index) { + element = config_setting_get_elem(websockets_endpoints_ipv6, index); + tmp1 = config_lookup_from(element, "address"); + tmp2 = config_lookup_from(element, "port"); + if (!tmp1 || !tmp2) return KS_STATUS_FAIL; + if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL; + if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL; + + if (ks_addr_set(&config_websockets_endpoints_ipv6[index], + config_setting_get_string(tmp1), + config_setting_get_int(tmp2), + AF_INET6) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + } + } + if (config_websockets_endpoints_ipv4_length + config_websockets_endpoints_ipv6_length <= 0) return KS_STATUS_FAIL; + tmp1 = config_lookup_from(websockets_endpoints, "backlog"); + if (tmp1) { + if (config_setting_type(tmp1) != CONFIG_TYPE_INT) return KS_STATUS_FAIL; + config_websockets_endpoints_backlog = config_setting_get_int(tmp1); + } + websockets_ssl = config_setting_get_member(websockets, "ssl"); + if (websockets_ssl) { + // @todo: SSL stuffs from websockets_ssl into config_websockets_ssl envelope + } + + + // Configuration is valid, now assign it to the variables that are used + // If the configuration was invalid, then this does not get changed from the current config when reloading a new config + for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index) + bs->config_websockets_endpoints_ipv4[index] = config_websockets_endpoints_ipv4[index]; + for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index) + bs->config_websockets_endpoints_ipv6[index] = config_websockets_endpoints_ipv6[index]; + bs->config_websockets_endpoints_ipv4_length = config_websockets_endpoints_ipv4_length; + bs->config_websockets_endpoints_ipv6_length = config_websockets_endpoints_ipv6_length; + bs->config_websockets_endpoints_backlog = config_websockets_endpoints_backlog; + //bs->config_websockets_ssl = config_websockets_ssl; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config) +{ + ks_assert(bs); + + blade_service_shutdown(bs); + + // @todo: If the configuration is invalid, and this is a case of reloading a new config, then the service shutdown shouldn't occur + // but the service may use configuration that changes before we shutdown if it is read successfully, may require a config reader/writer mutex? + + if (blade_service_config(bs, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv4_length; ++index) { + if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv4[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + } + for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv6_length; ++index) { + if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv6[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + } + + if (ks_thread_create_ex(&bs->listeners_thread, + blade_service_listeners_thread, + bs, + KS_THREAD_FLAG_DEFAULT, + KS_THREAD_DEFAULT_STACK, + KS_PRI_NORMAL, + bs->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs) +{ + ks_assert(bs); + + bs->shutdown = KS_TRUE; + + if (bs->listeners_thread) { + ks_thread_join(bs->listeners_thread); + ks_pool_free(bs->pool, &bs->listeners_thread); + } + + for (int32_t index = 0; index < bs->listeners_length; ++index) { + ks_socket_t sock = bs->listeners_poll[index].fd; + ks_socket_shutdown(sock, SHUT_RDWR); + ks_socket_close(&sock); + } + bs->listeners_length = 0; + + list_iterator_start(&bs->connected); + while (list_iterator_hasnext(&bs->connected)) { + blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&bs->connected); + blade_peer_destroy(&peer); + } + list_iterator_stop(&bs->connected); + list_clear(&bs->connected); + + bs->shutdown = KS_FALSE; + return KS_STATUS_SUCCESS; +} + +ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr) +{ + ks_socket_t listener = KS_SOCK_INVALID; + int32_t listener_index = -1; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(bs); + ks_assert(addr); + + if ((listener = socket(addr->family, SOCK_STREAM, IPPROTO_TCP)) == KS_SOCK_INVALID) { + ret = KS_STATUS_FAIL; + goto done; + } + + ks_socket_option(listener, SO_REUSEADDR, KS_TRUE); + ks_socket_option(listener, TCP_NODELAY, KS_TRUE); + // @todo make sure v6 does not automatically map to a v4 using socket option IPV6_V6ONLY? + + if (ks_addr_bind(listener, addr) != KS_STATUS_SUCCESS) { + ret = KS_STATUS_FAIL; + goto done; + } + + if (listen(listener, bs->config_websockets_endpoints_backlog) != 0) { + ret = KS_STATUS_FAIL; + goto done; + } + + listener_index = bs->listeners_length++; + if (bs->listeners_length > bs->listeners_size) { + bs->listeners_size = bs->listeners_length; + bs->listeners_poll = (struct pollfd *)ks_pool_resize(bs->pool, bs->listeners_poll, sizeof(struct pollfd) * bs->listeners_size); + ks_assert(bs->listeners_poll); + bs->listeners_families = (int32_t *)ks_pool_resize(bs->pool, bs->listeners_families, sizeof(int32_t) * bs->listeners_size); + ks_assert(bs->listeners_families); + } + bs->listeners_poll[listener_index].fd = listener; + bs->listeners_poll[listener_index].events = POLLIN | POLLERR; + bs->listeners_families[listener_index] = addr->family; + + done: + if (ret != KS_STATUS_SUCCESS) { + if (listener != KS_SOCK_INVALID) { + ks_socket_shutdown(listener, SHUT_RDWR); + ks_socket_close(&listener); + } + } + return ret; +} + +void *blade_service_listeners_thread(ks_thread_t *thread, void *data) +{ + blade_service_t *service; + + ks_assert(thread); + ks_assert(data); + + service = (blade_service_t *)data; + + while (!service->shutdown) { + if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) { + for (int32_t index = 0; index < service->listeners_length; ++index) { + ks_socket_t sock; + ks_sockaddr_t raddr; + socklen_t slen = 0; + kws_t *kws = NULL; + blade_peer_t *peer = NULL; + + if (!(service->listeners_poll[index].revents & POLLIN)) continue; + if (service->listeners_poll[index].revents & POLLERR) { + // @todo: error handling, just skip the listener for now + continue; + } + + if (service->listeners_families[index] == AF_INET) { + slen = sizeof(raddr.v.v4); + if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) { + // @todo: error handling, just skip the socket for now + continue; + } + raddr.family = AF_INET; + } else { + slen = sizeof(raddr.v.v6); + if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) { + // @todo: error handling, just skip the socket for now + continue; + } + raddr.family = AF_INET6; + } + + ks_addr_get_host(&raddr); + ks_addr_get_port(&raddr); + + // @todo: SSL init stuffs based on data from service->config_websockets_ssl + + if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) { + // @todo: error handling, just close and skip the socket for now + ks_socket_close(&sock); + continue; + } + + blade_peer_create(&peer, service->pool, service->tpool); + ks_assert(peer); + + // @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread + + list_append(&service->connected, peer); + + blade_peer_startup(peer, kws); + } + } + } + + return NULL; +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index d7cf76235c..5f7c8a7d73 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -43,7 +43,12 @@ struct blade_handle_s { bhpvt_flag_t flags; ks_pool_t *pool; ks_thread_pool_t *tpool; - blade_peer_t *peer; + + config_setting_t *config_datastore; + config_setting_t *config_directory; + + //blade_peer_t *peer; + blade_directory_t *directory; blade_datastore_t *datastore; }; @@ -64,9 +69,9 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) flags = bh->flags; pool = bh->pool; - if (bh->datastore) blade_datastore_destroy(&bh->datastore); - - blade_peer_destroy(&bh->peer); + blade_handle_shutdown(bh); + + //blade_peer_destroy(&bh->peer); if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool); ks_pool_free(bh->pool, &bh); @@ -78,14 +83,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid) +KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool) { bhpvt_flag_t newflags = BH_NONE; blade_handle_t *bh = NULL; - ks_dht_nodeid_t nid; - ks_assert(nodeid); - ks_assert(strlen(nodeid) == (KS_DHT_NODEID_SIZE * 2)); + ks_assert(bhP); if (!pool) { newflags |= BH_MYPOOL; @@ -101,75 +104,88 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo bh->flags = newflags; bh->pool = pool; bh->tpool = tpool; - ks_dht_dehex(nid.id, nodeid, KS_DHT_NODEID_SIZE); - blade_peer_create(&bh->peer, bh->pool, bh->tpool, &nid); + //blade_peer_create(&bh->peer, bh->pool, bh->tpool); *bhP = bh; return KS_STATUS_SUCCESS; } -KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer) +ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config) { - ks_dht_nodeid_t *nodeid = NULL; - - ks_assert(bh); - ks_assert(bh->peer); - - nodeid = blade_peer_myid(bh->peer); - ks_dht_hex(nodeid->id, buffer, KS_DHT_NODEID_SIZE); -} - -KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port) -{ - ks_assert(bh); - ks_assert(bh->peer); - - blade_peer_autoroute(bh->peer, autoroute, port); -} - -KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint) -{ - ks_sockaddr_t addr; - int family = AF_INET; + config_setting_t *datastore = NULL; + config_setting_t *directory = NULL; ks_assert(bh); - ks_assert(ip); - ks_assert(port); - if (ip[1] != '.' && ip[2] != '.' && ip[3] != '.') family = AF_INET6; + if (!config) return KS_STATUS_FAIL; + if (!config_setting_is_group(config)) return KS_STATUS_FAIL; + + datastore = config_setting_get_member(config, "datastore"); + //if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL; - ks_addr_set(&addr, ip, port, family); - return blade_peer_bind(bh->peer, &addr, endpoint); + directory = config_setting_get_member(config, "directory"); + //if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL; + + bh->config_datastore = datastore; + bh->config_directory = directory; + + return KS_STATUS_SUCCESS; } -KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout) -{ - ks_assert(bh); - ks_assert(timeout >= 0); - - blade_peer_pulse(bh->peer, timeout); - if (bh->datastore) blade_datastore_pulse(bh->datastore, timeout); -} - - -KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh) +KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config) { ks_assert(bh); - if (bh->datastore) return; + if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + if (bh->config_datastore && !blade_handle_datastore_available(bh)) { + blade_datastore_create(&bh->datastore, bh->pool, bh->tpool); + blade_datastore_startup(bh->datastore, bh->config_datastore); + } + + if (bh->config_directory && !blade_handle_directory_available(bh)) { + blade_directory_create(&bh->directory, bh->pool, bh->tpool); + blade_directory_startup(bh->directory, config); + } + + return KS_STATUS_SUCCESS; +} - blade_datastore_create(&bh->datastore, bh->pool); +KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) +{ + ks_assert(bh); + + if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory); + + if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh) +{ + ks_assert(bh); + + return bh->datastore != NULL; +} + +KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh) +{ + ks_assert(bh); + + return bh->directory != NULL; } KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length) { ks_assert(bh); - ks_assert(bh->datastore); ks_assert(key); ks_assert(key_length > 0); ks_assert(data); ks_assert(data_length > 0); + + if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE; return blade_datastore_store(bh->datastore, key, key_length, data, data_length); } @@ -181,11 +197,12 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, void *userdata) { ks_assert(bh); - ks_assert(bh->datastore); ks_assert(callback); ks_assert(key); ks_assert(key_length > 0); + if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE; + return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata); } diff --git a/libs/libblade/src/include/blade.h b/libs/libblade/src/include/blade.h index 4d44ea07fb..578d41fee0 100644 --- a/libs/libblade/src/include/blade.h +++ b/libs/libblade/src/include/blade.h @@ -36,11 +36,14 @@ #include #include #include +#include #include "unqlite.h" #include "blade_types.h" #include "blade_stack.h" #include "blade_peer.h" +#include "blade_service.h" #include "blade_datastore.h" +#include "blade_directory.h" #include "bpcp.h" KS_BEGIN_EXTERN_C diff --git a/libs/libblade/src/include/blade_datastore.h b/libs/libblade/src/include/blade_datastore.h index 62918a3a41..641eccbd55 100644 --- a/libs/libblade/src/include/blade_datastore.h +++ b/libs/libblade/src/include/blade_datastore.h @@ -35,10 +35,17 @@ #define _BLADE_DATASTORE_H_ #include +#define BLADE_DATASTORE_TPOOL_MIN 2 +#define BLADE_DATASTORE_TPOOL_MAX 8 +#define BLADE_DATASTORE_TPOOL_STACK (1024 * 256) +#define BLADE_DATASTORE_TPOOL_IDLE 10 + KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool); +KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool); KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP); -KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout); +KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config); +KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds); + KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length); KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length); KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds, diff --git a/libs/libblade/src/include/blade_directory.h b/libs/libblade/src/include/blade_directory.h new file mode 100644 index 0000000000..e0a66980f3 --- /dev/null +++ b/libs/libblade/src/include/blade_directory.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _BLADE_DIRECTORY_H_ +#define _BLADE_DIRECTORY_H_ +#include + +#define BLADE_DIRECTORY_TPOOL_MIN 2 +#define BLADE_DIRECTORY_TPOOL_MAX 8 +#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256) +#define BLADE_DIRECTORY_TPOOL_IDLE 10 + +KS_BEGIN_EXTERN_C +KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool); +KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP); +KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config); +KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd); +KS_END_EXTERN_C + +#endif + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/include/blade_peer.h b/libs/libblade/src/include/blade_peer.h index 3c0bb5925d..8a15044ac5 100644 --- a/libs/libblade/src/include/blade_peer.h +++ b/libs/libblade/src/include/blade_peer.h @@ -41,12 +41,10 @@ #define BLADE_PEER_TPOOL_IDLE 10 KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid); +KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool); KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP); -KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp); -KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port); -KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint); -KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout); +KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws); +KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_service.h b/libs/libblade/src/include/blade_service.h new file mode 100644 index 0000000000..9ec014f277 --- /dev/null +++ b/libs/libblade/src/include/blade_service.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _BLADE_SERVICE_H_ +#define _BLADE_SERVICE_H_ +#include + +#define BLADE_SERVICE_TPOOL_MIN 2 +#define BLADE_SERVICE_TPOOL_MAX 8 +#define BLADE_SERVICE_TPOOL_STACK (1024 * 256) +#define BLADE_SERVICE_TPOOL_IDLE 10 + +KS_BEGIN_EXTERN_C +KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool); +KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP); +KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config); +KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs); +KS_END_EXTERN_C + +#endif + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 81411fabdc..1eb4229d72 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -42,12 +42,13 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP); -KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid); -KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer); -KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port); -KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint); -KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout); -KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh); +KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool); +KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config); +KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh); + +KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh); +KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh); + KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length); KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, blade_datastore_fetch_callback_t callback, diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 0bb43a4c07..0cb2ec8e3e 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -39,7 +39,9 @@ KS_BEGIN_EXTERN_C typedef struct blade_handle_s blade_handle_t; typedef struct blade_peer_s blade_peer_t; +typedef struct blade_service_s blade_service_t; typedef struct blade_datastore_s blade_datastore_t; +typedef struct blade_directory_s blade_directory_t; typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata); diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index 478b614bc0..09e821edc2 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -28,16 +28,12 @@ struct command_def_s { void command_test(blade_handle_t *bh, char *args); void command_quit(blade_handle_t *bh, char *args); -void command_myid(blade_handle_t *bh, char *args); -void command_bind(blade_handle_t *bh, char *args); void command_store(blade_handle_t *bh, char *args); void command_fetch(blade_handle_t *bh, char *args); static const struct command_def_s command_defs[] = { { "test", command_test }, { "quit", command_quit }, - { "myid", command_myid }, - { "bind", command_bind }, { "store", command_store }, { "fetch", command_fetch }, @@ -48,19 +44,12 @@ static const struct command_def_s command_defs[] = { int main(int argc, char **argv) { blade_handle_t *bh = NULL; - const char *nodeid; - - ks_assert(argc >= 2); - - nodeid = argv[1]; ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG); blade_init(); - blade_handle_create(&bh, NULL, NULL, nodeid); - - blade_handle_autoroute(bh, KS_TRUE, KS_DHT_DEFAULT_PORT); + blade_handle_create(&bh, NULL, NULL); loop(bh); @@ -121,7 +110,7 @@ void loop(blade_handle_t *bh) // @todo lines must not exceed 512 bytes, treat as error and ignore buffer until next new line? ks_assert(0); } - blade_handle_pulse(bh, 1); + blade_handle_pulse(bh); } } @@ -179,34 +168,6 @@ void command_quit(blade_handle_t *bh, char *args) g_shutdown = KS_TRUE; } -void command_myid(blade_handle_t *bh, char *args) -{ - char buf[KS_DHT_NODEID_SIZE * 2 + 1]; - - ks_assert(bh); - ks_assert(args); - - blade_handle_myid(bh, buf); - - ks_log(KS_LOG_INFO, "%s\n", buf); -} - -void command_bind(blade_handle_t *bh, char *args) -{ - char *ip = NULL; - char *port = NULL; - ks_port_t p; - - ks_assert(args); - - parse_argument(&args, &ip, ' '); - parse_argument(&args, &port, ' '); - - p = atoi(port); // @todo use strtol for error handling - - blade_handle_bind(bh, ip, p, NULL); -} - void command_store(blade_handle_t *bh, char *args) { char *key; @@ -214,7 +175,7 @@ void command_store(blade_handle_t *bh, char *args) ks_assert(args); - blade_handle_datastore_start(bh); + blade_handle_datastore_startup(bh, NULL); parse_argument(&args, &key, ' '); parse_argument(&args, &data, ' '); @@ -234,7 +195,7 @@ void command_fetch(blade_handle_t *bh, char *args) ks_assert(args); - blade_handle_datastore_start(bh); + blade_handle_datastore_startup(bh, NULL); parse_argument(&args, &key, ' ');