Fix FS-3904 use a message to shutdown the module

For some reason the running flag being set on the thread initiating the
shutdown did not propogate to the run loop thread. I couldn't find any
memory barrier functions in APR or the FS API so I am using 0MQ to send
a termination request message to the run loop thread.
This commit is contained in:
Josh Perry 2012-02-22 18:19:07 -07:00 committed by Jeff Lenk
parent 24afb8aaaf
commit 13ac8baef2
2 changed files with 92 additions and 30 deletions

View File

@ -4,24 +4,17 @@
#include <stdexcept> #include <stdexcept>
#include <memory> #include <memory>
#include "mod_event_zmq.h"
namespace mod_event_zmq { namespace mod_event_zmq {
SWITCH_MODULE_LOAD_FUNCTION(load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(runtime);
extern "C" {
SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime);
};
// Handles publishing events out to clients // Handles publishing events out to clients
class ZmqEventPublisher { class ZmqEventPublisher {
public: public:
ZmqEventPublisher() : ZmqEventPublisher(zmq::context_t &context) :
context(1), _publisher(context, ZMQ_PUB)
event_publisher(context, ZMQ_PUB)
{ {
event_publisher.bind("tcp://*:5556"); _publisher.bind("tcp://*:5556");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n");
} }
@ -35,7 +28,7 @@ public:
zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL); zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL);
// Send the message // Send the message
event_publisher.send(msg); _publisher.send(msg);
} }
private: private:
@ -43,18 +36,34 @@ private:
free (data); free (data);
} }
zmq::context_t context; zmq::socket_t _publisher;
zmq::socket_t event_publisher; };
class char_msg : public zmq::message_t {
public:
char_msg() : zmq::message_t(sizeof(char)) { }
char_msg(char data) : zmq::message_t(sizeof(char)) {
*char_data() = data;
}
char* char_data() {
return static_cast<char*>(this->data());
}
}; };
// Handles global inititalization and teardown of the module // Handles global inititalization and teardown of the module
class ZmqModule { class ZmqModule {
public: public:
ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) : ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) :
_running(false) { _context(1), _term_rep(_context, ZMQ_REP), _term_req(_context, ZMQ_REQ), _publisher(_context) {
// Set up the term messaging connection
_term_rep.bind(TERM_URI);
_term_req.connect(TERM_URI);
// Subscribe to all switch events of any subclass // Subscribe to all switch events of any subclass
// Store a pointer to ourself in the user data // Store a pointer to ourself in the user data
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node) if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, static_cast<void*>(&_publisher), &_node)
!= SWITCH_STATUS_SUCCESS) { != SWITCH_STATUS_SUCCESS) {
throw std::runtime_error("Couldn't bind to switch events."); throw std::runtime_error("Couldn't bind to switch events.");
} }
@ -67,20 +76,45 @@ public:
} }
void Listen() { void Listen() {
if(_running) // All we do is sit here and block the run loop thread so it doesn't return
return; // it seems that if you want to keep your module running you can't return from the run loop
char_msg msg;
while(true) {
// Listen for term message
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Entered run loop, waiting for term message\n");
_term_rep.recv(&msg);
if(*msg.char_data() == MODULE_TERM_REQ_MESSAGE) {
// Ack term message
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term message, sending ack and leaving run loop\n");
_publisher.reset(new ZmqEventPublisher()); *msg.char_data() = MODULE_TERM_ACK_MESSAGE;
_running = true; _term_rep.send(msg);
while(_running) { break;
switch_yield(100000); }
}
}
void Shutdown() {
// Send term message
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown requested, sending term message to runloop\n");
char_msg msg(MODULE_TERM_REQ_MESSAGE);
_term_req.send(msg);
while(true) {
// Wait for the term ack message
_term_req.recv(&msg);
if(*msg.char_data() == MODULE_TERM_ACK_MESSAGE) {
// Continue shutdown
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term ack message, continuing shutdown\n");
break;
}
} }
} }
~ZmqModule() { ~ZmqModule() {
// Unsubscribe from the switch events // Unsubscribe from the switch events
_running = false;
switch_event_unbind(&_node); switch_event_unbind(&_node);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
} }
@ -89,9 +123,8 @@ private:
// Dispatches events to the publisher // Dispatches events to the publisher
static void event_handler(switch_event_t *event) { static void event_handler(switch_event_t *event) {
try { try {
ZmqModule *module = (ZmqModule*)event->bind_user_data; ZmqEventPublisher *publisher = static_cast<ZmqEventPublisher*>(event->bind_user_data);
if(module->_publisher.get()) publisher->PublishEvent(event);
module->_publisher->PublishEvent(event);
} catch(std::exception ex) { } catch(std::exception ex) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what());
} catch(...) { // Exceptions must not propogate to C caller } catch(...) { // Exceptions must not propogate to C caller
@ -100,8 +133,12 @@ private:
} }
switch_event_node_t *_node; switch_event_node_t *_node;
std::auto_ptr<ZmqEventPublisher> _publisher;
bool _running; zmq::context_t _context;
zmq::socket_t _term_rep;
zmq::socket_t _term_req;
ZmqEventPublisher _publisher;
}; };
//*****************************// //*****************************//
@ -140,10 +177,15 @@ SWITCH_MODULE_RUNTIME_FUNCTION(runtime) {
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) { SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) {
try { try {
// Tell the module to shutdown
module->Shutdown();
// Free the module object // Free the module object
module.reset(); module.reset();
} catch(std::exception &ex) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module: %s\n", ex.what());
} catch(...) { // Exceptions must not propogate to C caller } catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down module\n");
} }
} }

View File

@ -0,0 +1,20 @@
#ifndef MOD_EVENT_ZMQ_H
#define MOD_EVENT_ZMQ_H
namespace mod_event_zmq {
static const char MODULE_TERM_REQ_MESSAGE = 1;
static const char MODULE_TERM_ACK_MESSAGE = 2;
static const char *TERM_URI = "inproc://mod_event_zmq_term";
SWITCH_MODULE_LOAD_FUNCTION(load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(runtime);
extern "C" {
SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime);
};
}
#endif // MOD_EVENT_ZMQ_H