diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 01f4a87111..7b055025c7 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2448,6 +2448,20 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh, const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err); +/*! + \brief Executes the sql and uses callback for row-by-row processing + \param [in] dbh The handle + \param [in] sql - sql to run + \param [in] callback - function pointer to callback + \param [in] err_callback - function pointer to callback when error occurs + \param [in] pdata - data to pass to callback + \param [out] err - Error if it exists +*/ +SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t *dbh, const char *sql, + switch_core_db_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, + void *pdata, char **err); + /*! \brief Get the affected rows of the last performed query \param [in] dbh The handle @@ -2562,11 +2576,21 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_m SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh, const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err); -SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm, - const char *sql, switch_core_db_callback_func_t callback, void *pdata); +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_callback_func_t callback, + void *pdata); +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, + void *pdata); -SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm, - const char *sql, switch_core_db_event_callback_func_t callback, void *pdata); +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_event_callback_func_t callback, + void *pdata); +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_event_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, + void *pdata); SWITCH_DECLARE(pid_t) switch_fork(void); diff --git a/src/include/switch_core_db.h b/src/include/switch_core_db.h index 53e0625b07..28f23e6480 100644 --- a/src/include/switch_core_db.h +++ b/src/include/switch_core_db.h @@ -54,6 +54,7 @@ SWITCH_BEGIN_EXTERN_C typedef struct sqlite3_stmt switch_core_db_stmt_t; typedef int (*switch_core_db_callback_func_t) (void *pArg, int argc, char **argv, char **columnNames); +typedef int (*switch_core_db_err_callback_func_t) (void *pArg, const char *errmsg); /* ** These are special value for the destructor that is passed in as the diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 0195e4ac6e..5b85523657 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -1045,7 +1045,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switc switch_status_t status = SWITCH_STATUS_FALSE; char *errmsg = NULL; switch_mutex_t *io_mutex = dbh->io_mutex; - struct helper h; + struct helper h = {0}; if (err) { @@ -1092,6 +1092,72 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switc return status; } +SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback_err(switch_cache_db_handle_t *dbh, const char *sql, + switch_core_db_event_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, + void *pdata, char **err) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + char *errmsg = NULL; + switch_mutex_t *io_mutex = dbh->io_mutex; + struct helper h; + + + if (err) { + *err = NULL; + } + + if (io_mutex) switch_mutex_lock(io_mutex); + + h.callback = callback; + h.pdata = pdata; + + switch (dbh->type) { + case SCDB_TYPE_PGSQL: + { + status = switch_pgsql_handle_callback_exec(dbh->native_handle.pgsql_dbh, sql, helper_callback, &h, err); + if (err && *err) { + (*err_callback)(pdata, (const char*)*err); + } + } + break; + case SCDB_TYPE_ODBC: + { + status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, helper_callback, &h, err); + if (err && *err) { + (*err_callback)(pdata, (const char*)*err); + } + } + break; + case SCDB_TYPE_CORE_DB: + { + int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh, sql, helper_callback, &h, &errmsg); + + if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) { + status = SWITCH_STATUS_SUCCESS; + } + + if (errmsg) { + dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2); + if (!strstr(errmsg, "query abort")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg); + } + } + if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) { + (*err_callback)(pdata, errmsg); + } + if (errmsg) { + switch_core_db_free(errmsg); + } + } + break; + } + + if (io_mutex) switch_mutex_unlock(io_mutex); + + return status; +} + SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh, const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err) { @@ -1141,6 +1207,67 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach return status; } +SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t *dbh, const char *sql, + switch_core_db_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, void *pdata, char **err) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + char *errmsg = NULL; + switch_mutex_t *io_mutex = dbh->io_mutex; + + if (err) { + *err = NULL; + } + + if (io_mutex) switch_mutex_lock(io_mutex); + + + switch (dbh->type) { + case SCDB_TYPE_PGSQL: + { + status = switch_pgsql_handle_callback_exec(dbh->native_handle.pgsql_dbh, sql, callback, pdata, err); + if (err && *err) { + (*err_callback)(pdata, (const char*)*err); + } + } + break; + case SCDB_TYPE_ODBC: + { + status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err); + if (err && *err) { + (*err_callback)(pdata, (const char*)*err); + } + } + break; + case SCDB_TYPE_CORE_DB: + { + int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh, sql, callback, pdata, &errmsg); + + if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) { + status = SWITCH_STATUS_SUCCESS; + } + + if (errmsg) { + dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2); + if (!strstr(errmsg, "query abort")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg); + } + } + if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) { + (*err_callback)(pdata, errmsg); + } + if (errmsg) { + switch_core_db_free(errmsg); + } + } + break; + } + + if (io_mutex) switch_mutex_unlock(io_mutex); + + return status; +} + SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh, const char *test_sql, const char *drop_sql, const char *reactive_sql) { @@ -1312,7 +1439,9 @@ struct db_job { switch_sql_queue_manager_t *qm; char *sql; switch_core_db_callback_func_t callback; + switch_core_db_err_callback_func_t err_callback; switch_core_db_event_callback_func_t event_callback; + switch_core_db_err_callback_func_t event_err_callback; void *pdata; int event; switch_memory_pool_t *pool; @@ -1331,10 +1460,14 @@ static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *ob return NULL; } - if (job->callback) { + if (job->callback && !job->err_callback) { switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err); - } else if (job->event_callback) { + } else if (job->callback && job->err_callback) { + switch_cache_db_execute_sql_callback_err(dbh, job->sql, job->callback, job->err_callback, job->pdata, &err); + } else if (job->event_callback && !job->event_err_callback) { switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err); + } else if (job->event_callback && job->event_err_callback) { + switch_cache_db_execute_sql_event_callback_err(dbh, job->sql, job->event_callback, job->event_err_callback, job->pdata, &err); } if (err) { @@ -1351,8 +1484,12 @@ static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *ob return NULL; } -static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql, - switch_core_db_callback_func_t callback, switch_core_db_event_callback_func_t event_callback, void *pdata) +static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, + switch_core_db_event_callback_func_t event_callback, + switch_core_db_err_callback_func_t event_err_callback, + void *pdata) { switch_memory_pool_t *pool; switch_thread_data_t *td; @@ -1370,8 +1507,10 @@ static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char if (callback) { job->callback = callback; + job->err_callback = err_callback; } else if (event_callback) { job->event_callback = event_callback; + job->event_err_callback = event_err_callback; } job->pdata = pdata; @@ -1381,22 +1520,45 @@ static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char } -SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm, +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t *qm, const char *sql, switch_core_db_callback_func_t callback, void *pdata) { switch_thread_data_t *td; - if ((td = new_job(qm, sql, callback, NULL, pdata))) { + if ((td = new_job(qm, sql, callback, NULL, NULL, NULL, pdata))) { switch_thread_pool_launch_thread(&td); } } -SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm, - const char *sql, switch_core_db_event_callback_func_t callback, void *pdata) +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, void *pdata) { switch_thread_data_t *td; - if ((td = new_job(qm, sql, NULL, callback, pdata))) { + if ((td = new_job(qm, sql, callback, err_callback, NULL, NULL, pdata))) { + switch_thread_pool_launch_thread(&td); + } +} + +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t *qm, + const char *sql, switch_core_db_event_callback_func_t callback, void *pdata) +{ + + switch_thread_data_t *td; + if ((td = new_job(qm, sql, NULL, NULL, callback, NULL, pdata))) { + switch_thread_pool_launch_thread(&td); + } +} + +SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_event_callback_func_t callback, + switch_core_db_err_callback_func_t err_callback, + void *pdata) +{ + + switch_thread_data_t *td; + if ((td = new_job(qm, sql, NULL, NULL, callback, err_callback, pdata))) { switch_thread_pool_launch_thread(&td); } }