FS-7060 expanded configuration for amqp command configuration

This commit is contained in:
William King 2015-04-07 16:11:34 -07:00
parent 7b6a9efcaa
commit d55c4a053b
4 changed files with 61 additions and 9 deletions

View File

@ -48,8 +48,8 @@
</connection>
</connections>
<params>
<param name="eventExchange" value="TAP.Events"/>
<param name="eventExchangetype" value="topic"/>
<param name="exchange" value="TAP.Commands"/>
<param name="reconnect_interval_ms" value="1000"/>
</params>
</profile>
</commands>

View File

@ -114,15 +114,8 @@ typedef struct {
char *name;
char *exchange;
char *exchange_type;
char *binding_key;
/* Array to store the possible event subscriptions */
char *event_filter;
unsigned int number_of_event_filters;
switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
switch_event_types_t event_ids[SWITCH_EVENT_ALL];
/* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */
mod_amqp_connection_t *conn_root;
mod_amqp_connection_t *conn_active;

View File

@ -83,8 +83,10 @@ switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **prof)
switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
{
mod_amqp_command_profile_t *profile = NULL;
switch_xml_t params, param, connections, connection;
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool;
char *exchange = NULL;
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
@ -95,6 +97,61 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
profile->pool = pool;
profile->name = switch_core_strdup(profile->pool, name);
profile->running = 1;
profile->reconnect_interval_ms = 1000;
if ((params = switch_xml_child(cfg, "params")) != NULL) {
for (param = switch_xml_child(params, "param"); param; param = param->next) {
char *var = (char *) switch_xml_attr_soft(param, "name");
char *val = (char *) switch_xml_attr_soft(param, "value");
if (!var) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name);
continue;
}
if (!val) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var);
continue;
}
if (!strncmp(var, "reconnect_interval_ms", 21)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->reconnect_interval_ms = interval;
}
} else if (!strncmp(var, "exchange", 8)) {
exchange = switch_core_strdup(profile->pool, "TAP.Commands");
}
}
}
/* Handle defaults of string types */
profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Commands");
if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
if ( ! profile->conn_root ) { /* Handle first root node */
if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
/* Handle connection create failure */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
continue;
}
profile->conn_active = profile->conn_root;
} else {
if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
/* Handle connection create failure */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
continue;
}
profile->conn_active = profile->conn_active->next;
}
}
}
profile->conn_active = NULL;
if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name);
}
/* Start the worker threads */
switch_threadattr_create(&thd_attr, profile->pool);

View File

@ -221,6 +221,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
if ( interval && interval > 0 ) {
profile->send_queue_size = interval;
}
} else if (!strncmp(var, "exchange", 8)) {
exchange = switch_core_strdup(profile->pool, "TAP.Events");
} else if (!strncmp(var, "format_fields", 13)) {
int size = 0;
if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {