mod_cdr_pg_csv now spools the attempted SQL to disk if DB insert fails, instead of writing CSV. This should make it simpler to import any failed inserts.

This commit is contained in:
Daniel Swarbrick 2010-12-31 20:54:20 +01:00
parent 9ab7473cb9
commit 2be3175ae9
2 changed files with 39 additions and 42 deletions

View File

@ -53,7 +53,7 @@ typedef struct cdr_fd cdr_fd_t;
const char *default_template = const char *default_template =
"\"${local_ip_v4}\",\"${caller_id_name}\",\"${caller_id_number}\",\"${destination_number}\",\"${context}\",\"${start_stamp}\"," "\"${local_ip_v4}\",\"${caller_id_name}\",\"${caller_id_number}\",\"${destination_number}\",\"${context}\",\"${start_stamp}\","
"\"${answer_stamp}\",\"${end_stamp}\",\"${duration}\",\"${billsec}\",\"${hangup_cause}\",\"${uuid}\",\"${bleg_uuid}\",\"${accountcode}\"," "\"${answer_stamp}\",\"${end_stamp}\",\"${duration}\",\"${billsec}\",\"${hangup_cause}\",\"${uuid}\",\"${bleg_uuid}\",\"${accountcode}\","
"\"${read_codec}\",\"${write_codec}\"\n"; "\"${read_codec}\",\"${write_codec}\"";
static struct { static struct {
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
@ -184,17 +184,17 @@ static void write_cdr(const char *path, const char *log_line)
switch_mutex_unlock(fd->mutex); switch_mutex_unlock(fd->mutex);
} }
static switch_status_t save_cdr(const char * const table, const char * const template, const char * const cdr) static switch_status_t save_cdr(const char * const template, const char * const cdr, const char * const log_dir)
{ {
char *columns, *values; char *columns, *values;
char *p, *q; char *p, *q;
unsigned vlen; unsigned vlen;
char *query;
PGresult *res;
char *nullValues, *temp, *tp; char *nullValues, *temp, *tp;
int nullCounter = 0, charCounter = 0; int nullCounter = 0, charCounter = 0;
char *sql = NULL, *path = NULL;
PGresult *res;
if (!table || !*table || !template || !*template || !cdr || !*cdr) { if (!template || !*template || !cdr || !*cdr) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Bad parameter\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Bad parameter\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
@ -311,13 +311,13 @@ static switch_status_t save_cdr(const char * const table, const char * const tem
//----------------------------- END_OF_PATCH ------------------------------- //----------------------------- END_OF_PATCH -------------------------------
query = switch_mprintf("INSERT INTO %s (%s) VALUES (%s);", table, columns, values); sql = switch_mprintf("INSERT INTO %s (%s) VALUES (%s);\n", globals.db_table, columns, values);
assert(query); assert(sql);
free(columns); free(columns);
free(values); free(values);
if (globals.debug) { if (globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Query: \"%s\"\n", query); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Query: \"%s\"\n", sql);
} }
switch_mutex_lock(globals.db_mutex); switch_mutex_lock(globals.db_mutex);
@ -330,29 +330,40 @@ static switch_status_t save_cdr(const char * const table, const char * const tem
globals.db_online = 1; globals.db_online = 1;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection to database failed: %s", PQerrorMessage(globals.db_connection)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection to database failed: %s", PQerrorMessage(globals.db_connection));
PQfinish(globals.db_connection); goto error;
globals.db_online = 0;
switch_mutex_unlock(globals.db_mutex);
free(query);
return SWITCH_STATUS_FALSE;
} }
res = PQexec(globals.db_connection, query); res = PQexec(globals.db_connection, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK) { if (PQresultStatus(res) != PGRES_COMMAND_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "INSERT command failed: %s", PQresultErrorMessage(res)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "INSERT command failed: %s", PQresultErrorMessage(res));
PQclear(res); PQclear(res);
PQfinish(globals.db_connection); goto error;
globals.db_online = 0;
switch_mutex_unlock(globals.db_mutex);
free(query);
return SWITCH_STATUS_FALSE;
} }
PQclear(res); PQclear(res);
free(query); free(sql);
switch_mutex_unlock(globals.db_mutex); switch_mutex_unlock(globals.db_mutex);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
error:
PQfinish(globals.db_connection);
globals.db_online = 0;
switch_mutex_unlock(globals.db_mutex);
/* SQL INSERT failed for whatever reason. Spool the attempted query to disk */
path = switch_mprintf("%s%scdr-spool.sql", log_dir, SWITCH_PATH_SEPARATOR);
assert(path);
write_cdr(path, sql);
free(path);
free(sql);
return SWITCH_STATUS_FALSE;
} }
static switch_status_t my_on_reporting(switch_core_session_t *session) static switch_status_t my_on_reporting(switch_core_session_t *session)
@ -360,8 +371,7 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
const char *log_dir = NULL, *template_str = NULL; const char *log_dir = NULL, *template_str = NULL;
char *log_line, *path = NULL; char *expanded_vars = NULL;
int saved = 0;
if (globals.shutdown) { if (globals.shutdown) {
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
@ -407,24 +417,17 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
template_str = default_template; template_str = default_template;
} }
log_line = switch_channel_expand_variables(channel, template_str); expanded_vars = switch_channel_expand_variables(channel, template_str);
if (!log_line) { if (!expanded_vars) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error expanding CDR variables.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error expanding CDR variables.\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
saved = save_cdr(globals.db_table, template_str, log_line); save_cdr(template_str, expanded_vars, log_dir);
if (!saved) { if (expanded_vars != template_str) {
path = switch_mprintf("%s%sMaster.csv", log_dir, SWITCH_PATH_SEPARATOR); free(expanded_vars);
assert(path);
write_cdr(path, log_line);
free(path);
}
if (log_line != template_str) {
free(log_line);
} }
return status; return status;
@ -535,13 +538,7 @@ static switch_status_t load_config(switch_memory_pool_t *pool)
char *var = (char *) switch_xml_attr(param, "name"); char *var = (char *) switch_xml_attr(param, "name");
if (var) { if (var) {
char *tpl; char *tpl;
size_t len = strlen(param->txt) + 2;
if (end_of(param->txt) != '\n') {
tpl = switch_core_alloc(pool, len);
switch_snprintf(tpl, len, "%s\n", param->txt);
} else {
tpl = switch_core_strdup(pool, param->txt); tpl = switch_core_strdup(pool, param->txt);
}
switch_core_hash_insert(globals.template_hash, var, tpl); switch_core_hash_insert(globals.template_hash, var, tpl);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Adding template %s.\n", var); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Adding template %s.\n", var);

View File

@ -113,7 +113,7 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
const char *template_str = NULL; const char *template_str = NULL;
char *expanded_vars, *sql = NULL; char *expanded_vars = NULL, *sql = NULL;
if (globals.shutdown) { if (globals.shutdown) {
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;