refactoring to use function call based boost instead of sockets

git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@857 a93c3328-9c30-0410-af19-c9cd2b2d52af
This commit is contained in:
Moises Silva 2009-11-13 00:41:50 +00:00
parent 66adb741e7
commit 890cba5ba1
6 changed files with 288 additions and 83 deletions

View File

@ -683,6 +683,7 @@ OZ_DECLARE(zap_status_t) zap_unload_modules(void);
OZ_DECLARE(zap_status_t) zap_configure_span(const char *type, zap_span_t *span, zio_signal_cb_t sig_cb, ...);
OZ_DECLARE(zap_status_t) zap_span_start(zap_span_t *span);
OZ_DECLARE(zap_status_t) zap_span_stop(zap_span_t *span);
OZ_DECLARE(char *) zap_build_dso_path(const char *name, char *path, zap_size_t len);
OZ_DECLARE(int) zap_load_module(const char *name);
OZ_DECLARE(int) zap_load_module_assume(const char *name);
OZ_DECLARE(zap_status_t) zap_span_find_by_name(const char *name, zap_span_t **span);

View File

@ -38,6 +38,10 @@
#include <sys/select.h>
#endif
/* Boost signaling modules global hash and its mutex */
zap_mutex_t *g_boost_modules_mutex = NULL;
zap_hash_t *g_boost_modules_hash = NULL;
#define MAX_TRUNK_GROUPS 64
static time_t congestion_timeouts[MAX_TRUNK_GROUPS];
@ -1118,6 +1122,90 @@ static void *zap_sangoma_events_run(zap_thread_t *me, void *obj)
return NULL;
}
static int zap_boost_connection_open(zap_span_t *span)
{
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
if (sangoma_boost_data->sigmod) {
return 0;
}
sangoma_boost_data->pcon = sangoma_boost_data->mcon;
if (sangomabc_connection_open(&sangoma_boost_data->mcon,
sangoma_boost_data->mcon.cfg.local_ip,
sangoma_boost_data->mcon.cfg.local_port,
sangoma_boost_data->mcon.cfg.remote_ip,
sangoma_boost_data->mcon.cfg.remote_port) < 0) {
zap_log(ZAP_LOG_DEBUG, "Error: Opening MCON Socket [%d] %s\n", sangoma_boost_data->mcon.socket, strerror(errno));
return -1;
}
if (sangomabc_connection_open(&sangoma_boost_data->pcon,
sangoma_boost_data->pcon.cfg.local_ip,
++sangoma_boost_data->pcon.cfg.local_port,
sangoma_boost_data->pcon.cfg.remote_ip,
++sangoma_boost_data->pcon.cfg.remote_port) < 0) {
zap_log(ZAP_LOG_DEBUG, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno));
return -1;
}
return 0;
}
/*!
\brief wait for a boost event
\return -1 on error, 0 on timeout, 1 when there are events
*/
static int zap_boost_wait_event(zap_span_t *span, int ms)
{
struct timeval tv = { 0, ms * 1000 };
int max, activity;
sangomabc_connection_t *mcon, *pcon;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
FD_ZERO(&sangoma_boost_data->rfds);
FD_ZERO(&sangoma_boost_data->efds);
FD_SET(mcon->socket, &sangoma_boost_data->rfds);
FD_SET(mcon->socket, &sangoma_boost_data->efds);
FD_SET(pcon->socket, &sangoma_boost_data->rfds);
FD_SET(pcon->socket, &sangoma_boost_data->efds);
sangoma_boost_data->iteration = 0;
max = ((pcon->socket > mcon->socket) ? pcon->socket : mcon->socket) + 1;
if ((activity = select(max, &sangoma_boost_data->rfds, NULL, &sangoma_boost_data->efds, &tv)) < 0) {
return -1;
}
if (FD_ISSET(pcon->socket, &sangoma_boost_data->efds) || FD_ISSET(mcon->socket, &sangoma_boost_data->efds)) {
return -1;
}
return 1;
}
static sangomabc_event_t *zap_boost_read_event(zap_span_t *span)
{
sangomabc_event_t *event = NULL;
sangomabc_connection_t *mcon, *pcon;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
if (sangoma_boost_data->sigmod || FD_ISSET(pcon->socket, &sangoma_boost_data->rfds)) {
event = sangomabc_connection_readp(pcon, sangoma_boost_data->iteration);
}
/* if there is no event and this is not a sigmod-driven span it's time to try the other connection for events */
if (!event && !sangoma_boost_data->sigmod && FD_ISSET(mcon->socket, &sangoma_boost_data->rfds)) {
event = sangomabc_connection_readp(mcon, sangoma_boost_data->iteration);
}
return event;
}
/**
* \brief Main thread function for sangoma boost span (monitor)
* \param me Current thread
@ -1126,33 +1214,20 @@ static void *zap_sangoma_events_run(zap_thread_t *me, void *obj)
static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
{
zap_span_t *span = (zap_span_t *) obj;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
sangomabc_connection_t *mcon, *pcon;
uint32_t ms = 10; //, too_long = 20000;
uint32_t ms = 10;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
sangoma_boost_data->pcon = sangoma_boost_data->mcon;
if (zap_boost_connection_open(span) < 0) {
goto end;
}
if (sangomabc_connection_open(&sangoma_boost_data->mcon,
sangoma_boost_data->mcon.cfg.local_ip,
sangoma_boost_data->mcon.cfg.local_port,
sangoma_boost_data->mcon.cfg.remote_ip,
sangoma_boost_data->mcon.cfg.remote_port) < 0) {
zap_log(ZAP_LOG_DEBUG, "Error: Opening MCON Socket [%d] %s\n", sangoma_boost_data->mcon.socket, strerror(errno));
goto end;
}
if (sangomabc_connection_open(&sangoma_boost_data->pcon,
sangoma_boost_data->pcon.cfg.local_ip,
++sangoma_boost_data->pcon.cfg.local_port,
sangoma_boost_data->pcon.cfg.remote_ip,
++sangoma_boost_data->pcon.cfg.remote_port) < 0) {
zap_log(ZAP_LOG_DEBUG, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno));
goto end;
}
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
pcon = &sangoma_boost_data->pcon;
/* sigmod overrides socket functionality if not null */
mcon->sigmod = sangoma_boost_data->sigmod;
pcon->sigmod = sangoma_boost_data->sigmod;
init_outgoing_array();
@ -1165,10 +1240,8 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
zap_set_flag(mcon, MSU_FLAG_DOWN);
while (zap_test_flag(sangoma_boost_data, ZAP_SANGOMA_BOOST_RUNNING)) {
fd_set rfds, efds;
struct timeval tv = { 0, ms * 1000 };
int max, activity, i = 0;
sangomabc_event_t *event = NULL;
int activity = 0;
if (!zap_running()) {
sangomabc_exec_commandp(pcon,
@ -1181,37 +1254,15 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
break;
}
FD_ZERO(&rfds);
FD_ZERO(&efds);
FD_SET(mcon->socket, &rfds);
FD_SET(mcon->socket, &efds);
FD_SET(pcon->socket, &rfds);
FD_SET(pcon->socket, &efds);
max = ((pcon->socket > mcon->socket) ? pcon->socket : mcon->socket) + 1;
if ((activity = select(max, &rfds, NULL, &efds, &tv)) < 0) {
if ((activity = zap_boost_wait_event(span, ms)) < 0) {
goto error;
}
if (activity) {
if (FD_ISSET(pcon->socket, &efds) || FD_ISSET(mcon->socket, &efds)) {
goto error;
}
if (FD_ISSET(pcon->socket, &rfds)) {
while ((event = sangomabc_connection_readp(pcon, i))) {
parse_sangoma_event(span, pcon, (sangomabc_short_event_t*)event);
i++;
}
}
i=0;
if (FD_ISSET(mcon->socket, &rfds)) {
if ((event = sangomabc_connection_read(mcon, i))) {
parse_sangoma_event(span, mcon, (sangomabc_short_event_t*)event);
i++;
}
while ((event = zap_boost_read_event(span))) {
parse_sangoma_event(span, pcon, (sangomabc_short_event_t*)event);
sangoma_boost_data->iteration++;
}
}
@ -1246,7 +1297,7 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
goto end;
error:
zap_log(ZAP_LOG_CRIT, "Socket Error!\n");
zap_log(ZAP_LOG_CRIT, "Boost event processing Error!\n");
end:
@ -1255,7 +1306,7 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
zap_clear_flag(sangoma_boost_data, ZAP_SANGOMA_BOOST_RUNNING);
zap_log(ZAP_LOG_DEBUG, "SANGOMA_BOOST thread ended.\n");
zap_log(ZAP_LOG_DEBUG, "Sangoma Boost thread ended.\n");
return NULL;
}
@ -1266,9 +1317,35 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
*/
static ZIO_SIG_LOAD_FUNCTION(zap_sangoma_boost_init)
{
g_boost_modules_hash = create_hashtable(10, zap_hash_hashfromstring, zap_hash_equalkeys);
if (!g_boost_modules_hash) {
return ZAP_FAIL;
}
zap_mutex_create(&request_mutex);
zap_mutex_create(&signal_mutex);
zap_mutex_create(&g_boost_modules_mutex);
return ZAP_SUCCESS;
}
static ZIO_SIG_UNLOAD_FUNCTION(zap_sangoma_boost_destroy)
{
zap_hash_iterator_t *i = NULL;
boost_sigmod_interface_t *sigmod = NULL;
const void *key = NULL;
void *val = NULL;
for (i = hashtable_first(g_boost_modules_hash); i; i = hashtable_next(i)) {
hashtable_this(i, &key, NULL, &val);
if (key && val) {
sigmod = val;
zap_dso_destroy(sigmod->pvt);
}
}
hashtable_destroy(g_boost_modules_hash);
zap_mutex_destroy(&request_mutex);
zap_mutex_destroy(&signal_mutex);
zap_mutex_destroy(&g_boost_modules_mutex);
return ZAP_SUCCESS;
}
@ -1291,6 +1368,15 @@ static zap_status_t zap_sangoma_boost_start(zap_span_t *span)
return err;
}
static zap_status_t zap_sangoma_boost_stop(zap_span_t *span)
{
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
if (sangoma_boost_data->sigmod) {
return sangoma_boost_data->sigmod->stop_span(span);
}
return ZAP_SUCCESS;
}
static zap_state_map_t boost_state_map = {
{
{
@ -1390,6 +1476,19 @@ static zap_state_map_t boost_state_map = {
}
};
static BOOST_WRITE_MSG_FUNCTION(zap_boost_write_msg)
{
/* TODO: write to msg queue and kick the pthread condition */
return ZAP_SUCCESS;
}
static BOOST_SIG_STATUS_CB_FUNCTION(zap_boost_sig_status_change)
{
/* TODO: Notify the upper layer of the signaling status change (via span signaling callback and a new msg type?) */
return;
}
/**
* \brief Initialises an sangoma boost span from configuration variables
* \param span Span to configure
@ -1399,14 +1498,41 @@ static zap_state_map_t boost_state_map = {
*/
static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span)
{
#define FAIL_CONFIG_RETURN(retstatus) \
if (sangoma_boost_data) \
zap_safe_free(sangoma_boost_data); \
if (err) \
zap_safe_free(err) \
if (hash_locked) \
zap_mutex_unlock(g_boost_modules_mutex); \
if (lib) \
zap_dso_destroy(lib); \
va_end(conflist); \
return retstatus;
boost_sigmod_interface_t *sigmod_iface = NULL;
zap_sangoma_boost_data_t *sangoma_boost_data = NULL;
const char *local_ip = "127.0.0.65", *remote_ip = "127.0.0.66";
const char *sigmod = NULL;
int local_port = 53000, remote_port = 53000;
char *var, *val;
int *intval;
char *var = NULL, *val = NULL;
int *intval = NULL;
int hash_locked = 0;
zap_dso_lib_t lib = NULL;
char path[255] = "";
char *err = NULL;
va_list conflist;
/* we need to copy the list before moving with va_arg in case this configuration should be handled by a sigmod */
va_copy(conflist, ap); /* WARNING: must be freed before returning */
while((var = va_arg(ap, char *))) {
if (!strcasecmp(var, "local_ip")) {
if (!strcasecmp(var, "sigmod")) {
if (!(val = va_arg(ap, char *))) {
break;
}
sigmod = val;
} else if (!strcasecmp(var, "local_ip")) {
if (!(val = va_arg(ap, char *))) {
break;
}
@ -1428,26 +1554,63 @@ static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span)
remote_port = *intval;
} else {
snprintf(span->last_error, sizeof(span->last_error), "Unknown parameter [%s]", var);
return ZAP_FAIL;
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
}
if (!local_ip && local_port && remote_ip && remote_port && sig_cb) {
zap_set_string(span->last_error, "missing params");
return ZAP_FAIL;
if (!sigmod) {
if (!local_ip && local_port && remote_ip && remote_port && sig_cb) {
zap_set_string(span->last_error, "missing Sangoma boost IP parameters");
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
}
sangoma_boost_data = malloc(sizeof(*sangoma_boost_data));
assert(sangoma_boost_data);
if (!sangoma_boost_data) {
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
memset(sangoma_boost_data, 0, sizeof(*sangoma_boost_data));
zap_set_string(sangoma_boost_data->mcon.cfg.local_ip, local_ip);
sangoma_boost_data->mcon.cfg.local_port = local_port;
zap_set_string(sangoma_boost_data->mcon.cfg.remote_ip, remote_ip);
sangoma_boost_data->mcon.cfg.remote_port = remote_port;
/* WARNING: be sure to release this mutex on errors inside this if() */
zap_mutex_lock(g_boost_modules_mutex);
hash_locked = 1;
if (sigmod && !(sigmod_iface = hashtable_search(g_boost_modules_hash, (void *)sigmod))) {
zap_build_dso_path(sigmod, path, sizeof(path));
lib = zap_dso_open(path, &err);
if (!lib) {
zap_log(ZAP_LOG_ERROR, "Error loading Sangoma boost signaling module '%s': %s\n", path, err);
snprintf(span->last_error, sizeof(span->last_error), "Failed to load sangoma boost signaling module %s", path);
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
if (!(sigmod_iface = (boost_sigmod_interface_t *)zap_dso_func_sym(lib, "boost_sigmod_interface", &err))) {
zap_log(ZAP_LOG_ERROR, "Failed to read Sangoma boost signaling module interface '%s': %s\n", path, err);
snprintf(span->last_error, sizeof(span->last_error), "Failed to read Sangoma boost signaling module interface '%s': %s", path, err);
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
sigmod_iface->pvt = lib;
sigmod_iface->set_sig_status_cb(zap_boost_sig_status_change);
sigmod_iface->set_write_msg_cb(zap_boost_write_msg);
hashtable_insert(g_boost_modules_hash, (void *)sigmod_iface->name, sigmod_iface, HASHTABLE_FLAG_NONE);
}
zap_mutex_unlock(g_boost_modules_mutex);
hash_locked = 0;
if (sigmod_iface) {
zap_log(ZAP_LOG_NOTICE, "Span %s will use Sangoma Boost Signaling Module %s\n", span->name, sigmod_iface->name);
sangoma_boost_data->sigmod = sigmod_iface;
sigmod_iface->configure_span(span, conflist);
} else {
zap_set_string(sangoma_boost_data->mcon.cfg.local_ip, local_ip);
sangoma_boost_data->mcon.cfg.local_port = local_port;
zap_set_string(sangoma_boost_data->mcon.cfg.remote_ip, remote_ip);
sangoma_boost_data->mcon.cfg.remote_port = remote_port;
}
sangoma_boost_data->signal_cb = sig_cb;
span->start = zap_sangoma_boost_start;
span->stop = zap_sangoma_boost_stop;
span->signal_data = sangoma_boost_data;
span->signal_type = ZAP_SIGTYPE_SANGOMABOOST;
span->outgoing_call = sangoma_boost_outgoing_call;
@ -1455,6 +1618,8 @@ static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span)
span->state_map = &boost_state_map;
zap_set_flag_locked(span, ZAP_SPAN_SUSPENDED);
va_end(conflist);
return ZAP_SUCCESS;
}
@ -1467,7 +1632,7 @@ zap_module_t zap_module = {
NULL,
zap_sangoma_boost_init,
zap_sangoma_boost_configure_span,
NULL
zap_sangoma_boost_destroy
};
/* For Emacs:

View File

@ -113,6 +113,11 @@ static int create_conn_socket(sangomabc_connection_t *mcon, char *local_ip, int
char buf[512], local_buf[512];
int err = 0, local_err = 0;
if (mcon->sigmod) {
zap_log(ZAP_LOG_WARNING, "I should not be called on a sigmod-managed connection!\n");
return 0;
}
memset(&mcon->remote_hp, 0, sizeof(mcon->remote_hp));
memset(&mcon->local_hp, 0, sizeof(mcon->local_hp));
#ifdef HAVE_NETINET_SCTP_H
@ -175,6 +180,10 @@ static int create_conn_socket(sangomabc_connection_t *mcon, char *local_ip, int
int sangomabc_connection_close(sangomabc_connection_t *mcon)
{
if (mcon->sigmod) {
zap_log(ZAP_LOG_WARNING, "I should not be called on a sigmod-managed connection!\n");
return 0;
}
if (mcon->socket > -1) {
close(mcon->socket);
}
@ -192,6 +201,10 @@ int sangomabc_connection_close(sangomabc_connection_t *mcon)
int sangomabc_connection_open(sangomabc_connection_t *mcon, char *local_ip, int local_port, char *ip, int port)
{
if (mcon->sigmod) {
zap_log(ZAP_LOG_WARNING, "I should not be called on a sigmod-managed connection!\n");
return 0;
}
create_conn_socket(mcon, local_ip, local_port, ip, port);
return mcon->socket;
}
@ -261,8 +274,14 @@ sangomabc_event_t *__sangomabc_connection_read(sangomabc_connection_t *mcon, int
int bytes = 0;
int msg_ok = 0;
bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT,
(struct sockaddr *) &mcon->local_addr, &fromlen);
if (mcon->sigmod) {
/* TODO: implement me */
zap_log(ZAP_LOG_ERROR, "__sangomabc_connection_read not implemented yet for signaling modules\n");
return NULL;
} else {
bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT,
(struct sockaddr *) &mcon->local_addr, &fromlen);
}
if (bytes <= 0) {
return NULL;

View File

@ -135,15 +135,26 @@ typedef zap_status_t (*boost_stop_span_func_t) BOOST_START_SPAN_ARGS;
\brief The boost signaling module interface
*/
typedef struct boost_sigmod_interface_s {
/*! \brief Module name */
const char *name;
/*! \brief write boost message function */
boost_write_msg_func_t write_msg;
/*! \brief set the user write boost message function */
boost_set_write_msg_cb_func_t set_write_msg_cb;
/*! \brief set the user signaling status function */
boost_set_sig_status_cb_func_t set_sig_status_cb;
/*! \brief get channel signaling status */
boost_get_sig_status_func_t get_sig_status;
/*! \brief set channel signaling status */
boost_set_sig_status_func_t set_sig_status;
/*! \brief configure span signaling */
boost_configure_span_func_t configure_span;
/*! \brief start openzap span */
boost_start_span_func_t start_span;
boost_stop_span_func_t stop_stpan;
/*! \brief stop openzap span */
boost_stop_span_func_t stop_span;
/*! \brief private pointer for the interface user */
void *pvt;
} boost_sigmod_interface_t;
#endif

View File

@ -44,8 +44,12 @@ typedef enum {
typedef struct zap_sangoma_boost_data {
sangomabc_connection_t mcon;
sangomabc_connection_t pcon;
fd_set rfds;
fd_set efds;
int iteration;
zio_signal_cb_t signal_cb;
uint32_t flags;
boost_sigmod_interface_t *sigmod;
} zap_sangoma_boost_data_t;
#endif

View File

@ -2528,14 +2528,8 @@ static zap_status_t process_module_config(zap_io_interface_t *zio)
return ZAP_SUCCESS;
}
OZ_DECLARE(int) zap_load_module(const char *name)
OZ_DECLARE(char *) zap_build_dso_path(const char *name, char *path, zap_size_t len)
{
zap_dso_lib_t lib;
int count = 0, x = 0;
char path[128] = "";
char *err;
zap_module_t *mod;
#ifdef WIN32
const char *ext = ".dll";
//const char *EXT = ".DLL";
@ -2547,13 +2541,24 @@ OZ_DECLARE(int) zap_load_module(const char *name)
const char *ext = ".so";
//const char *EXT = ".SO";
#endif
if (*name == *ZAP_PATH_SEPARATOR) {
snprintf(path, sizeof(path), "%s%s", name, ext);
snprintf(path, len, "%s%s", name, ext);
} else {
snprintf(path, sizeof(path), "%s%s%s%s", ZAP_MOD_DIR, ZAP_PATH_SEPARATOR, name, ext);
snprintf(path, len, "%s%s%s%s", ZAP_MOD_DIR, ZAP_PATH_SEPARATOR, name, ext);
}
return path;
}
OZ_DECLARE(int) zap_load_module(const char *name)
{
zap_dso_lib_t lib;
int count = 0, x = 0;
char path[128] = "";
char *err;
zap_module_t *mod;
zap_build_dso_path(name, path, sizeof(path));
if (!(lib = zap_dso_open(path, &err))) {
zap_log(ZAP_LOG_ERROR, "Error loading %s [%s]\n", path, err);
zap_safe_free(err);