diff --git a/conf/vanilla/autoload_configs/amqp.conf.xml b/conf/vanilla/autoload_configs/amqp.conf.xml index ccc81f51fd..0d139169a9 100644 --- a/conf/vanilla/autoload_configs/amqp.conf.xml +++ b/conf/vanilla/autoload_configs/amqp.conf.xml @@ -25,10 +25,16 @@ + + + + diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index 07528b234e..f651a1a89b 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -104,6 +104,7 @@ typedef struct { int reconnect_interval_ms; int circuit_breaker_ms; switch_time_t circuit_breaker_reset_time; + switch_bool_t enable_fallback_format_fields; switch_bool_t running; switch_memory_pool_t *pool; @@ -156,7 +157,8 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void /* producer */ void mod_amqp_producer_event_handler(switch_event_t* evt); -switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],switch_event_t* evt, char* routingKeyEventHeaderNames[]); +switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], + switch_event_t* evt, char* routingKeyEventHeaderNames[]); switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile); switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg); void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data); diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c index 067bb57d7b..0108bdaa79 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c @@ -45,7 +45,8 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg) switch_safe_free(*msg); } -switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t* evt, char* routingKeyEventHeaderNames[]) +switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], + switch_event_t* evt, char* routingKeyEventHeaderNames[]) { int i = 0, idx = 0; @@ -54,16 +55,41 @@ switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_K if (idx) { routingKey[idx++] = '.'; } - if (routingKeyEventHeaderNames[i][0] == '#') { - strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); - } else { - char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]); - strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx); + if ( profile->enable_fallback_format_fields) { + int count = 0, x = 0; + char *argv[10]; - /* Replace dots with underscores so that the routing key does not get corrupted */ - switch_replace_char(routingKey + idx, '.', '_', 0); + count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0]))); + for( x = 0; x < count; x++) { + if (argv[x][0] == '#') { + strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); + break; + } else { + char *value = switch_event_get_header(evt, argv[x]); + + if (!value) { + continue; + } + + strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx); + + /* Replace dots with underscores so that the routing key does not get corrupted */ + switch_replace_char(routingKey + idx, '.', '_', 0); + } + } + idx += strlen(routingKey + idx); + } else { + if (routingKeyEventHeaderNames[i][0] == '#') { + strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); + } else { + char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]); + strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx); + + /* Replace dots with underscores so that the routing key does not get corrupted */ + switch_replace_char(routingKey + idx, '.', '_', 0); + } + idx += strlen(routingKey + idx); } - idx += strlen(routingKey + idx); } } return SWITCH_STATUS_SUCCESS; @@ -97,7 +123,7 @@ void mod_amqp_producer_event_handler(switch_event_t* evt) switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); switch_event_serialize_json(evt, &amqp_message->pjson); - mod_amqp_producer_routing_key(amqp_message->routing_key, evt, profile->format_fields); + mod_amqp_producer_routing_key(profile, amqp_message->routing_key, evt, profile->format_fields); /* Queue the message to be sent by the worker thread, errors are reported only once per circuit breaker interval */ if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) { @@ -221,6 +247,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) if ( interval && interval > 0 ) { profile->send_queue_size = interval; } + } else if (!strncmp(var, "enable_fallback_format_fields", 29)) { + int interval = atoi(val); + if ( interval && interval > 0 ) { + profile->enable_fallback_format_fields = 1; + } } else if (!strncmp(var, "exchange", 8)) { exchange = switch_core_strdup(profile->pool, "TAP.Events"); } else if (!strncmp(var, "format_fields", 13)) {