diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 73607926fa..d6da431dfc 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2193,7 +2193,11 @@ SWITCH_DECLARE(switch_status_t) switch_core_chat_send(const char *dest_proto, sw SWITCH_DECLARE(switch_status_t) switch_core_chat_deliver(const char *dest_proto, switch_event_t **message_event); SWITCH_DECLARE(switch_status_t) switch_ivr_preprocess_session(switch_core_session_t *session, const char *cmds); -SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void); +SWITCH_DECLARE(void) switch_sqldb_stop_thread(switch_sql_manager_t *sql_manager); +#define switch_core_sqldb_stop_thread() switch_sqldb_stop_thread(&switch_cache_db_sql_manager) + +SWITCH_DECLARE(void) switch_sqldb_start_thread(switch_sql_manager_t *sql_manager, void *(SWITCH_THREAD_FUNC * func) (switch_thread_t *, void *), + char **post_connect_sql, int argc); SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void); ///\} @@ -2240,8 +2244,6 @@ typedef union { switch_cache_db_pgsql_options_t pgsql_options; } switch_cache_db_connection_options_t; -struct switch_cache_db_handle; -typedef struct switch_cache_db_handle switch_cache_db_handle_t; static inline const char *switch_cache_db_type_name(switch_cache_db_handle_type_t type) { @@ -2281,7 +2283,9 @@ SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t other threads until the allocating thread actually terminates. \param [in] The handle */ +SWITCH_DECLARE(void) _switch_cache_db_release_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh); SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t ** dbh); + /*! \brief Gets a new cached handle from the pool, potentially creating a new connection. The connection is bound to the thread until it (the thread) terminates unless @@ -2290,12 +2294,19 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t \param [in] type - ODBC or SQLLITE \param [in] connection_options (userid, password, etc) */ +SWITCH_DECLARE(switch_status_t) __switch_cache_db_get_db_handle(switch_sql_manager_t *sql_manager, + switch_cache_db_handle_t ** dbh, + switch_cache_db_handle_type_t type, + switch_cache_db_connection_options_t *connection_options, + const char *file, const char *func, int line); SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t ** dbh, switch_cache_db_handle_type_t type, switch_cache_db_connection_options_t *connection_options, const char *file, const char *func, int line); #define switch_cache_db_get_db_handle(_a, _b, _c) _switch_cache_db_get_db_handle(_a, _b, _c, __FILE__, __SWITCH_FUNC__, __LINE__) + + /*! \brief Executes the sql and returns the result as a string \param [in] dbh The handle @@ -2330,15 +2341,21 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach */ SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh); +extern switch_sql_manager_t switch_cache_db_sql_manager; /*! \brief Provides some feedback as to the status of the db connection pool \param [in] stream stream for status */ +SWITCH_DECLARE(void) _switch_cache_db_status(switch_sql_manager_t *sql_manager, switch_stream_handle_t *stream); SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream); +SWITCH_DECLARE(switch_status_t) __switch_core_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); #define switch_core_db_handle(_a) _switch_core_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__) -SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); -#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__) +SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); +#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(&switch_cache_db_sql_manager, _a, __FILE__, __SWITCH_FUNC__, __LINE__) + +SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); +#define switch_core_persist_db_handle(_a) _switch_core_persist_db_handle(&switch_cache_db_sql_manager, _a, __FILE__, __SWITCH_FUNC__, __LINE__) SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *db, const char *test_sql, const char *drop_sql, const char *reactive_sql); @@ -2347,7 +2364,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_ SWITCH_DECLARE(void) switch_core_set_signal_handlers(void); SWITCH_DECLARE(uint32_t) switch_core_debug_level(void); -SWITCH_DECLARE(void) switch_cache_db_flush_handles(void); +SWITCH_DECLARE(void) switch_cache_db_flush_handles(switch_sql_manager_t *sql_manager); +#define switch_core_cache_db_flush_handles() switch_cache_db_flush_handles(&switch_cache_db_sql_manager) + SWITCH_DECLARE(const char *) switch_core_banner(void); SWITCH_DECLARE(switch_bool_t) switch_core_session_in_thread(switch_core_session_t *session); SWITCH_DECLARE(uint32_t) switch_default_ptime(const char *name, uint32_t number); diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 6f77f32ef4..ddd66a0a4e 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -1836,6 +1836,11 @@ typedef struct switch_odbc_handle switch_odbc_handle_t; typedef struct switch_pgsql_handle switch_pgsql_handle_t; typedef struct switch_pgsql_result switch_pgsql_result_t; +struct switch_cache_db_handle; +typedef struct switch_cache_db_handle switch_cache_db_handle_t; +struct switch_sql_manager; +typedef struct switch_sql_manager switch_sql_manager_t; + typedef struct switch_io_routines switch_io_routines_t; typedef struct switch_speech_handle switch_speech_handle_t; typedef struct switch_asr_handle switch_asr_handle_t; diff --git a/src/switch_core.c b/src/switch_core.c index 8e1a6567d7..9c525a6c1e 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -2260,7 +2260,7 @@ SWITCH_DECLARE(int32_t) switch_core_session_ctl(switch_session_ctl_t cmd, void * switch_time_calibrate_clock(); break; case SCSC_FLUSH_DB_HANDLES: - switch_cache_db_flush_handles(); + switch_core_cache_db_flush_handles(); break; case SCSC_SEND_SIGHUP: handle_SIGHUP(1); diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index cbc7a611e3..23b08ace75 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -26,6 +26,7 @@ * Anthony Minessale II * Michael Jerris * Paul D. Tinsley + * Eliot Gable * * * switch_core_sqldb.c -- Main Core Library (statistics tracker) @@ -55,7 +56,7 @@ struct switch_cache_db_handle { struct switch_cache_db_handle *next; }; -static struct { +struct switch_sql_manager { switch_cache_db_handle_t *event_db; switch_queue_t *sql_queue[2]; switch_memory_pool_t *memory_pool; @@ -73,7 +74,9 @@ static struct { uint32_t total_handles; uint32_t total_used_handles; switch_cache_db_handle_t *dbh; -} sql_manager; +}; + +switch_sql_manager_t switch_cache_db_sql_manager; static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type) @@ -90,11 +93,11 @@ static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t typ return new_dbh; } -static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const char *db_callsite_str, const char *thread_str) +static void add_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t *dbh, const char *db_str, const char *db_callsite_str, const char *thread_str) { switch_ssize_t hlen = -1; - switch_mutex_lock(sql_manager.dbh_mutex); + switch_mutex_lock(sql_manager->dbh_mutex); switch_set_string(dbh->creator, db_callsite_str); @@ -103,37 +106,37 @@ static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const dbh->thread_hash = switch_ci_hashfunc_default(thread_str, &hlen); dbh->use_count++; - sql_manager.total_used_handles++; - dbh->next = sql_manager.handle_pool; + sql_manager->total_used_handles++; + dbh->next = sql_manager->handle_pool; - sql_manager.handle_pool = dbh; - sql_manager.total_handles++; + sql_manager->handle_pool = dbh; + sql_manager->total_handles++; switch_mutex_lock(dbh->mutex); - switch_mutex_unlock(sql_manager.dbh_mutex); + switch_mutex_unlock(sql_manager->dbh_mutex); } -static void del_handle(switch_cache_db_handle_t *dbh) +static void del_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t *dbh) { switch_cache_db_handle_t *dbh_ptr, *last = NULL; - switch_mutex_lock(sql_manager.dbh_mutex); - for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { + switch_mutex_lock(sql_manager->dbh_mutex); + for (dbh_ptr = sql_manager->handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { if (dbh_ptr == dbh) { if (last) { last->next = dbh_ptr->next; } else { - sql_manager.handle_pool = dbh_ptr->next; + sql_manager->handle_pool = dbh_ptr->next; } - sql_manager.total_handles--; + sql_manager->total_handles--; break; } last = dbh_ptr; } - switch_mutex_unlock(sql_manager.dbh_mutex); + switch_mutex_unlock(sql_manager->dbh_mutex); } -static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user_str, const char *thread_str) +static switch_cache_db_handle_t *get_handle(switch_sql_manager_t *sql_manager, const char *db_str, const char *user_str, const char *thread_str) { switch_ssize_t hlen = -1; unsigned long hash = 0, thread_hash = 0; @@ -142,9 +145,9 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user hash = switch_ci_hashfunc_default(db_str, &hlen); thread_hash = switch_ci_hashfunc_default(thread_str, &hlen); - switch_mutex_lock(sql_manager.dbh_mutex); + switch_mutex_lock(sql_manager->dbh_mutex); - for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { + for (dbh_ptr = sql_manager->handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { if (dbh_ptr->thread_hash == thread_hash && dbh_ptr->hash == hash && !switch_test_flag(dbh_ptr, CDF_PRUNE) && switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) { r = dbh_ptr; @@ -152,7 +155,7 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user } if (!r) { - for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { + for (dbh_ptr = sql_manager->handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { if (dbh_ptr->hash == hash && !dbh_ptr->use_count && !switch_test_flag(dbh_ptr, CDF_PRUNE) && switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) { r = dbh_ptr; @@ -163,13 +166,13 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user if (r) { r->use_count++; - sql_manager.total_used_handles++; + sql_manager->total_used_handles++; r->hash = switch_ci_hashfunc_default(db_str, &hlen); r->thread_hash = thread_hash; switch_set_string(r->last_user, user_str); } - switch_mutex_unlock(sql_manager.dbh_mutex); + switch_mutex_unlock(sql_manager->dbh_mutex); return r; @@ -181,11 +184,16 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user \brief Open the default system database */ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) +{ + return __switch_core_db_handle(&switch_cache_db_sql_manager, dbh, file, func, line); +} + +SWITCH_DECLARE(switch_status_t) __switch_core_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) { switch_cache_db_connection_options_t options = { {0} }; switch_status_t r; - if (!sql_manager.manage) { + if (!sql_manager->manage) { return SWITCH_STATUS_FALSE; } @@ -199,26 +207,26 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t } else { options.core_db_options.db_path = SWITCH_CORE_DB; } - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); } else { char *dsn; if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) { options.pgsql_options.dsn = (char*)(dsn + 6); - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_PGSQL, &options, file, func, line); } else { options.odbc_options.dsn = runtime.odbc_dsn; options.odbc_options.user = runtime.odbc_user; options.odbc_options.pass = runtime.odbc_pass; - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_ODBC, &options, file, func, line); } } /* I *think* we can do without this now, if not let me know if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) { - (*dbh)->io_mutex = sql_manager.io_mutex; + (*dbh)->io_mutex = sql_manager->io_mutex; } */ @@ -229,12 +237,12 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t /*! \brief Open the default system database */ -SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) +SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) { switch_cache_db_connection_options_t options = { {0} }; switch_status_t r; - if (!sql_manager.manage) { + if (!sql_manager->manage) { return SWITCH_STATUS_FALSE; } @@ -248,26 +256,26 @@ SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_h } else { options.core_db_options.db_path = SWITCH_CORE_PERSIST_DB; } - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); } else { char *dsn; if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) { options.pgsql_options.dsn = (char*)(dsn + 6); - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_PGSQL, &options, file, func, line); } else { options.odbc_options.dsn = runtime.odbc_dsn; options.odbc_options.user = runtime.odbc_user; options.odbc_options.pass = runtime.odbc_pass; - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_ODBC, &options, file, func, line); } } /* I *think* we can do without this now, if not let me know if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) { - (*dbh)->io_mutex = sql_manager.io_mutex; + (*dbh)->io_mutex = sql_manager->io_mutex; } */ @@ -276,12 +284,12 @@ SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_h #define SWITCH_CORE_RECOVERY_DB "core_recovery" -SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) +SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) { switch_cache_db_connection_options_t options = { {0} }; switch_status_t r; - if (!sql_manager.manage) { + if (!sql_manager->manage) { return SWITCH_STATUS_FALSE; } @@ -295,26 +303,26 @@ SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_ } else { options.core_db_options.db_path = SWITCH_CORE_RECOVERY_DB; } - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); } else { char *dsn; if ((dsn = strstr(runtime.recovery_odbc_dsn, "pgsql;")) != NULL) { options.pgsql_options.dsn = (char*)(dsn + 6); - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_PGSQL, &options, file, func, line); } else { options.odbc_options.dsn = runtime.recovery_odbc_dsn; options.odbc_options.user = runtime.recovery_odbc_user; options.odbc_options.pass = runtime.recovery_odbc_pass; - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); + r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_ODBC, &options, file, func, line); } } /* I *think* we can do without this now, if not let me know if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) { - (*dbh)->io_mutex = sql_manager.io_mutex; + (*dbh)->io_mutex = sql_manager->io_mutex; } */ @@ -326,16 +334,16 @@ SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_ #define SQL_REG_TIMEOUT 15 -static void sql_close(time_t prune) +static void sql_close(switch_sql_manager_t *sql_manager, time_t prune) { switch_cache_db_handle_t *dbh = NULL; int locked = 0; - switch_mutex_lock(sql_manager.dbh_mutex); + switch_mutex_lock(sql_manager->dbh_mutex); top: locked = 0; - for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) { + for (dbh = sql_manager->handle_pool; dbh; dbh = dbh->next) { time_t diff = 0; if (prune > 0 && prune > dbh->last_used) { @@ -368,7 +376,7 @@ static void sql_close(time_t prune) break; } - del_handle(dbh); + del_handle(sql_manager, dbh); switch_mutex_unlock(dbh->mutex); switch_core_destroy_memory_pool(&dbh->pool); goto top; @@ -386,7 +394,7 @@ static void sql_close(time_t prune) goto top; } - switch_mutex_unlock(sql_manager.dbh_mutex); + switch_mutex_unlock(sql_manager->dbh_mutex); } @@ -395,16 +403,20 @@ SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_cache_db_get_type(switch_ca return dbh->type; } -SWITCH_DECLARE(void) switch_cache_db_flush_handles(void) +SWITCH_DECLARE(void) switch_cache_db_flush_handles(switch_sql_manager_t *sql_manager) { - sql_close(switch_epoch_time_now(NULL) + SQL_CACHE_TIMEOUT + 1); + sql_close(sql_manager, switch_epoch_time_now(NULL) + SQL_CACHE_TIMEOUT + 1); } - SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh) +{ + _switch_cache_db_release_db_handle(&switch_cache_db_sql_manager, dbh); +} + +SWITCH_DECLARE(void) _switch_cache_db_release_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh) { if (dbh && *dbh) { - switch_mutex_lock(sql_manager.dbh_mutex); + switch_mutex_lock(sql_manager->dbh_mutex); (*dbh)->last_used = switch_epoch_time_now(NULL); (*dbh)->io_mutex = NULL; @@ -415,9 +427,9 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t } } switch_mutex_unlock((*dbh)->mutex); - sql_manager.total_used_handles--; + sql_manager->total_used_handles--; *dbh = NULL; - switch_mutex_unlock(sql_manager.dbh_mutex); + switch_mutex_unlock(sql_manager->dbh_mutex); } } @@ -428,7 +440,16 @@ SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t } -SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh, +SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t ** dbh, + switch_cache_db_handle_type_t type, + switch_cache_db_connection_options_t *connection_options, + const char *file, const char *func, int line) +{ + return __switch_cache_db_get_db_handle(&switch_cache_db_sql_manager, dbh, type, connection_options, file, func, line); +} + +SWITCH_DECLARE(switch_status_t) __switch_cache_db_get_db_handle(switch_sql_manager_t *sql_manager, + switch_cache_db_handle_t **dbh, switch_cache_db_handle_type_t type, switch_cache_db_connection_options_t *connection_options, const char *file, const char *func, int line) @@ -445,7 +466,7 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h const char *odbc_user = NULL; const char *odbc_pass = NULL; - while(runtime.max_db_handles && sql_manager.total_handles >= runtime.max_db_handles && sql_manager.total_used_handles >= sql_manager.total_handles) { + while(runtime.max_db_handles && sql_manager->total_handles >= runtime.max_db_handles && sql_manager->total_used_handles >= sql_manager->total_handles) { if (!waiting++) { switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_WARNING, "Max handles %u exceeded, blocking....\n", runtime.max_db_handles); @@ -496,7 +517,7 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line); snprintf(thread_str, sizeof(thread_str) - 1, "thread=\"%lu\"", (unsigned long) (intptr_t) self); - if ((new_dbh = get_handle(db_str, db_callsite_str, thread_str))) { + if ((new_dbh = get_handle(sql_manager, db_str, db_callsite_str, thread_str))) { switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, "Reuse Unused Cached DB handle %s [%s]\n", new_dbh->name, switch_cache_db_type_name(new_dbh->type)); } else { @@ -552,9 +573,6 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h goto end; } - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, - "Create Cached DB handle %s [%s] %s:%d\n", new_dbh->name, switch_cache_db_type_name(type), file, line); - new_dbh = create_handle(type); if (db) { @@ -565,7 +583,11 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h new_dbh->native_handle.pgsql_dbh = pgsql_dbh; } - add_handle(new_dbh, db_str, db_callsite_str, thread_str); + add_handle(sql_manager, new_dbh, db_str, db_callsite_str, thread_str); + + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, + "Create Cached DB handle %s [%s] %s:%d\n", new_dbh->name, switch_cache_db_type_name(type), file, line); + } end: @@ -638,18 +660,20 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t return status; } -static void wake_thread(int force) +static void _wake_thread(switch_sql_manager_t *sql_manager, int force) { if (force) { - switch_thread_cond_signal(sql_manager.cond); + switch_thread_cond_signal(sql_manager->cond); return; } - if (switch_mutex_trylock(sql_manager.cond_mutex) == SWITCH_STATUS_SUCCESS) { - switch_thread_cond_signal(sql_manager.cond); - switch_mutex_unlock(sql_manager.cond_mutex); + if (switch_mutex_trylock(sql_manager->cond_mutex) == SWITCH_STATUS_SUCCESS) { + switch_thread_cond_signal(sql_manager->cond); + switch_mutex_unlock(sql_manager->cond_mutex); } } +#define wake_thread(f) _wake_thread(&switch_cache_db_sql_manager, f) + /** OMFG you cruel bastards. Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions? @@ -1173,13 +1197,16 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj) { - int sec = 0, reg_sec = 0;; + int sec = 0, reg_sec = 0; + switch_sql_manager_t *sql_manager; - sql_manager.db_thread_running = 1; + sql_manager = (switch_sql_manager_t*)obj; - while (sql_manager.db_thread_running == 1) { + sql_manager->db_thread_running = 1; + + while (sql_manager->db_thread_running == 1) { if (++sec == SQL_CACHE_TIMEOUT) { - sql_close(switch_epoch_time_now(NULL)); + sql_close(sql_manager, switch_epoch_time_now(NULL)); wake_thread(0); sec = 0; } @@ -1208,44 +1235,47 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, int lc = 0, wrote = 0, do_sleep = 1; uint32_t sanity = 120; int auto_pause = 0; + switch_sql_manager_t *sql_manager; + + sql_manager = (switch_sql_manager_t*)obj; switch_assert(sqlbuf); - while (!sql_manager.event_db) { - if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db) + while (!sql_manager->event_db) { + if (switch_core_db_handle(&sql_manager->event_db) == SWITCH_STATUS_SUCCESS && sql_manager->event_db) break; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n"); switch_yield(500000); sanity--; } - if (!sql_manager.event_db) { + if (!sql_manager->event_db) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db Disabling core sql functionality\n"); return NULL; } - sql_manager.thread_running = 1; + sql_manager->thread_running = 1; - switch_mutex_lock(sql_manager.cond_mutex); + switch_mutex_lock(sql_manager->cond_mutex); - switch (sql_manager.event_db->type) { + switch (switch_cache_db_sql_manager.event_db->type) { case SCDB_TYPE_PGSQL: break; case SCDB_TYPE_ODBC: break; case SCDB_TYPE_CORE_DB: { - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA synchronous=OFF;", NULL); - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA count_changes=OFF;", NULL); - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA temp_store=MEMORY;", NULL); - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA journal_mode=OFF;", NULL); + switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA synchronous=OFF;", NULL); + switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA count_changes=OFF;", NULL); + switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA temp_store=MEMORY;", NULL); + switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA journal_mode=OFF;", NULL); } break; } - while (sql_manager.thread_running == 1) { - if (save_sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || - switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { + while (sql_manager->thread_running == 1) { + if (save_sql || switch_queue_trypop(sql_manager->sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || + switch_queue_trypop(sql_manager->sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { if (save_sql) { sql = save_sql; @@ -1268,8 +1298,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, sql_len = new_mlen; if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]), - switch_queue_size(sql_manager.sql_queue[1])); + "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager->sql_queue[0]), + switch_queue_size(sql_manager->sql_queue[1])); } if (!(tmp = realloc(sqlbuf, sql_len))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); @@ -1280,7 +1310,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } else { if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); + "SAVE %d %d\n", switch_queue_size(sql_manager->sql_queue[0]), switch_queue_size(sql_manager->sql_queue[1])); } save_sql = sql; sql = NULL; @@ -1289,7 +1319,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } } - iterations++; + iterations++; sprintf(sqlbuf + len, "%s;\n", sql); len += newlen; free(sql); @@ -1300,7 +1330,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } } - lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); + lc = switch_queue_size(sql_manager->sql_queue[0]) + switch_queue_size(sql_manager->sql_queue[1]); if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) { @@ -1326,9 +1356,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, if (trans && iterations && (iterations > target || !lc)) { if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations); + "RUN %d %d %d\n", switch_queue_size(sql_manager->sql_queue[0]), switch_queue_size(sql_manager->sql_queue[1]), iterations); } - if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { + if (switch_cache_db_persistant_execute_trans(sql_manager->event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); } if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { @@ -1349,10 +1379,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, wrote = 1; } - lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); + lc = switch_queue_size(sql_manager->sql_queue[0]) + switch_queue_size(sql_manager->sql_queue[1]); if (!lc) { - switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex); + switch_thread_cond_wait(sql_manager->cond, sql_manager->cond_mutex); } else if (wrote) { if (lc > 2000) { do_sleep = 0; @@ -1364,21 +1394,21 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } - switch_mutex_unlock(sql_manager.cond_mutex); + switch_mutex_unlock(sql_manager->cond_mutex); - while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) { + while (switch_queue_trypop(sql_manager->sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) { free(pop); } - while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { + while (switch_queue_trypop(sql_manager->sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { free(pop); } free(sqlbuf); - sql_manager.thread_running = 0; + sql_manager->thread_running = 0; - switch_cache_db_release_db_handle(&sql_manager.event_db); + switch_cache_db_release_db_handle(&sql_manager->event_db); return NULL; } @@ -1820,9 +1850,9 @@ static void core_event_handler(switch_event_t *event) for (i = 0; i < sql_idx; i++) { if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) { - switch_queue_push(sql_manager.sql_queue[1], sql[i]); + switch_queue_push(switch_cache_db_sql_manager.sql_queue[1], sql[i]); } else { - switch_queue_push(sql_manager.sql_queue[0], sql[i]); + switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql[i]); } sql[i] = NULL; wake_thread(0); @@ -2381,7 +2411,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c user, realm, switch_core_get_switchname()); } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql); if ( !zstr(metadata) ) { sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) " @@ -2413,7 +2443,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql); return SWITCH_STATUS_SUCCESS; } @@ -2433,7 +2463,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, c sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname()); } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql); return SWITCH_STATUS_SUCCESS; } @@ -2456,35 +2486,33 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force) sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname()); } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql); return SWITCH_STATUS_SUCCESS; } -switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage) +void switch_sqldb_init_sql_manager(switch_sql_manager_t *sql_manager, switch_memory_pool_t *pool, switch_bool_t manage) { - switch_threadattr_t *thd_attr; - uint32_t sanity = 400; - sql_manager.memory_pool = pool; - sql_manager.manage = manage; + sql_manager->memory_pool = pool; + sql_manager->manage = manage; - switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); - switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); - switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); - switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); + switch_mutex_init(&sql_manager->dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool); + switch_mutex_init(&sql_manager->io_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool); + switch_mutex_init(&sql_manager->cond_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool); + switch_mutex_init(&sql_manager->ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool); - switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool); + switch_thread_cond_create(&sql_manager->cond, sql_manager->memory_pool); +} +switch_status_t switch_sqldb_connect(switch_sql_manager_t *sql_manager, switch_memory_pool_t *pool) +{ - - if (!sql_manager.manage) goto skip; - - top: + top: /* Activate SQL database */ - if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) { + if (switch_core_db_handle(&sql_manager->dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) { @@ -2508,8 +2536,25 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Opening DB\n"); + return SWITCH_STATUS_SUCCESS; +} - switch (sql_manager.dbh->type) { +switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage) +{ + switch_threadattr_t *thd_attr; + uint32_t sanity = 400; + switch_sql_manager_t *sql_manager = &switch_cache_db_sql_manager; + + switch_sqldb_init_sql_manager(sql_manager, pool, manage); + + if (manage == SWITCH_FALSE) goto skip; + + top: + if (switch_sqldb_connect(sql_manager, pool) == SWITCH_STATUS_FALSE) { + return SWITCH_STATUS_FALSE; + } + + switch (sql_manager->dbh->type) { case SCDB_TYPE_PGSQL: case SCDB_TYPE_ODBC: if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) { @@ -2520,61 +2565,61 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ for (i = 0; tables[i]; i++) { switch_snprintfv(sql, sizeof(sql), "delete from %q where hostname='%q'", tables[i], hostname); - switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, sql, NULL); } } break; case SCDB_TYPE_CORE_DB: { - switch_cache_db_execute_sql(sql_manager.dbh, "drop table channels", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "drop table calls", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "drop view detailed_calls", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "drop view basic_calls", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "drop table interfaces", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "drop table tasks", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA synchronous=OFF;", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA count_changes=OFF;", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA default_cache_size=8000", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA temp_store=MEMORY;", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA journal_mode=OFF;", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "drop table channels", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "drop table calls", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "drop view detailed_calls", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "drop view basic_calls", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "drop table interfaces", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "drop table tasks", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA synchronous=OFF;", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA count_changes=OFF;", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA default_cache_size=8000", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA temp_store=MEMORY;", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA journal_mode=OFF;", NULL); } break; } - switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'", + switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'", "DROP TABLE registrations", create_registrations_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)"); + switch_cache_db_test_reactive(sql_manager->dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)"); - switch (sql_manager.dbh->type) { + switch (sql_manager->dbh->type) { case SCDB_TYPE_PGSQL: case SCDB_TYPE_ODBC: { char *err; - switch_cache_db_test_reactive(sql_manager.dbh, "select call_uuid, read_bit_rate, sent_callee_name from channels", "DROP TABLE channels", create_channels_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select * from detailed_calls where sent_callee_name=''", "DROP VIEW detailed_calls", detailed_calls_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select * from basic_calls where sent_callee_name=''", "DROP VIEW basic_calls", basic_calls_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select call_uuid from calls", "DROP TABLE calls", create_calls_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select call_uuid, read_bit_rate, sent_callee_name from channels", "DROP TABLE channels", create_channels_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select * from detailed_calls where sent_callee_name=''", "DROP VIEW detailed_calls", detailed_calls_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select * from basic_calls where sent_callee_name=''", "DROP VIEW basic_calls", basic_calls_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select call_uuid from calls", "DROP TABLE calls", create_calls_sql); if (runtime.odbc_dbtype == DBTYPE_DEFAULT) { - switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'", + switch_cache_db_test_reactive(sql_manager->dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'", "DROP TABLE registrations", create_registrations_sql); } else { char *tmp = switch_string_replace(create_registrations_sql, "url TEXT", "url VARCHAR(max)"); - switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'", + switch_cache_db_test_reactive(sql_manager->dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'", "DROP TABLE registrations", tmp); free(tmp); } - switch_cache_db_test_reactive(sql_manager.dbh, "select ikey from interfaces", "DROP TABLE interfaces", create_interfaces_sql); - switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from tasks", "DROP TABLE tasks", create_tasks_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select ikey from interfaces", "DROP TABLE interfaces", create_interfaces_sql); + switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from tasks", "DROP TABLE tasks", create_tasks_sql); if (runtime.odbc_dbtype == DBTYPE_DEFAULT) { - switch_cache_db_execute_sql(sql_manager.dbh, "begin;delete from channels where hostname='';delete from channels where hostname='';commit;", &err); + switch_cache_db_execute_sql(sql_manager->dbh, "begin;delete from channels where hostname='';delete from channels where hostname='';commit;", &err); } else { - switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels where hostname='';delete from channels where hostname='';", &err); + switch_cache_db_execute_sql(sql_manager->dbh, "delete from channels where hostname='';delete from channels where hostname='';", &err); } if (err) { @@ -2582,7 +2627,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ runtime.odbc_user = NULL; runtime.odbc_pass = NULL; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Transactions not supported on your DB, disabling non-SQLite support; using SQLite\n"); - switch_cache_db_release_db_handle(&sql_manager.dbh); + switch_cache_db_release_db_handle(&sql_manager->dbh); free(err); goto top; } @@ -2590,49 +2635,49 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ break; case SCDB_TYPE_CORE_DB: { - switch_cache_db_execute_sql(sql_manager.dbh, create_channels_sql, NULL); - switch_cache_db_execute_sql(sql_manager.dbh, create_calls_sql, NULL); - switch_cache_db_execute_sql(sql_manager.dbh, create_interfaces_sql, NULL); - switch_cache_db_execute_sql(sql_manager.dbh, create_tasks_sql, NULL); - switch_cache_db_execute_sql(sql_manager.dbh, detailed_calls_sql, NULL); - switch_cache_db_execute_sql(sql_manager.dbh, basic_calls_sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, create_channels_sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, create_calls_sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, create_interfaces_sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, create_tasks_sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, detailed_calls_sql, NULL); + switch_cache_db_execute_sql(sql_manager->dbh, basic_calls_sql, NULL); } break; } - switch_cache_db_execute_sql(sql_manager.dbh, "delete from complete where sticky=0", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "delete from aliases where sticky=0", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "delete from nat where sticky=0", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index alias1 on aliases (alias)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index tasks1 on tasks (hostname,task_id)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete1 on complete (a1,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete2 on complete (a2,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete3 on complete (a3,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete4 on complete (a4,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete5 on complete (a5,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete6 on complete (a6,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete7 on complete (a7,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete8 on complete (a8,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete9 on complete (a9,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete10 on complete (a10,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index complete11 on complete (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index nat_map_port_proto on nat (port,proto,hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index channels1 on channels(hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index calls1 on calls(hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index chidx1 on channels (hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index uuindex on channels (uuid)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index uuindex2 on channels (call_uuid)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index callsidx1 on calls (hostname)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index eruuindex on calls (caller_uuid)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index eeuuindex on calls (callee_uuid)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index eeuuindex2 on calls (call_uuid)", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "create index regindex1 on registrations (reg_user,realm,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "delete from complete where sticky=0", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "delete from aliases where sticky=0", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "delete from nat where sticky=0", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index alias1 on aliases (alias)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index tasks1 on tasks (hostname,task_id)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete1 on complete (a1,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete2 on complete (a2,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete3 on complete (a3,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete4 on complete (a4,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete5 on complete (a5,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete6 on complete (a6,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete7 on complete (a7,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete8 on complete (a8,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete9 on complete (a9,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete10 on complete (a10,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index complete11 on complete (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index nat_map_port_proto on nat (port,proto,hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index channels1 on channels(hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index calls1 on calls(hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index chidx1 on channels (hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index uuindex on channels (uuid)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index uuindex2 on channels (call_uuid)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index callsidx1 on calls (hostname)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index eruuindex on calls (caller_uuid)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index eeuuindex on calls (callee_uuid)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index eeuuindex2 on calls (call_uuid)", NULL); + switch_cache_db_execute_sql(sql_manager->dbh, "create index regindex1 on registrations (reg_user,realm,hostname)", NULL); skip: - if (sql_manager.manage) { + if (sql_manager->manage) { #ifdef SWITCH_SQL_BIND_EVERY_EVENT switch_event_bind("core_db", SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL); #else @@ -2661,16 +2706,16 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL); #endif - switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); - switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); + switch_queue_create(&sql_manager->sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager->memory_pool); + switch_queue_create(&sql_manager->sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager->memory_pool); - switch_threadattr_create(&thd_attr, sql_manager.memory_pool); + switch_threadattr_create(&thd_attr, sql_manager->memory_pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME); switch_core_sqldb_start_thread(); - switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool); + switch_thread_create(&sql_manager->db_thread, thd_attr, switch_core_sql_db_thread, sql_manager, sql_manager->memory_pool); - while (sql_manager.manage && !sql_manager.thread_running && --sanity) { + while (sql_manager->manage && !sql_manager->thread_running && --sanity) { switch_yield(10000); } } @@ -2678,37 +2723,36 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ } -SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void) +SWITCH_DECLARE(void) switch_sqldb_stop_thread(switch_sql_manager_t *sql_manager) { - switch_mutex_lock(sql_manager.ctl_mutex); - if (sql_manager.thread && sql_manager.thread_running) { + switch_mutex_lock(sql_manager->ctl_mutex); + if (sql_manager->thread && sql_manager->thread_running) { switch_status_t st; - if (sql_manager.manage) { - switch_queue_push(sql_manager.sql_queue[0], NULL); - switch_queue_push(sql_manager.sql_queue[1], NULL); + if (sql_manager->manage) { + switch_queue_push(sql_manager->sql_queue[0], NULL); + switch_queue_push(sql_manager->sql_queue[1], NULL); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); wake_thread(0); - sql_manager.thread_running = -1; - switch_thread_join(&st, sql_manager.thread); - sql_manager.thread = NULL; - switch_cache_db_release_db_handle(&sql_manager.dbh); - sql_manager.dbh = NULL; + sql_manager->thread_running = -1; + switch_thread_join(&st, sql_manager->thread); + sql_manager->thread = NULL; + switch_cache_db_release_db_handle(&sql_manager->dbh); + sql_manager->dbh = NULL; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n"); } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is not running\n"); } - switch_mutex_unlock(sql_manager.ctl_mutex); + switch_mutex_unlock(sql_manager->ctl_mutex); } -SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) + +void switch_core_recovery_create_indices(void) { switch_cache_db_handle_t *dbh; - switch_mutex_lock(sql_manager.ctl_mutex); - if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); @@ -2728,14 +2772,34 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) switch_cache_db_release_db_handle(&dbh); } +} - if (sql_manager.manage) { + +SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) +{ + char *post_connect_sql[2]; + post_connect_sql[0] = "delete from channels"; + post_connect_sql[1] = "delete from calls"; + + switch_core_recovery_create_indices(); + switch_sqldb_start_thread(&switch_cache_db_sql_manager, switch_core_sql_thread, post_connect_sql, 2); + +} + +SWITCH_DECLARE(void) switch_sqldb_start_thread(switch_sql_manager_t *sql_manager, void *(SWITCH_THREAD_FUNC * func) (switch_thread_t *, void *), + char *post_connect_sql[], int argc) +{ + int i = 0; + + switch_mutex_lock(sql_manager->ctl_mutex); + + if (sql_manager->manage) { top: - if (!sql_manager.dbh) { + if (!sql_manager->dbh) { /* Activate SQL database */ - if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) { + if (switch_core_db_handle(&sql_manager->dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) { @@ -2749,7 +2813,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) runtime.odbc_pass = NULL; runtime.odbc_dbtype = DBTYPE_DEFAULT; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n"); - sql_manager.dbh = NULL; + sql_manager->dbh = NULL; goto top; } @@ -2758,18 +2822,21 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) goto end; } - switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "delete from calls", NULL); + if (argc && post_connect_sql) { + for ( i = 0; i < argc; i++) { + switch_cache_db_execute_sql(sql_manager->dbh, post_connect_sql[i], NULL); + } + } } - if (!sql_manager.thread) { + if (!sql_manager->thread) { switch_threadattr_t *thd_attr; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n"); - switch_threadattr_create(&thd_attr, sql_manager.memory_pool); + switch_threadattr_create(&thd_attr, sql_manager->memory_pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME); - switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool); + switch_thread_create(&sql_manager->thread, thd_attr, func, sql_manager, sql_manager->memory_pool); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is already running\n"); } @@ -2779,28 +2846,38 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) end: - switch_mutex_unlock(sql_manager.ctl_mutex); + switch_mutex_unlock(sql_manager->ctl_mutex); } -void switch_core_sqldb_stop(void) +void switch_sqldb_stop(switch_sql_manager_t *sql_manager) { switch_status_t st; switch_event_unbind_callback(core_event_handler); - switch_core_sqldb_stop_thread(); + switch_sqldb_stop_thread(sql_manager); - if (sql_manager.db_thread && sql_manager.db_thread_running) { - sql_manager.db_thread_running = -1; - switch_thread_join(&st, sql_manager.db_thread); + if (sql_manager->db_thread && sql_manager->db_thread_running) { + sql_manager->db_thread_running = -1; + switch_thread_join(&st, sql_manager->db_thread); } - switch_cache_db_flush_handles(); - sql_close(0); + switch_cache_db_flush_handles(sql_manager); + sql_close(sql_manager, 0); +} + +void switch_core_sqldb_stop() +{ + switch_sqldb_stop(&switch_cache_db_sql_manager); } SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) +{ + _switch_cache_db_status(&switch_cache_db_sql_manager, stream); +} + +SWITCH_DECLARE(void) _switch_cache_db_status(switch_sql_manager_t *sql_manager, switch_stream_handle_t *stream) { /* return some status info suitable for the cli */ switch_cache_db_handle_t *dbh = NULL; @@ -2811,11 +2888,16 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) char *pos2 = NULL; int count = 0, used = 0; - switch_mutex_lock(sql_manager.dbh_mutex); + switch_mutex_lock(sql_manager->dbh_mutex); - for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) { - char *needle = "pass=\""; + for (dbh = sql_manager->handle_pool; dbh; dbh = dbh->next) { + char *needles[3]; time_t diff = 0; + int i = 0; + + needles[0] = "pass=\""; + needles[1] = "password="; + needles[2] = "password='"; diff = now - dbh->last_used; @@ -2828,11 +2910,26 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) /* sanitize password */ memset(cleankey_str, 0, sizeof(cleankey_str)); - pos1 = strstr(dbh->name, needle) + strlen(needle); - pos2 = strstr(pos1, "\""); - strncpy(cleankey_str, dbh->name, pos1 - dbh->name); - strcpy(&cleankey_str[pos1 - dbh->name], pos2); - + for (i = 0; i < 3; i++) { + if((pos1 = strstr(dbh->name, needles[i]))) { + pos1 += strlen(needles[i]); + + if (!(pos2 = strstr(pos1, "\""))) { + if (!(pos2 = strstr(pos1, "'"))) { + if (!(pos2 = strstr(pos1, " "))) { + pos2 = pos1 + strlen(pos1); + } + } + } + strncpy(cleankey_str, dbh->name, pos1 - dbh->name); + strcpy(&cleankey_str[pos1 - dbh->name], pos2); + break; + } + } + if (i == 3) { + strncpy(cleankey_str, dbh->name, strlen(dbh->name)); + } + count++; if (dbh->use_count) { @@ -2850,7 +2947,7 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) stream->write_function(stream, "%d total. %d in use.\n", count, used); - switch_mutex_unlock(sql_manager.dbh_mutex); + switch_mutex_unlock(sql_manager->dbh_mutex); } SWITCH_DECLARE(char*)switch_sql_concat(void)