diff --git a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp index c8975ebe5d..b85acbf0e0 100644 --- a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp +++ b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp @@ -4,24 +4,17 @@ #include #include +#include "mod_event_zmq.h" + namespace mod_event_zmq { -SWITCH_MODULE_LOAD_FUNCTION(load); -SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown); -SWITCH_MODULE_RUNTIME_FUNCTION(runtime); - -extern "C" { -SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime); -}; - // Handles publishing events out to clients class ZmqEventPublisher { public: - ZmqEventPublisher() : - context(1), - event_publisher(context, ZMQ_PUB) + ZmqEventPublisher(zmq::context_t &context) : + _publisher(context, ZMQ_PUB) { - event_publisher.bind("tcp://*:5556"); + _publisher.bind("tcp://*:5556"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n"); } @@ -35,7 +28,7 @@ public: zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL); // Send the message - event_publisher.send(msg); + _publisher.send(msg); } private: @@ -43,18 +36,34 @@ private: free (data); } - zmq::context_t context; - zmq::socket_t event_publisher; + zmq::socket_t _publisher; +}; + +class char_msg : public zmq::message_t { +public: + char_msg() : zmq::message_t(sizeof(char)) { } + char_msg(char data) : zmq::message_t(sizeof(char)) { + *char_data() = data; + } + + char* char_data() { + return static_cast(this->data()); + } }; // Handles global inititalization and teardown of the module class ZmqModule { public: ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) : - _running(false) { + _context(1), _term_rep(_context, ZMQ_REP), _term_req(_context, ZMQ_REQ), _publisher(_context) { + + // Set up the term messaging connection + _term_rep.bind(TERM_URI); + _term_req.connect(TERM_URI); + // Subscribe to all switch events of any subclass // Store a pointer to ourself in the user data - if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node) + if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, static_cast(&_publisher), &_node) != SWITCH_STATUS_SUCCESS) { throw std::runtime_error("Couldn't bind to switch events."); } @@ -67,20 +76,45 @@ public: } void Listen() { - if(_running) - return; + // All we do is sit here and block the run loop thread so it doesn't return + // it seems that if you want to keep your module running you can't return from the run loop + + char_msg msg; + while(true) { + // Listen for term message + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Entered run loop, waiting for term message\n"); + _term_rep.recv(&msg); + if(*msg.char_data() == MODULE_TERM_REQ_MESSAGE) { + // Ack term message + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term message, sending ack and leaving run loop\n"); - _publisher.reset(new ZmqEventPublisher()); - _running = true; + *msg.char_data() = MODULE_TERM_ACK_MESSAGE; + _term_rep.send(msg); - while(_running) { - switch_yield(100000); + break; + } + } + } + + void Shutdown() { + // Send term message + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown requested, sending term message to runloop\n"); + char_msg msg(MODULE_TERM_REQ_MESSAGE); + _term_req.send(msg); + + while(true) { + // Wait for the term ack message + _term_req.recv(&msg); + if(*msg.char_data() == MODULE_TERM_ACK_MESSAGE) { + // Continue shutdown + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term ack message, continuing shutdown\n"); + break; + } } } ~ZmqModule() { // Unsubscribe from the switch events - _running = false; switch_event_unbind(&_node); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n"); } @@ -89,9 +123,8 @@ private: // Dispatches events to the publisher static void event_handler(switch_event_t *event) { try { - ZmqModule *module = (ZmqModule*)event->bind_user_data; - if(module->_publisher.get()) - module->_publisher->PublishEvent(event); + ZmqEventPublisher *publisher = static_cast(event->bind_user_data); + publisher->PublishEvent(event); } catch(std::exception ex) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what()); } catch(...) { // Exceptions must not propogate to C caller @@ -100,8 +133,12 @@ private: } switch_event_node_t *_node; - std::auto_ptr _publisher; - bool _running; + + zmq::context_t _context; + zmq::socket_t _term_rep; + zmq::socket_t _term_req; + + ZmqEventPublisher _publisher; }; //*****************************// @@ -140,10 +177,15 @@ SWITCH_MODULE_RUNTIME_FUNCTION(runtime) { SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) { try { + // Tell the module to shutdown + module->Shutdown(); + // Free the module object module.reset(); + } catch(std::exception &ex) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module: %s\n", ex.what()); } catch(...) { // Exceptions must not propogate to C caller - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down module\n"); } } diff --git a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.h b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.h new file mode 100644 index 0000000000..a60c720d74 --- /dev/null +++ b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.h @@ -0,0 +1,20 @@ +#ifndef MOD_EVENT_ZMQ_H +#define MOD_EVENT_ZMQ_H + +namespace mod_event_zmq { +static const char MODULE_TERM_REQ_MESSAGE = 1; +static const char MODULE_TERM_ACK_MESSAGE = 2; + +static const char *TERM_URI = "inproc://mod_event_zmq_term"; + +SWITCH_MODULE_LOAD_FUNCTION(load); +SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown); +SWITCH_MODULE_RUNTIME_FUNCTION(runtime); + +extern "C" { +SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime); +}; + +} + +#endif // MOD_EVENT_ZMQ_H