Add error handling to sql queue manager callback functionality and fix spelling.

This commit is contained in:
Eliot Gable 2014-03-14 15:54:05 +00:00
parent 8c41b4f42c
commit 5b1ab59f00
3 changed files with 201 additions and 14 deletions

View File

@ -2448,6 +2448,20 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh, const char *sql,
switch_core_db_callback_func_t callback, void *pdata, char **err);
/*!
\brief Executes the sql and uses callback for row-by-row processing
\param [in] dbh The handle
\param [in] sql - sql to run
\param [in] callback - function pointer to callback
\param [in] err_callback - function pointer to callback when error occurs
\param [in] pdata - data to pass to callback
\param [out] err - Error if it exists
*/
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
switch_core_db_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback,
void *pdata, char **err);
/*!
\brief Get the affected rows of the last performed query
\param [in] dbh The handle
@ -2562,11 +2576,21 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_m
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err);
SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_callback_func_t callback, void *pdata);
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_callback_func_t callback,
void *pdata);
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback,
void *pdata);
SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_event_callback_func_t callback, void *pdata);
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_event_callback_func_t callback,
void *pdata);
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_event_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback,
void *pdata);
SWITCH_DECLARE(pid_t) switch_fork(void);

View File

@ -54,6 +54,7 @@ SWITCH_BEGIN_EXTERN_C
typedef struct sqlite3_stmt switch_core_db_stmt_t;
typedef int (*switch_core_db_callback_func_t) (void *pArg, int argc, char **argv, char **columnNames);
typedef int (*switch_core_db_err_callback_func_t) (void *pArg, const char *errmsg);
/*
** These are special value for the destructor that is passed in as the

View File

@ -1045,7 +1045,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switc
switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL;
switch_mutex_t *io_mutex = dbh->io_mutex;
struct helper h;
struct helper h = {0};
if (err) {
@ -1092,6 +1092,72 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switc
return status;
}
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
switch_core_db_event_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback,
void *pdata, char **err)
{
switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL;
switch_mutex_t *io_mutex = dbh->io_mutex;
struct helper h;
if (err) {
*err = NULL;
}
if (io_mutex) switch_mutex_lock(io_mutex);
h.callback = callback;
h.pdata = pdata;
switch (dbh->type) {
case SCDB_TYPE_PGSQL:
{
status = switch_pgsql_handle_callback_exec(dbh->native_handle.pgsql_dbh, sql, helper_callback, &h, err);
if (err && *err) {
(*err_callback)(pdata, (const char*)*err);
}
}
break;
case SCDB_TYPE_ODBC:
{
status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, helper_callback, &h, err);
if (err && *err) {
(*err_callback)(pdata, (const char*)*err);
}
}
break;
case SCDB_TYPE_CORE_DB:
{
int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh, sql, helper_callback, &h, &errmsg);
if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
status = SWITCH_STATUS_SUCCESS;
}
if (errmsg) {
dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
if (!strstr(errmsg, "query abort")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
}
}
if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) {
(*err_callback)(pdata, errmsg);
}
if (errmsg) {
switch_core_db_free(errmsg);
}
}
break;
}
if (io_mutex) switch_mutex_unlock(io_mutex);
return status;
}
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh,
const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err)
{
@ -1141,6 +1207,67 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
return status;
}
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
switch_core_db_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback, void *pdata, char **err)
{
switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL;
switch_mutex_t *io_mutex = dbh->io_mutex;
if (err) {
*err = NULL;
}
if (io_mutex) switch_mutex_lock(io_mutex);
switch (dbh->type) {
case SCDB_TYPE_PGSQL:
{
status = switch_pgsql_handle_callback_exec(dbh->native_handle.pgsql_dbh, sql, callback, pdata, err);
if (err && *err) {
(*err_callback)(pdata, (const char*)*err);
}
}
break;
case SCDB_TYPE_ODBC:
{
status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
if (err && *err) {
(*err_callback)(pdata, (const char*)*err);
}
}
break;
case SCDB_TYPE_CORE_DB:
{
int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh, sql, callback, pdata, &errmsg);
if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
status = SWITCH_STATUS_SUCCESS;
}
if (errmsg) {
dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
if (!strstr(errmsg, "query abort")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
}
}
if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) {
(*err_callback)(pdata, errmsg);
}
if (errmsg) {
switch_core_db_free(errmsg);
}
}
break;
}
if (io_mutex) switch_mutex_unlock(io_mutex);
return status;
}
SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh,
const char *test_sql, const char *drop_sql, const char *reactive_sql)
{
@ -1312,7 +1439,9 @@ struct db_job {
switch_sql_queue_manager_t *qm;
char *sql;
switch_core_db_callback_func_t callback;
switch_core_db_err_callback_func_t err_callback;
switch_core_db_event_callback_func_t event_callback;
switch_core_db_err_callback_func_t event_err_callback;
void *pdata;
int event;
switch_memory_pool_t *pool;
@ -1331,10 +1460,14 @@ static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *ob
return NULL;
}
if (job->callback) {
if (job->callback && !job->err_callback) {
switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err);
} else if (job->event_callback) {
} else if (job->callback && job->err_callback) {
switch_cache_db_execute_sql_callback_err(dbh, job->sql, job->callback, job->err_callback, job->pdata, &err);
} else if (job->event_callback && !job->event_err_callback) {
switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err);
} else if (job->event_callback && job->event_err_callback) {
switch_cache_db_execute_sql_event_callback_err(dbh, job->sql, job->event_callback, job->event_err_callback, job->pdata, &err);
}
if (err) {
@ -1351,8 +1484,12 @@ static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *ob
return NULL;
}
static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_callback_func_t callback, switch_core_db_event_callback_func_t event_callback, void *pdata)
static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback,
switch_core_db_event_callback_func_t event_callback,
switch_core_db_err_callback_func_t event_err_callback,
void *pdata)
{
switch_memory_pool_t *pool;
switch_thread_data_t *td;
@ -1370,8 +1507,10 @@ static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char
if (callback) {
job->callback = callback;
job->err_callback = err_callback;
} else if (event_callback) {
job->event_callback = event_callback;
job->event_err_callback = event_err_callback;
}
job->pdata = pdata;
@ -1381,22 +1520,45 @@ static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char
}
SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm,
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_callback_func_t callback, void *pdata)
{
switch_thread_data_t *td;
if ((td = new_job(qm, sql, callback, NULL, pdata))) {
if ((td = new_job(qm, sql, callback, NULL, NULL, NULL, pdata))) {
switch_thread_pool_launch_thread(&td);
}
}
SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback, void *pdata)
{
switch_thread_data_t *td;
if ((td = new_job(qm, sql, NULL, callback, pdata))) {
if ((td = new_job(qm, sql, callback, err_callback, NULL, NULL, pdata))) {
switch_thread_pool_launch_thread(&td);
}
}
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
{
switch_thread_data_t *td;
if ((td = new_job(qm, sql, NULL, NULL, callback, NULL, pdata))) {
switch_thread_pool_launch_thread(&td);
}
}
SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_event_callback_func_t callback,
switch_core_db_err_callback_func_t err_callback,
void *pdata)
{
switch_thread_data_t *td;
if ((td = new_job(qm, sql, NULL, NULL, callback, err_callback, pdata))) {
switch_thread_pool_launch_thread(&td);
}
}