Fixed zap_queues

git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@892 a93c3328-9c30-0410-af19-c9cd2b2d52af
This commit is contained in:
David Yat Sin 2009-11-20 22:57:24 +00:00
parent 3268b4ae34
commit 3245ec9997
4 changed files with 34 additions and 17 deletions

View File

@ -1195,6 +1195,13 @@ static int zap_boost_wait_event(zap_span_t *span, int ms)
sangomabc_connection_t *mcon, *pcon; sangomabc_connection_t *mcon, *pcon;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
if (sangoma_boost_data->sigmod) {
int result;
result = zap_queue_wait(sangoma_boost_data->boost_queue, ms);
if (result == ZAP_TIMEOUT) return 0;
if (result != ZAP_SUCCESS) return -1;
return 1;
}
mcon = &sangoma_boost_data->mcon; mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon; pcon = &sangoma_boost_data->pcon;
@ -1266,6 +1273,7 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
} }
if (zap_boost_connection_open(span) != ZAP_SUCCESS) { if (zap_boost_connection_open(span) != ZAP_SUCCESS) {
zap_log(ZAP_LOG_ERROR, "zap_boost_connection failed\n");
goto end; goto end;
} }
@ -1291,10 +1299,12 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
SIGBOOST_EVENT_SYSTEM_RESTART, SIGBOOST_EVENT_SYSTEM_RESTART,
0); 0);
zap_set_flag(mcon, MSU_FLAG_DOWN); zap_set_flag(mcon, MSU_FLAG_DOWN);
zap_log(ZAP_LOG_DEBUG, "OPENZAP is no longer running\n");
break; break;
} }
if ((activity = zap_boost_wait_event(span, ms)) < 0) { if ((activity = zap_boost_wait_event(span, ms)) < 0) {
zap_log(ZAP_LOG_ERROR, "Zap boost waitevent failed\n");
goto error; goto error;
} }
@ -1534,8 +1544,7 @@ static BOOST_WRITE_MSG_FUNCTION(zap_boost_write_msg)
} }
memcpy(&element->boostmsg, msg, msglen); memcpy(&element->boostmsg, msg, msglen);
element->size = msglen; element->size = msglen;
zap_queue_enqueue(sangoma_boost_data->boost_queue, element); return zap_queue_enqueue(sangoma_boost_data->boost_queue, element);
return ZAP_SUCCESS;
} }
static BOOST_SIG_STATUS_CB_FUNCTION(zap_boost_sig_status_change) static BOOST_SIG_STATUS_CB_FUNCTION(zap_boost_sig_status_change)

View File

@ -2858,6 +2858,7 @@ OZ_DECLARE(zap_status_t) zap_global_init(void)
globals.span_hash = create_hashtable(16, zap_hash_hashfromstring, zap_hash_equalkeys); globals.span_hash = create_hashtable(16, zap_hash_hashfromstring, zap_hash_equalkeys);
zap_mutex_create(&globals.mutex); zap_mutex_create(&globals.mutex);
zap_mutex_create(&globals.span_mutex); zap_mutex_create(&globals.span_mutex);
globals.running = 1;
return ZAP_SUCCESS; return ZAP_SUCCESS;
} }

View File

