From 19a706877b4494d9bcca2d5dfed33dc7ef97a4d7 Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Fri, 13 Nov 2009 20:48:06 +0000 Subject: [PATCH] added openzap queue stuff, still missing implementation git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@858 a93c3328-9c30-0410-af19-c9cd2b2d52af --- libs/freetdm/src/include/openzap.h | 17 +++++- .../ozmod_sangoma_boost/ozmod_sangoma_boost.c | 56 +++++++++++++------ .../sangoma_boost_client.c | 34 +++++++++-- .../sangoma_boost_client.h | 7 +++ .../sangoma_boost_sigmod.h | 3 +- .../ozmod_sangoma_boost/zap_sangoma_boost.h | 1 + 6 files changed, 94 insertions(+), 24 deletions(-) diff --git a/libs/freetdm/src/include/openzap.h b/libs/freetdm/src/include/openzap.h index 9e90209440..aa05136378 100644 --- a/libs/freetdm/src/include/openzap.h +++ b/libs/freetdm/src/include/openzap.h @@ -577,7 +577,6 @@ struct zap_span { struct zap_span *next; }; - OZ_DECLARE_DATA extern zap_logger_t zap_log; struct zap_io_interface { @@ -598,6 +597,22 @@ struct zap_io_interface { zio_api_t api; }; +typedef void* zap_queue_t; + +/*! brief create a new queue */ +OZ_DECLARE(zap_queue_t) zap_queue_create(void); + +/*! Enqueue an object */ +OZ_DECLARE(zap_status_t) zap_queue_enqueue(zap_queue_t queue, void *obj); + +/*! dequeue an object from the queue */ +OZ_DECLARE(void *) zap_queue_dequeue(zap_queue_t queue); + +/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */ +OZ_DECLARE(zap_status_t) zap_queue_wait(zap_queue_t queue, int ms); + +/*! destroy the queue */ +OZ_DECLARE(void) zap_queue_destroy(zap_queue_t *queue); OZ_DECLARE(zap_size_t) zap_fsk_modulator_generate_bit(zap_fsk_modulator_t *fsk_trans, int8_t bit, int16_t *buf, zap_size_t buflen); OZ_DECLARE(int32_t) zap_fsk_modulator_generate_carrier_bits(zap_fsk_modulator_t *fsk_trans, uint32_t bits); diff --git a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c index 349b5133ee..688397d870 100644 --- a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c +++ b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c @@ -1122,11 +1122,11 @@ static void *zap_sangoma_events_run(zap_thread_t *me, void *obj) return NULL; } -static int zap_boost_connection_open(zap_span_t *span) +static zap_status_t zap_boost_connection_open(zap_span_t *span) { zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; if (sangoma_boost_data->sigmod) { - return 0; + return sangoma_boost_data->sigmod->start_span(span); } sangoma_boost_data->pcon = sangoma_boost_data->mcon; @@ -1136,8 +1136,8 @@ static int zap_boost_connection_open(zap_span_t *span) sangoma_boost_data->mcon.cfg.local_port, sangoma_boost_data->mcon.cfg.remote_ip, sangoma_boost_data->mcon.cfg.remote_port) < 0) { - zap_log(ZAP_LOG_DEBUG, "Error: Opening MCON Socket [%d] %s\n", sangoma_boost_data->mcon.socket, strerror(errno)); - return -1; + zap_log(ZAP_LOG_ERROR, "Error: Opening MCON Socket [%d] %s\n", sangoma_boost_data->mcon.socket, strerror(errno)); + return ZAP_FAIL; } if (sangomabc_connection_open(&sangoma_boost_data->pcon, @@ -1145,10 +1145,10 @@ static int zap_boost_connection_open(zap_span_t *span) ++sangoma_boost_data->pcon.cfg.local_port, sangoma_boost_data->pcon.cfg.remote_ip, ++sangoma_boost_data->pcon.cfg.remote_port) < 0) { - zap_log(ZAP_LOG_DEBUG, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno)); - return -1; + zap_log(ZAP_LOG_ERROR, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno)); + return ZAP_FAIL; } - return 0; + return ZAP_SUCCESS; } /*! @@ -1218,16 +1218,20 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj) uint32_t ms = 10; zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; - if (zap_boost_connection_open(span) < 0) { - goto end; - } - mcon = &sangoma_boost_data->mcon; pcon = &sangoma_boost_data->pcon; - /* sigmod overrides socket functionality if not null */ - mcon->sigmod = sangoma_boost_data->sigmod; - pcon->sigmod = sangoma_boost_data->sigmod; + if (sangoma_boost_data->sigmod) { + /* sigmod overrides socket functionality if not null */ + mcon->sigmod = sangoma_boost_data->sigmod; + pcon->sigmod = sangoma_boost_data->sigmod; + mcon->span = span; + pcon->span = span; + } + + if (zap_boost_connection_open(span) != ZAP_SUCCESS) { + goto end; + } init_outgoing_array(); @@ -1370,9 +1374,15 @@ static zap_status_t zap_sangoma_boost_start(zap_span_t *span) static zap_status_t zap_sangoma_boost_stop(zap_span_t *span) { + zap_status_t status = ZAP_SUCCESS; zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; if (sangoma_boost_data->sigmod) { - return sangoma_boost_data->sigmod->stop_span(span); + /* FIXME: we should make sure the span thread is stopped (use pthread_kill or openzap thread kill function) */ + /* I think stopping the span before destroying the queue makes sense + otherwise may be boost events would still arrive when the queue is already destroyed! */ + status = sangoma_boost_data->sigmod->stop_span(span); + zap_queue_destroy(sangoma_boost_data->boost_queue); + return status; } return ZAP_SUCCESS; } @@ -1478,7 +1488,14 @@ static zap_state_map_t boost_state_map = { static BOOST_WRITE_MSG_FUNCTION(zap_boost_write_msg) { - /* TODO: write to msg queue and kick the pthread condition */ + zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; + sangomabc_queue_element_t *element = malloc(sizeof(*element)); + if (!element) { + return ZAP_FAIL; + } + memcpy(&element->boostmsg, msg, msglen); + element->size = msglen; + zap_queue_enqueue(sangoma_boost_data->boost_queue, element); return ZAP_SUCCESS; } @@ -1594,11 +1611,18 @@ static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span) sigmod_iface->set_sig_status_cb(zap_boost_sig_status_change); sigmod_iface->set_write_msg_cb(zap_boost_write_msg); hashtable_insert(g_boost_modules_hash, (void *)sigmod_iface->name, sigmod_iface, HASHTABLE_FLAG_NONE); + lib = NULL; /* destroying the lib will be done when going down and NOT on FAIL_CONFIG_RETURN */ } zap_mutex_unlock(g_boost_modules_mutex); hash_locked = 0; if (sigmod_iface) { + /* try to create the boost queue */ + sangoma_boost_data->boost_queue = zap_queue_create(); + if (!sangoma_boost_data->boost_queue) { + zap_log(ZAP_LOG_ERROR, "Span %s could not create its boost queue!\n", span->name); + FAIL_CONFIG_RETURN(ZAP_FAIL); + } zap_log(ZAP_LOG_NOTICE, "Span %s will use Sangoma Boost Signaling Module %s\n", span->name, sigmod_iface->name); sangoma_boost_data->sigmod = sigmod_iface; sigmod_iface->configure_span(span, conflist); diff --git a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.c b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.c index ec9ce3eaa7..cf421aa4fa 100644 --- a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.c +++ b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.c @@ -273,11 +273,13 @@ sangomabc_event_t *__sangomabc_connection_read(sangomabc_connection_t *mcon, int unsigned int fromlen = sizeof(struct sockaddr_in); int bytes = 0; int msg_ok = 0; + sangomabc_queue_element_t *e = NULL; if (mcon->sigmod) { - /* TODO: implement me */ - zap_log(ZAP_LOG_ERROR, "__sangomabc_connection_read not implemented yet for signaling modules\n"); - return NULL; + e = zap_queue_dequeue(mcon->boost_queue); + bytes = e->size; + memcpy(&mcon->event, e->boostmsg, bytes); + zap_safe_free(e); } else { bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT, (struct sockaddr *) &mcon->local_addr, &fromlen); @@ -362,8 +364,16 @@ sangomabc_event_t *__sangomabc_connection_readp(sangomabc_connection_t *mcon, in { unsigned int fromlen = sizeof(struct sockaddr_in); int bytes = 0; + sangomabc_queue_element_t *e = NULL; - bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT, (struct sockaddr *) &mcon->local_addr, &fromlen); + if (mcon->sigmod) { + e = zap_queue_dequeue(mcon->boost_queue); + bytes = e->size; + memcpy(&mcon->event, e->boostmsg, bytes); + zap_safe_free(e); + } else { + bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT, (struct sockaddr *) &mcon->local_addr, &fromlen); + } if (bytes <= 0) { return NULL; @@ -434,7 +444,13 @@ int __sangomabc_connection_write(sangomabc_connection_t *mcon, sangomabc_event_t } event->bseqno = mcon->rxseq; event->version = SIGBOOST_VERSION; - err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr)); + + if (mcon->sigmod) { + mcon->sigmod->write_msg(mcon->span, event, event_size); + err = event_size; + } else { + err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr)); + } zap_mutex_unlock(mcon->mutex); @@ -470,7 +486,13 @@ int __sangomabc_connection_writep(sangomabc_connection_t *mcon, sangomabc_event_ zap_mutex_lock(mcon->mutex); event->version = SIGBOOST_VERSION; - err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr)); + if (mcon->sigmod) { + mcon->sigmod->write_msg(mcon->span, event, event_size); + err = event_size; + return -1; + } else { + err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr)); + } zap_mutex_unlock(mcon->mutex); if (err != event_size) { diff --git a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.h b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.h index 9ed2c5f35e..a18d3e7eeb 100644 --- a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.h +++ b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_client.h @@ -108,10 +108,17 @@ struct sangomabc_connection { uint32_t hb_elapsed; /* boost signaling mod interface pointer (if not working in TCP mode) */ boost_sigmod_interface_t *sigmod; + zap_queue_t boost_queue; + zap_span_t *span; }; typedef struct sangomabc_connection sangomabc_connection_t; +typedef struct sangomabc_queue_element { + unsigned char boostmsg[sizeof(sangomabc_event_t)]; + zap_size_t size; +} sangomabc_queue_element_t; + /* disable nagle's algorythm */ static inline void sctp_no_nagle(int socket) { diff --git a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_sigmod.h b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_sigmod.h index 6fd9d071e3..441536c74f 100644 --- a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_sigmod.h +++ b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/sangoma_boost_sigmod.h @@ -48,6 +48,7 @@ typedef void (*boost_sig_status_cb_func_t) BOOST_SIG_STATUS_CB_ARGS; /*! \brief Write a boost msg to a boost endpoint + \param span The openzap span where this msg was generated \param msg The generic message pointer, owned by the caller \param msglen The length of the provided structure pointed by msg \return ZAP_SUCCESS or ZAP_FAIL @@ -57,7 +58,7 @@ typedef void (*boost_sig_status_cb_func_t) BOOST_SIG_STATUS_CB_ARGS; the endpoint receiving the msg will first cast to t_sigboost_short, check the event type, and if needed. */ -#define BOOST_WRITE_MSG_ARGS (void *msg, zap_size_t msglen) +#define BOOST_WRITE_MSG_ARGS (zap_span_t *span, void *msg, zap_size_t msglen) typedef zap_status_t (*boost_write_msg_func_t) BOOST_WRITE_MSG_ARGS; #define BOOST_WRITE_MSG_FUNCTION(name) zap_status_t name BOOST_WRITE_MSG_ARGS diff --git a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/zap_sangoma_boost.h b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/zap_sangoma_boost.h index 8459e0883b..fc2fc953fd 100644 --- a/libs/freetdm/src/ozmod/ozmod_sangoma_boost/zap_sangoma_boost.h +++ b/libs/freetdm/src/ozmod/ozmod_sangoma_boost/zap_sangoma_boost.h @@ -50,6 +50,7 @@ typedef struct zap_sangoma_boost_data { zio_signal_cb_t signal_cb; uint32_t flags; boost_sigmod_interface_t *sigmod; + zap_queue_t boost_queue; } zap_sangoma_boost_data_t; #endif