finished zap queue and zap conditions implementation for Linux

git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@860 a93c3328-9c30-0410-af19-c9cd2b2d52af
This commit is contained in:
Moises Silva 2009-11-14 02:04:38 +00:00
parent c46160b4ff
commit e7a5ecacb1
9 changed files with 274 additions and 28 deletions

View File

@ -579,6 +579,13 @@ struct zap_span {
OZ_DECLARE_DATA extern zap_logger_t zap_log;
typedef enum {
ZAP_CRASH_NEVER = 0,
ZAP_CRASH_ON_ASSERT
} zap_crash_policy_t;
OZ_DECLARE_DATA extern zap_crash_policy_t g_zap_crash_policy;
struct zap_io_interface {
const char *name;
zio_configure_span_t configure_span;
@ -597,22 +604,23 @@ struct zap_io_interface {
zio_api_t api;
};
typedef void* zap_queue_t;
struct zap_queue;
#define zap_queue_t struct zap_queue
/*! brief create a new queue */
OZ_DECLARE(zap_queue_t) zap_queue_create(void);
OZ_DECLARE(zap_status_t) zap_queue_create(zap_queue_t **queue, zap_size_t size);
/*! Enqueue an object */
OZ_DECLARE(zap_status_t) zap_queue_enqueue(zap_queue_t queue, void *obj);
OZ_DECLARE(zap_status_t) zap_queue_enqueue(zap_queue_t *queue, void *obj);
/*! dequeue an object from the queue */
OZ_DECLARE(void *) zap_queue_dequeue(zap_queue_t queue);
OZ_DECLARE(void *) zap_queue_dequeue(zap_queue_t *queue);
/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */
OZ_DECLARE(zap_status_t) zap_queue_wait(zap_queue_t queue, int ms);
OZ_DECLARE(zap_status_t) zap_queue_wait(zap_queue_t *queue, int ms);
/*! destroy the queue */
OZ_DECLARE(void) zap_queue_destroy(zap_queue_t queue);
OZ_DECLARE(zap_status_t) zap_queue_destroy(zap_queue_t **queue);
OZ_DECLARE(zap_size_t) zap_fsk_modulator_generate_bit(zap_fsk_modulator_t *fsk_trans, int8_t bit, int16_t *buf, zap_size_t buflen);
OZ_DECLARE(int32_t) zap_fsk_modulator_generate_carrier_bits(zap_fsk_modulator_t *fsk_trans, uint32_t bits);
@ -683,6 +691,7 @@ OZ_DECLARE(const char *) zap_channel_get_var(zap_channel_t *zchan, const char *v
OZ_DECLARE(zap_status_t) zap_channel_clear_vars(zap_channel_t *zchan);
OZ_DECLARE(zap_status_t) zap_global_init(void);
OZ_DECLARE(zap_status_t) zap_global_destroy(void);
OZ_DECLARE(void) zap_global_set_crash_policy(zap_crash_policy_t policy);
OZ_DECLARE(void) zap_global_set_logger(zap_logger_t logger);
OZ_DECLARE(void) zap_global_set_default_logger(int level);
OZ_DECLARE(uint32_t) zap_separate_string(char *buf, char delim, char **array, int arraylen);
@ -722,6 +731,15 @@ ZIO_CODEC_FUNCTION(zio_alaw2ulaw);
#define zap_mutex_unlock(_x) _zap_mutex_unlock(_x)
#endif
#define zap_assert(assertion, retval, msg) \
if (!(assertion)) { \
zap_log(ZAP_LOG_CRIT, msg); \
if (g_zap_crash_policy & ZAP_CRASH_ON_ASSERT) { \
abort(); \
} else { \
return retval; \
} \
}
static __inline__ void zap_set_state_all(zap_span_t *span, zap_channel_state_t state)
{

View File

@ -15,6 +15,10 @@
* constitutes an essential part of this license. No use of any covered code is authorized hereunder
* except under this disclaimer.
*
* Contributors:
*
* Moises Silva <moy@sangoma.com>
*
*/
@ -25,6 +29,7 @@
typedef struct zap_mutex zap_mutex_t;
typedef struct zap_thread zap_thread_t;
typedef struct zap_condition zap_condition_t;
typedef void *(*zap_thread_function_t) (zap_thread_t *, void *);
OZ_DECLARE(zap_status_t) zap_thread_create_detached(zap_thread_function_t func, void *data);
@ -35,6 +40,10 @@ OZ_DECLARE(zap_status_t) zap_mutex_destroy(zap_mutex_t **mutex);
OZ_DECLARE(zap_status_t) _zap_mutex_lock(zap_mutex_t *mutex);
OZ_DECLARE(zap_status_t) _zap_mutex_trylock(zap_mutex_t *mutex);
OZ_DECLARE(zap_status_t) _zap_mutex_unlock(zap_mutex_t *mutex);
OZ_DECLARE(zap_status_t) zap_condition_create(zap_condition_t **cond, zap_mutex_t *mutex);
OZ_DECLARE(zap_status_t) zap_condition_destroy(zap_condition_t **cond);
OZ_DECLARE(zap_status_t) zap_condition_signal(zap_condition_t *cond);
OZ_DECLARE(zap_status_t) zap_condition_wait(zap_condition_t *cond, int ms);
#endif

View File

@ -619,7 +619,6 @@ typedef enum {
ZAP_CAUSE_MEDIA_TIMEOUT = 604
} zap_call_cause_t;
#endif
/* For Emacs:

View File

@ -45,6 +45,8 @@ zap_hash_t *g_boost_modules_hash = NULL;
#define MAX_TRUNK_GROUPS 64
static time_t congestion_timeouts[MAX_TRUNK_GROUPS];
#define BOOST_QUEUE_SIZE 500
/**
* \brief Strange flag
*/
@ -1221,12 +1223,15 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
/* sigmod overrides socket functionality if not null */
if (sangoma_boost_data->sigmod) {
/* sigmod overrides socket functionality if not null */
mcon->sigmod = sangoma_boost_data->sigmod;
pcon->sigmod = sangoma_boost_data->sigmod;
mcon->span = span;
pcon->span = span;
/* everything could be retrieved through span, but let's use shortcuts */
mcon->sigmod = sangoma_boost_data->sigmod;
pcon->sigmod = sangoma_boost_data->sigmod;
mcon->boost_queue = sangoma_boost_data->boost_queue;
pcon->boost_queue = sangoma_boost_data->boost_queue;
}
if (zap_boost_connection_open(span) != ZAP_SUCCESS) {
@ -1381,7 +1386,7 @@ static zap_status_t zap_sangoma_boost_stop(zap_span_t *span)
/* I think stopping the span before destroying the queue makes sense
otherwise may be boost events would still arrive when the queue is already destroyed! */
status = sangoma_boost_data->sigmod->stop_span(span);
zap_queue_destroy(sangoma_boost_data->boost_queue);
zap_queue_destroy(&sangoma_boost_data->boost_queue);
return status;
}
return ZAP_SUCCESS;
@ -1489,7 +1494,7 @@ static zap_state_map_t boost_state_map = {
static BOOST_WRITE_MSG_FUNCTION(zap_boost_write_msg)
{
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
sangomabc_queue_element_t *element = malloc(sizeof(*element));
sangomabc_queue_element_t *element = calloc(1, sizeof(*element));
if (!element) {
return ZAP_FAIL;
}
@ -1583,11 +1588,10 @@ static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span)
}
}
sangoma_boost_data = malloc(sizeof(*sangoma_boost_data));
sangoma_boost_data = calloc(1, sizeof(*sangoma_boost_data));
if (!sangoma_boost_data) {
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
memset(sangoma_boost_data, 0, sizeof(*sangoma_boost_data));
/* WARNING: be sure to release this mutex on errors inside this if() */
zap_mutex_lock(g_boost_modules_mutex);
@ -1618,8 +1622,7 @@ static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span)
if (sigmod_iface) {
/* try to create the boost queue */
sangoma_boost_data->boost_queue = zap_queue_create();
if (!sangoma_boost_data->boost_queue) {
if (zap_queue_create(&sangoma_boost_data->boost_queue, BOOST_QUEUE_SIZE) != ZAP_SUCCESS) {
zap_log(ZAP_LOG_ERROR, "Span %s could not create its boost queue!\n", span->name);
FAIL_CONFIG_RETURN(ZAP_FAIL);
}

View File

@ -108,7 +108,7 @@ struct sangomabc_connection {
uint32_t hb_elapsed;
/* boost signaling mod interface pointer (if not working in TCP mode) */
boost_sigmod_interface_t *sigmod;
zap_queue_t boost_queue;
zap_queue_t *boost_queue;
zap_span_t *span;
};

View File

@ -50,7 +50,7 @@ typedef struct zap_sangoma_boost_data {
zio_signal_cb_t signal_cb;
uint32_t flags;
boost_sigmod_interface_t *sigmod;
zap_queue_t boost_queue;
zap_queue_t *boost_queue;
} zap_sangoma_boost_data_t;
#endif

View File

@ -176,8 +176,15 @@ static void default_logger(const char *file, const char *func, int line, int lev
}
OZ_DECLARE_DATA zap_crash_policy_t g_zap_crash_policy = ZAP_CRASH_NEVER;
OZ_DECLARE_DATA zap_logger_t zap_log = null_logger;
OZ_DECLARE(void) zap_global_set_crash_policy(zap_crash_policy_t policy)
{
g_zap_crash_policy = policy;
}
OZ_DECLARE(void) zap_global_set_logger(zap_logger_t logger)
{
if (logger) {

View File

@ -37,30 +37,148 @@
#include "openzap.h"
OZ_DECLARE(zap_queue_t) zap_queue_create(void)
{
return NULL;
}
#undef zap_queue_t
#define ZAP_QUEUE_SIZE
typedef struct zap_queue {
zap_mutex_t *mutex;
zap_condition_t *condition;
zap_size_t size;
unsigned rindex;
unsigned windex;
void **elements;
} zap_queue_t;
OZ_DECLARE(zap_status_t) zap_queue_enqueue(zap_queue_t queue, void *obj)
OZ_DECLARE(zap_status_t) zap_queue_create(zap_queue_t **outqueue, zap_size_t size)
{
zap_assert(outqueue, ZAP_FAIL, "Queue double pointer is null\n");
zap_assert(size > 0, ZAP_FAIL, "Queue size is not bigger than 0\n");
*outqueue = NULL;
zap_queue_t *queue = calloc(1, sizeof(*queue));
if (!queue) {
return ZAP_FAIL;
}
queue->elements = calloc(1, (sizeof(void*)*size));
if (!queue->elements) {
goto failed;
}
queue->size = size;
if (zap_mutex_create(&queue->mutex) != ZAP_SUCCESS) {
goto failed;
}
if (zap_condition_create(&queue->condition, queue->mutex) != ZAP_SUCCESS) {
goto failed;
}
*outqueue = queue;
return ZAP_SUCCESS;
failed:
if (queue) {
if (queue->condition) {
zap_condition_destroy(&queue->condition);
}
if (queue->mutex) {
zap_mutex_destroy(&queue->mutex);
}
zap_safe_free(queue->elements);
zap_safe_free(queue);
}
return ZAP_FAIL;
}
OZ_DECLARE(void *) zap_queue_dequeue(zap_queue_t queue)
OZ_DECLARE(zap_status_t) zap_queue_enqueue(zap_queue_t *queue, void *obj)
{
return NULL;
zap_status_t status = ZAP_FAIL;
zap_assert(queue != NULL, ZAP_FAIL, "Queue is null!");
zap_mutex_lock(queue->mutex);
if (queue->windex == queue->size) {
/* try to see if we can wrap around */
queue->windex = 0;
}
if (queue->windex == queue->rindex) {
zap_log(ZAP_LOG_ERROR, "Failed to enqueue obj %p in queue %p, no more room! windex == rindex == %d!\n", obj, queue, queue->windex);
goto done;
}
queue->elements[queue->windex++] = obj;
status = ZAP_SUCCESS;
/* wake up queue reader */
zap_condition_signal(queue->condition);
done:
zap_mutex_unlock(queue->mutex);
return status;
}
OZ_DECLARE(zap_status_t) zap_queue_wait(zap_queue_t queue, int ms)
OZ_DECLARE(void *) zap_queue_dequeue(zap_queue_t *queue)
{
return ZAP_FAIL;
void *obj = NULL;
zap_assert(queue != NULL, NULL, "Queue is null!");
zap_mutex_lock(queue->mutex);
if (!queue->elements[queue->rindex]) {
goto done;
}
obj = queue->elements[queue->rindex];
queue->elements[queue->rindex] = NULL;
if (queue->rindex == queue->size) {
queue->rindex = 0;
}
done:
zap_mutex_unlock(queue->mutex);
return obj;
}
OZ_DECLARE(void) zap_queue_destroy(zap_queue_t queue)
OZ_DECLARE(zap_status_t) zap_queue_wait(zap_queue_t *queue, int ms)
{
zap_assert(queue != NULL, ZAP_FAIL, "Queue is null!");
zap_mutex_lock(queue->mutex);
if (queue->elements[queue->rindex]) {
zap_mutex_unlock(queue->mutex);
return ZAP_SUCCESS;
}
if (zap_condition_wait(queue->condition, ms)) {
zap_mutex_unlock(queue->mutex);
return ZAP_FAIL;
}
zap_mutex_unlock(queue->mutex);
return ZAP_SUCCESS;
}
OZ_DECLARE(zap_status_t) zap_queue_destroy(zap_queue_t **inqueue)
{
zap_queue_t *queue = NULL;
zap_assert(inqueue != NULL, ZAP_FAIL, "Queue is null!");
zap_assert(*inqueue != NULL, ZAP_FAIL, "Queue is null!");
queue = *inqueue;
zap_condition_destroy(&queue->condition);
zap_mutex_destroy(&queue->mutex);
zap_safe_free(queue->elements);
zap_safe_free(queue);
*inqueue = NULL;
return ZAP_SUCCESS;
}
#endif
/* For Emacs:

View File

@ -34,6 +34,10 @@ struct zap_mutex {
CRITICAL_SECTION mutex;
};
struct zap_condition {
HANDLE condition;
};
#else
#include <pthread.h>
@ -44,6 +48,11 @@ struct zap_mutex {
pthread_mutex_t mutex;
};
struct zap_condition {
pthread_cond_t condition;
pthread_mutex_t *mutex;
};
#endif
struct zap_thread {
@ -222,8 +231,91 @@ OZ_DECLARE(zap_status_t) _zap_mutex_unlock(zap_mutex_t *mutex)
}
OZ_DECLARE(zap_status_t) zap_condition_create(zap_condition_t **incondition, zap_mutex_t *mutex)
{
zap_condition_t *condition = NULL;
zap_assert(condition != NULL, ZAP_FAIL, "Condition double pointer is null!\n");
zap_assert(mutex != NULL, ZAP_FAIL, "Mutex for condition must not be null!\n");
#ifdef WIN32
return ZAP_NOTIMPL;
#endif
condition = malloc(sizeof(*condition));
if (!condition) {
return ZAP_FAIL;
}
#ifndef WIN32
condition->mutex = &mutex->mutex;
if (pthread_cond_init(&condition->condition, NULL)) {
goto failed;
}
return ZAP_SUCCESS;
failed:
if (condition) {
zap_safe_free(condition);
}
return ZAP_FAIL;
#endif
}
OZ_DECLARE(zap_status_t) zap_condition_wait(zap_condition_t *condition, int ms)
{
zap_assert(condition != NULL, ZAP_FAIL, "Condition is null!\n");
int res = 0;
#ifdef WIN32
return ZAP_NOTIMPL;
#else
if (ms > 0) {
struct timespec waitms = { 0, ((ms * 1000) * 1000)};
res = pthread_cond_timedwait(&condition->condition, condition->mutex, &waitms);
} else {
res = pthread_cond_wait(&condition->condition, condition->mutex);
}
if (res != 0) {
if (res == ETIMEDOUT) {
return ZAP_TIMEOUT;
}
return ZAP_FAIL;
}
#endif
return ZAP_SUCCESS;
}
OZ_DECLARE(zap_status_t) zap_condition_signal(zap_condition_t *condition)
{
zap_assert(condition != NULL, ZAP_FAIL, "Condition is null!\n");
#ifdef WIN32
return ZAP_NOTIMPL;
#else
if (pthread_cond_signal(&condition->condition)) {
return ZAP_FAIL;
}
#endif
return ZAP_SUCCESS;
}
OZ_DECLARE(zap_status_t) zap_condition_destroy(zap_condition_t **incondition)
{
zap_condition_t *condition = NULL;
zap_assert(incondition != NULL, ZAP_FAIL, "Condition null when destroying!\n");
condition = *incondition;
#ifdef WIN32
return ZAP_NOTIMPL;
#else
if (pthread_cond_destroy(&condition->condition)) {
return ZAP_FAIL;
}
zap_safe_free(condition);
#endif
*incondition = NULL;
return ZAP_SUCCESS;
}
/* For Emacs:
* Local Variables: