diff --git a/src/mod/applications/mod_mongo/Makefile b/src/mod/applications/mod_mongo/Makefile new file mode 100644 index 0000000000..26687b5ac6 --- /dev/null +++ b/src/mod/applications/mod_mongo/Makefile @@ -0,0 +1,29 @@ +BASE=../../../.. + +MONGO_CXX_DRIVER_VERSION=v1.8 +MONGO_CXX_DRIVER_URL=http://downloads.mongodb.org/cxx-driver +MONGO_CXX_DRIVER_TARBALL=mongodb-linux-x86_64-$(MONGO_CXX_DRIVER_VERSION)-latest.tgz +MONGO_CXX_DRIVER_SRC=$(BASE)/libs/mongo-cxx-driver-$(MONGO_CXX_DRIVER_VERSION) +LIBMONGOCLIENT_A =$(MONGO_CXX_DRIVER_SRC)/libmongoclient.a + +LOCAL_SOURCES=mongo_conn.cpp +LOCAL_OBJS=mongo_conn.o + +LOCAL_CFLAGS=-I$(MONGO_CXX_DRIVER_SRC)/mongo +LOCAL_LIBADD=$(LIBMONGOCLIENT_A) +LOCAL_LDFLAGS=-lboost_thread -lboost_filesystem-mt -lboost_system-mt +MODDIR=$(shell pwd) + + +include $(BASE)/build/modmake.rules + +$(MONGO_CXX_DRIVER_SRC): + $(GETLIB) $(MONGO_CXX_DRIVER_URL) $(MONGO_CXX_DRIVER_TARBALL) + cd $(MONGO_CXX_DRIVER_SRC) && patch -p0 -i $(MODDIR)/fpic_hack.diff + $(TOUCH_TARGET) + + +$(LIBMONGOCLIENT_A): $(MONGO_CXX_DRIVER_SRC) + cd $(MONGO_CXX_DRIVER_SRC) && scons + $(TOUCH_TARGET) + diff --git a/src/mod/applications/mod_mongo/fpic_hack.diff b/src/mod/applications/mod_mongo/fpic_hack.diff new file mode 100644 index 0000000000..882f3c899a --- /dev/null +++ b/src/mod/applications/mod_mongo/fpic_hack.diff @@ -0,0 +1,11 @@ +--- SConstruct.orig 2011-04-28 19:00:36.000000000 +0200 ++++ SConstruct 2011-04-28 19:01:19.000000000 +0200 +@@ -45,7 +45,7 @@ + linux = True + + if nix: +- env.Append( CPPFLAGS=" -O3" ) ++ env.Append( CPPFLAGS=" -I../pcre -fPIC -O3" ) + env.Append( LIBS=["pthread"] ) + if linux: + env.Append( LINKFLAGS=" -Wl,--as-needed -Wl,-zdefs " ) diff --git a/src/mod/applications/mod_mongo/mod_mongo.cpp b/src/mod/applications/mod_mongo/mod_mongo.cpp new file mode 100644 index 0000000000..d76e885765 --- /dev/null +++ b/src/mod/applications/mod_mongo/mod_mongo.cpp @@ -0,0 +1,142 @@ +#include +#include "mod_mongo.h" + + +static struct { + mongo_connection_pool_t *conn_pool; +} globals; + + +static const char *SYNTAX = "mongo_find_one ns; query; fields"; + +SWITCH_STANDARD_API(mongo_find_one_function) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + char *ns = NULL, *json_query = NULL, *json_fields = NULL; + char delim = ';'; + + ns = strdup(cmd); + switch_assert(ns != NULL); + + if ((json_query = strchr(ns, delim))) { + *json_query++ = '\0'; + if ((json_fields = strchr(json_query, delim))) { + *json_fields++ = '\0'; + } + } + + if (!zstr(ns) && !zstr(json_query) && !zstr(json_fields)) { + + try { + BSONObj query = fromjson(json_query); + BSONObj fields = fromjson(json_fields); + + DBClientConnection *conn = mongo_connection_pool_get(globals.conn_pool); + BSONObj res = conn->findOne(ns, Query(query), &fields); + mongo_connection_pool_put(globals.conn_pool, conn); + + stream->write_function(stream, "-OK\n%s\n", res.toString().c_str()); + } catch (MsgAssertionException &e) { + stream->write_function(stream, "-ERR\n%s\n", e.what()); + } + + + } else { + stream->write_function(stream, "-ERR\n%s\n", SYNTAX); + } + + switch_safe_free(ns); + + return status; +} + +static switch_status_t config(void) +{ + const char *cf = "mongo.conf"; + switch_xml_t cfg, xml, settings, param; + switch_status_t status = SWITCH_STATUS_SUCCESS; + const char *host = "127.0.0.1"; + switch_size_t min_connections = 1, max_connections = 1; + + if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf); + return SWITCH_STATUS_GENERR; + } + + if ((settings = switch_xml_child(cfg, "settings"))) { + for (param = switch_xml_child(settings, "param"); param; param = param->next) { + char *var = (char *) switch_xml_attr_soft(param, "name"); + char *val = (char *) switch_xml_attr_soft(param, "value"); + int tmp; + + if (!strcmp(var, "host")) { + host = val; + } else if (!strcmp(var, "min-connections")) { + if ((tmp = atoi(val)) > 0) { + min_connections = tmp; + } + } else if (!strcmp(var, "max-connections")) { + if ((tmp = atoi(val)) > 0) { + max_connections = tmp; + } + } + } + } + + if (mongo_connection_pool_create(&globals.conn_pool, min_connections, max_connections, host) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Can't create connection pool\n"); + status = SWITCH_STATUS_GENERR; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Mongo connection pool created [%s %d/%d]\n", host, (int)min_connections, (int)max_connections); + } + + switch_xml_free(xml); + + return status; +} + + +SWITCH_BEGIN_EXTERN_C + +SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load); +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown); +SWITCH_MODULE_DEFINITION(mod_mongo, mod_mongo_load, mod_mongo_shutdown, NULL); + + +SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load) +{ + switch_api_interface_t *api_interface; + switch_application_interface_t *app_interface; + + *module_interface = switch_loadable_module_create_module_interface(pool, modname); + + memset(&globals, 0, sizeof(globals)); + + if (config() != SWITCH_STATUS_SUCCESS) { + return SWITCH_STATUS_TERM; + } + + SWITCH_ADD_API(api_interface, "mongo_find_one", "mongo", mongo_find_one_function, SYNTAX); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown) +{ + mongo_connection_pool_destroy(&globals.conn_pool); + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_END_EXTERN_C + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 + */ + diff --git a/src/mod/applications/mod_mongo/mod_mongo.h b/src/mod/applications/mod_mongo/mod_mongo.h new file mode 100644 index 0000000000..5d2ed54dcb --- /dev/null +++ b/src/mod/applications/mod_mongo/mod_mongo.h @@ -0,0 +1,35 @@ +#ifndef MOD_MONGO_H +#define MOD_MONGO_H + + +#include +#include +#include +#include + +using namespace mongo; + +typedef struct { + char *host; + + switch_size_t min_connections; + switch_size_t max_connections; + switch_size_t size; + switch_queue_t *connections; + switch_mutex_t *mutex; + switch_memory_pool_t *pool; + +} mongo_connection_pool_t; + + +switch_status_t mongo_connection_pool_create(mongo_connection_pool_t **conn_pool, switch_size_t min_connections, switch_size_t max_connections, + const char *host); +void mongo_connection_pool_destroy(mongo_connection_pool_t **conn_pool); + + +DBClientConnection *mongo_connection_pool_get(mongo_connection_pool_t *conn_pool); +switch_status_t mongo_connection_pool_put(mongo_connection_pool_t *conn_pool, DBClientConnection *conn); + + +#endif + diff --git a/src/mod/applications/mod_mongo/mongo_conn.cpp b/src/mod/applications/mod_mongo/mongo_conn.cpp new file mode 100644 index 0000000000..649ac276b8 --- /dev/null +++ b/src/mod/applications/mod_mongo/mongo_conn.cpp @@ -0,0 +1,161 @@ +#include +#include "mod_mongo.h" + + /* + we could use the driver's connection pool, + if we could set the max connections (PoolForHost::setMaxPerHost) + + ScopedDbConnection scoped_conn("host"); + DBClientConnection *conn = dynamic_cast< DBClientConnection* >(&scoped_conn.conn()); + scoped_conn.done(); + */ + +switch_status_t mongo_connection_create(DBClientConnection **connection, const char *host) +{ + DBClientConnection *conn = new DBClientConnection(); + + try { + conn->connect(host); + } catch (DBException &e) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't connect to mongo [%s]\n", host); + return SWITCH_STATUS_GENERR; + } + + *connection = conn; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected to mongo [%s]\n", host); + + return SWITCH_STATUS_SUCCESS; +} + +void mongo_connection_destroy(DBClientConnection **conn) +{ + switch_assert(*conn != NULL); + delete *conn; + + *conn = NULL; +} + +switch_status_t mongo_connection_pool_create(mongo_connection_pool_t **conn_pool, switch_size_t min_connections, switch_size_t max_connections, + const char *host) +{ + switch_memory_pool_t *pool = NULL; + switch_status_t status = SWITCH_STATUS_SUCCESS; + mongo_connection_pool_t *cpool = NULL; + DBClientConnection *conn = NULL; + + if ((status = switch_core_new_memory_pool(&pool)) != SWITCH_STATUS_SUCCESS) { + return status; + } + + if (!(cpool = (mongo_connection_pool_t *)switch_core_alloc(pool, sizeof(mongo_connection_pool_t)))) { + switch_goto_status(SWITCH_STATUS_MEMERR, done); + } + + if ((status = switch_mutex_init(&cpool->mutex, SWITCH_MUTEX_NESTED, pool)) != SWITCH_STATUS_SUCCESS) { + goto done; + } + + if ((status = switch_queue_create(&cpool->connections, max_connections, pool)) != SWITCH_STATUS_SUCCESS) { + goto done; + } + + cpool->min_connections = min_connections; + cpool->max_connections = max_connections; + cpool->host = switch_core_strdup(pool, host); + + cpool->pool = pool; + + for (cpool->size = 0; cpool->size < min_connections; cpool->size++) { + + if ((status = mongo_connection_create(&conn, host)) == SWITCH_STATUS_SUCCESS) { + mongo_connection_pool_put(cpool, conn); + } else { + break; + } + } + + done: + + if (status == SWITCH_STATUS_SUCCESS) { + *conn_pool = cpool; + } else { + switch_core_destroy_memory_pool(&pool); + } + + + return status; +} + +void mongo_connection_pool_destroy(mongo_connection_pool_t **conn_pool) +{ + mongo_connection_pool_t *cpool = *conn_pool; + void *data = NULL; + + switch_assert(cpool != NULL); + + while (switch_queue_trypop(cpool->connections, &data) == SWITCH_STATUS_SUCCESS) { + mongo_connection_destroy((DBClientConnection **)&data); + } + + switch_mutex_destroy(cpool->mutex); + switch_core_destroy_memory_pool(&cpool->pool); + + *conn_pool = NULL; +} + + +DBClientConnection *mongo_connection_pool_get(mongo_connection_pool_t *conn_pool) +{ + DBClientConnection *conn = NULL; + void *data = NULL; + + switch_assert(conn_pool != NULL); + + switch_mutex_lock(conn_pool->mutex); + + if (switch_queue_trypop(conn_pool->connections, &data) == SWITCH_STATUS_SUCCESS) { + conn = (DBClientConnection *) data; + } else if (mongo_connection_create(&conn, conn_pool->host) == SWITCH_STATUS_SUCCESS) { + if (++conn_pool->size > conn_pool->max_connections) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Connection pool is empty. You may want to increase 'max-connections'\n"); + } + } + + switch_mutex_unlock(conn_pool->mutex); + +#ifdef MONGO_POOL_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL get: size %d conn: %p\n", (int) switch_queue_size(conn_pool->connections), conn); +#endif + + return conn; +} + +switch_status_t mongo_connection_pool_put(mongo_connection_pool_t *conn_pool, DBClientConnection *conn) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + + switch_assert(conn_pool != NULL); + switch_assert(conn != NULL); + + switch_mutex_lock(conn_pool->mutex); + if (conn_pool->size > conn_pool->max_connections) { +#ifdef MONGO_POOL_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: Destroy connection %p\n", conn); +#endif + mongo_connection_destroy(&conn); + conn_pool->size--; + } else { +#ifdef MONGO_POOL_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: push connection %p\n", conn); +#endif + status = switch_queue_push(conn_pool->connections, conn); + } + + switch_mutex_unlock(conn_pool->mutex); + +#ifdef MONGO_POOL_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL put: size %d conn: %p\n", (int) switch_queue_size(conn_pool->connections), conn); +#endif + + return status; +}