@ -39,6 +39,7 @@ typedef struct zap_queue {
zap_mutex_t *mutex; zap_mutex_t *mutex;
zap_condition_t *condition; zap_condition_t *condition;
zap_size_t size; zap_size_t size;
zap_size_t num_elements;
unsigned rindex; unsigned rindex;
unsigned windex; unsigned windex;
void **elements; void **elements;
@ -128,13 +129,13 @@ static zap_status_t zap_std_queue_enqueue(zap_queue_t *queue, void *obj)
queue->windex = 0; queue->windex = 0;
} }
if (queue->windex == queue->rindex) { if (queue->num_elements != 0 && queue->windex == queue->rindex) {
zap_log(ZAP_LOG_ERROR, "Failed to enqueue obj %p in queue %p, no more room! windex == rindex == %d!\n", obj, queue, queue->windex); zap_log(ZAP_LOG_ERROR, "Failed to enqueue obj %p in queue %p, no more room! windex == rindex == %d!\n", obj, queue, queue->windex);
goto done; goto done;
} }
queue->elements[queue->windex++] = obj; queue->elements[queue->windex++] = obj;
status = ZAP_SUCCESS; status = ZAP_SUCCESS;
queue->num_elements++;
/* wake up queue reader */ /* wake up queue reader */
zap_condition_signal(queue->condition); zap_condition_signal(queue->condition);
@ -153,12 +154,13 @@ static void *zap_std_queue_dequeue(zap_queue_t *queue)
zap_mutex_lock(queue->mutex); zap_mutex_lock(queue->mutex);
if (!queue->elements[queue->rindex]) { if (queue->num_elements == 0) {
goto done; goto done;
} }
obj = queue->elements[queue->rindex]; obj = queue->elements[queue->rindex];
queue->elements[queue->rindex] = NULL; queue->elements[queue->rindex++] = NULL;
queue->num_elements--;
if (queue->rindex == queue->size) { if (queue->rindex == queue->size) {
queue->rindex = 0; queue->rindex = 0;
} }
@ -170,6 +172,7 @@ done:
static zap_status_t zap_std_queue_wait(zap_queue_t *queue, int ms) static zap_status_t zap_std_queue_wait(zap_queue_t *queue, int ms)
{ {
zap_status_t ret;
zap_assert(queue != NULL, ZAP_FAIL, "Queue is null!"); zap_assert(queue != NULL, ZAP_FAIL, "Queue is null!");
zap_mutex_lock(queue->mutex); zap_mutex_lock(queue->mutex);
@ -179,14 +182,9 @@ static zap_status_t zap_std_queue_wait(zap_queue_t *queue, int ms)
return ZAP_SUCCESS; return ZAP_SUCCESS;
} }
if (zap_condition_wait(queue->condition, ms)) { ret = zap_condition_wait(queue->condition, ms);
zap_mutex_unlock(queue->mutex); zap_mutex_unlock(queue->mutex);
return ZAP_FAIL; return ret;
}
zap_mutex_unlock(queue->mutex);
return ZAP_SUCCESS;
} }
static zap_status_t zap_std_queue_destroy(zap_queue_t **inqueue) static zap_status_t zap_std_queue_destroy(zap_queue_t **inqueue)

View File

@ -205,8 +205,11 @@ OZ_DECLARE(zap_status_t) _zap_mutex_lock(zap_mutex_t *mutex)
#ifdef WIN32 #ifdef WIN32
EnterCriticalSection(&mutex->mutex); EnterCriticalSection(&mutex->mutex);
#else #else
if (pthread_mutex_lock(&mutex->mutex)) int err;
if ((err = pthread_mutex_lock(&mutex->mutex))) {
zap_log(ZAP_LOG_ERROR, "Failed to lock mutex %d:%s\n", err, strerror(err));
return ZAP_FAIL; return ZAP_FAIL;
}
#endif #endif
return ZAP_SUCCESS; return ZAP_SUCCESS;
} }
@ -259,6 +262,7 @@ OZ_DECLARE(zap_status_t) zap_condition_create(zap_condition_t **incondition, zap
} }
#endif #endif
*incondition = condition;
return ZAP_SUCCESS; return ZAP_SUCCESS;
failed: failed:
@ -287,7 +291,10 @@ OZ_DECLARE(zap_status_t) zap_condition_wait(zap_condition_t *condition, int ms)
#else #else
int res = 0; int res = 0;
if (ms > 0) { if (ms > 0) {
struct timespec waitms = { 0, ((ms * 1000) * 1000)}; struct timespec waitms;
waitms.tv_sec = time(NULL)+(ms/1000);
waitms.tv_nsec = 1000*1000*(ms%1000);
res = pthread_cond_timedwait(&condition->condition, condition->mutex, &waitms); res = pthread_cond_timedwait(&condition->condition, condition->mutex, &waitms);
} else { } else {
res = pthread_cond_wait(&condition->condition, condition->mutex); res = pthread_cond_wait(&condition->condition, condition->mutex);
@ -310,7 +317,9 @@ OZ_DECLARE(zap_status_t) zap_condition_signal(zap_condition_t *condition)
return ZAP_FAIL; return ZAP_FAIL;
} }
#else #else
if (pthread_cond_signal(&condition->condition)) { int err;
if ((err = pthread_cond_signal(&condition->condition))) {
zap_log(ZAP_LOG_ERROR, "Failed to signal condition %d:%s\n", err, strerror(err));
return ZAP_FAIL; return ZAP_FAIL;
} }
#endif #endif