mirror of
https://github.com/asterisk/asterisk.git
synced 2025-10-19 19:20:35 +00:00
Merge res_timing_pthread. This is a timing interface for Asterisk that
does not require DAHDI. It's called "pthread" because it uses a pthread API call in the timing thread for sleeping and ensuring we wake up at an appropriate time. I wasn't sure what else to call it. :) The timing API requires a file descriptor that can be polled on. So, when you open a timer, this module creates a pipe and returns the read end of the pipe. There is a background thread that wakes up every 10ms and checks to see if any of the currently open timers need a 'tick' and writes to the appropriate pipe. git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@122928 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
480
res/res_timing_pthread.c
Normal file
480
res/res_timing_pthread.c
Normal file
@@ -0,0 +1,480 @@
|
||||
/*
|
||||
* Asterisk -- An open source telephony toolkit.
|
||||
*
|
||||
* Copyright (C) 2008, Digium, Inc.
|
||||
*
|
||||
* Russell Bryant <russell@digium.com>
|
||||
*
|
||||
* See http://www.asterisk.org for more information about
|
||||
* the Asterisk project. Please do not directly contact
|
||||
* any of the maintainers of this project for assistance;
|
||||
* the project provides a web site, mailing lists and IRC
|
||||
* channels for your use.
|
||||
*
|
||||
* This program is free software, distributed under the terms of
|
||||
* the GNU General Public License Version 2. See the LICENSE file
|
||||
* at the top of the source tree.
|
||||
*/
|
||||
|
||||
/*!
|
||||
* \file
|
||||
* \author Russell Bryant <russell@digium.com>
|
||||
*
|
||||
* \brief pthread timing interface
|
||||
*/
|
||||
|
||||
#include "asterisk.h"
|
||||
|
||||
ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
|
||||
|
||||
#include <math.h>
|
||||
#include <sys/select.h>
|
||||
|
||||
#include "asterisk/module.h"
|
||||
#include "asterisk/timing.h"
|
||||
#include "asterisk/utils.h"
|
||||
#include "asterisk/astobj2.h"
|
||||
#include "asterisk/time.h"
|
||||
#include "asterisk/lock.h"
|
||||
|
||||
static void *timing_funcs_handle;
|
||||
|
||||
static int pthread_timer_open(void);
|
||||
static void pthread_timer_close(int handle);
|
||||
static int pthread_timer_set_rate(int handle, unsigned int rate);
|
||||
static void pthread_timer_ack(int handle, unsigned int quantity);
|
||||
static int pthread_timer_enable_continuous(int handle);
|
||||
static int pthread_timer_disable_continuous(int handle);
|
||||
static enum ast_timing_event pthread_timer_get_event(int handle);
|
||||
|
||||
static struct ast_timing_functions pthread_timing_functions = {
|
||||
.timer_open = pthread_timer_open,
|
||||
.timer_close = pthread_timer_close,
|
||||
.timer_set_rate = pthread_timer_set_rate,
|
||||
.timer_ack = pthread_timer_ack,
|
||||
.timer_enable_continuous = pthread_timer_enable_continuous,
|
||||
.timer_disable_continuous = pthread_timer_disable_continuous,
|
||||
.timer_get_event = pthread_timer_get_event,
|
||||
};
|
||||
|
||||
/* 1 tick / 20 ms */
|
||||
#define TIMING_INTERVAL 20
|
||||
#define MAX_RATE 50
|
||||
|
||||
static struct ao2_container *pthread_timers;
|
||||
#define PTHREAD_TIMER_BUCKETS 563
|
||||
|
||||
enum {
|
||||
PIPE_READ = 0,
|
||||
PIPE_WRITE = 1
|
||||
};
|
||||
|
||||
enum pthread_timer_state {
|
||||
TIMER_STATE_IDLE,
|
||||
TIMER_STATE_TICKING,
|
||||
TIMER_STATE_CONTINUOUS,
|
||||
};
|
||||
|
||||
struct pthread_timer {
|
||||
int pipe[2];
|
||||
enum pthread_timer_state state;
|
||||
unsigned int rate;
|
||||
/*! Interval in ms for current rate */
|
||||
unsigned int interval;
|
||||
unsigned int tick_count;
|
||||
struct timeval start;
|
||||
};
|
||||
|
||||
static void pthread_timer_destructor(void *obj);
|
||||
static struct pthread_timer *find_timer(int handle, int unlink);
|
||||
static void write_byte(int wr_fd);
|
||||
static void read_pipe(int rd_fd, unsigned int num, int clear);
|
||||
|
||||
/*!
|
||||
* \brief Data for the timing thread
|
||||
*/
|
||||
static struct {
|
||||
pthread_t thread;
|
||||
ast_mutex_t lock;
|
||||
ast_cond_t cond;
|
||||
unsigned int stop:1;
|
||||
} timing_thread;
|
||||
|
||||
static int pthread_timer_open(void)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
|
||||
if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
|
||||
errno = ENOMEM;
|
||||
return -1;
|
||||
}
|
||||
|
||||
timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
|
||||
timer->state = TIMER_STATE_IDLE;
|
||||
|
||||
if (pipe(timer->pipe)) {
|
||||
ao2_ref(timer, -1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ao2_lock(pthread_timers);
|
||||
if (!ao2_container_count(pthread_timers)) {
|
||||
ast_mutex_lock(&timing_thread.lock);
|
||||
ast_cond_signal(&timing_thread.cond);
|
||||
ast_mutex_unlock(&timing_thread.lock);
|
||||
}
|
||||
ao2_link(pthread_timers, timer);
|
||||
ao2_unlock(pthread_timers);
|
||||
|
||||
return timer->pipe[PIPE_READ];
|
||||
}
|
||||
|
||||
static void pthread_timer_close(int handle)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
|
||||
if (!(timer = find_timer(handle, 1))) {
|
||||
return;
|
||||
}
|
||||
|
||||
ao2_ref(timer, -1);
|
||||
}
|
||||
|
||||
static int pthread_timer_set_rate(int handle, unsigned int rate)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
|
||||
if (!(timer = find_timer(handle, 0))) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (rate > 0 && rate < MAX_RATE) {
|
||||
ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
|
||||
MAX_RATE);
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
ao2_lock(timer);
|
||||
timer->rate = rate;
|
||||
timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
|
||||
timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0;
|
||||
timer->start = rate ? ast_tvnow() : ast_tv(0, 0);
|
||||
timer->tick_count = 0;
|
||||
ao2_unlock(timer);
|
||||
|
||||
ao2_ref(timer, -1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void pthread_timer_ack(int handle, unsigned int quantity)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
|
||||
ast_assert(quantity > 0);
|
||||
|
||||
if (!(timer = find_timer(handle, 0))) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (timer->state == TIMER_STATE_CONTINUOUS) {
|
||||
/* Leave the pipe alone, please! */
|
||||
return;
|
||||
}
|
||||
|
||||
read_pipe(timer->pipe[PIPE_READ], quantity, 0);
|
||||
|
||||
ao2_ref(timer, -1);
|
||||
}
|
||||
|
||||
static int pthread_timer_enable_continuous(int handle)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
|
||||
if (!(timer = find_timer(handle, 0))) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
ao2_lock(timer);
|
||||
timer->state = TIMER_STATE_CONTINUOUS;
|
||||
write_byte(timer->pipe[PIPE_WRITE]);
|
||||
ao2_unlock(timer);
|
||||
|
||||
ao2_ref(timer, -1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int pthread_timer_disable_continuous(int handle)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
|
||||
if (!(timer = find_timer(handle, 0))) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
ao2_lock(timer);
|
||||
timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
|
||||
read_pipe(timer->pipe[PIPE_READ], 0, 1);
|
||||
ao2_unlock(timer);
|
||||
|
||||
ao2_ref(timer, -1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static enum ast_timing_event pthread_timer_get_event(int handle)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
|
||||
|
||||
if (!(timer = find_timer(handle, 0))) {
|
||||
return res;
|
||||
}
|
||||
|
||||
if (timer->state == TIMER_STATE_CONTINUOUS) {
|
||||
res = AST_TIMING_EVENT_CONTINUOUS;
|
||||
}
|
||||
|
||||
ao2_ref(timer, -1);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static struct pthread_timer *find_timer(int handle, int unlink)
|
||||
{
|
||||
struct pthread_timer *timer;
|
||||
struct pthread_timer tmp_timer;
|
||||
int flags = OBJ_POINTER;
|
||||
|
||||
tmp_timer.pipe[PIPE_READ] = handle;
|
||||
|
||||
if (unlink) {
|
||||
flags |= OBJ_UNLINK;
|
||||
}
|
||||
|
||||
if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
|
||||
ast_assert(timer != NULL);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return timer;
|
||||
}
|
||||
|
||||
static void pthread_timer_destructor(void *obj)
|
||||
{
|
||||
struct pthread_timer *timer = obj;
|
||||
|
||||
if (timer->pipe[PIPE_READ] > -1) {
|
||||
close(timer->pipe[PIPE_READ]);
|
||||
timer->pipe[PIPE_READ] = -1;
|
||||
}
|
||||
|
||||
if (timer->pipe[PIPE_WRITE] > -1) {
|
||||
close(timer->pipe[PIPE_WRITE]);
|
||||
timer->pipe[PIPE_WRITE] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
/*!
|
||||
* \note only PIPE_READ is guaranteed valid
|
||||
*/
|
||||
static int pthread_timer_hash(const void *obj, const int flags)
|
||||
{
|
||||
const struct pthread_timer *timer = obj;
|
||||
|
||||
return timer->pipe[PIPE_READ];
|
||||
}
|
||||
|
||||
/*!
|
||||
* \note only PIPE_READ is guaranteed valid
|
||||
*/
|
||||
static int pthread_timer_cmp(void *obj, void *arg, int flags)
|
||||
{
|
||||
struct pthread_timer *timer1 = obj, *timer2 = arg;
|
||||
|
||||
return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH : 0;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \retval 0 no timer tick needed
|
||||
* \retval non-zero write to the timing pipe needed
|
||||
*/
|
||||
static int check_timer(struct pthread_timer *timer)
|
||||
{
|
||||
struct timeval now;
|
||||
|
||||
if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
now = ast_tvnow();
|
||||
|
||||
if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
|
||||
timer->tick_count++;
|
||||
if (!timer->tick_count) {
|
||||
timer->start = now;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void read_pipe(int rd_fd, unsigned int quantity, int clear)
|
||||
{
|
||||
|
||||
ast_assert(quantity || clear);
|
||||
|
||||
if (!quantity && clear) {
|
||||
quantity = 1;
|
||||
}
|
||||
|
||||
do {
|
||||
unsigned char buf[1024];
|
||||
ssize_t res;
|
||||
fd_set rfds;
|
||||
struct timeval tv = {
|
||||
.tv_sec = 0,
|
||||
};
|
||||
|
||||
/* Make sure there is data to read */
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(rd_fd, &rfds);
|
||||
|
||||
if (select(rd_fd + 1, &rfds, NULL, NULL, &tv) != 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
res = read(rd_fd, buf,
|
||||
(quantity < sizeof(buf)) ? quantity : sizeof(buf));
|
||||
|
||||
if (res == -1) {
|
||||
if (errno == EAGAIN) {
|
||||
continue;
|
||||
}
|
||||
ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
if (clear) {
|
||||
continue;
|
||||
}
|
||||
|
||||
quantity -= res;
|
||||
} while (quantity);
|
||||
}
|
||||
|
||||
static void write_byte(int wr_fd)
|
||||
{
|
||||
do {
|
||||
ssize_t res;
|
||||
unsigned char x = 42;
|
||||
|
||||
res = write(wr_fd, &x, 1);
|
||||
|
||||
if (res == -1) {
|
||||
if (errno == EAGAIN) {
|
||||
continue;
|
||||
}
|
||||
ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
|
||||
}
|
||||
} while (0);
|
||||
}
|
||||
|
||||
static int run_timer(void *obj, void *arg, int flags)
|
||||
{
|
||||
struct pthread_timer *timer = obj;
|
||||
|
||||
if (timer->state == TIMER_STATE_IDLE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ao2_lock(timer);
|
||||
|
||||
if (check_timer(timer)) {
|
||||
write_byte(timer->pipe[PIPE_WRITE]);
|
||||
}
|
||||
|
||||
ao2_unlock(timer);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *do_timing(void *arg)
|
||||
{
|
||||
struct timeval next_wakeup = ast_tvnow();
|
||||
|
||||
while (!timing_thread.stop) {
|
||||
struct timespec ts = { 0, };
|
||||
|
||||
ao2_callback(pthread_timers, 0, run_timer, NULL);
|
||||
|
||||
next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 10000));
|
||||
|
||||
ts.tv_sec = next_wakeup.tv_sec;
|
||||
ts.tv_nsec = next_wakeup.tv_usec * 1000;
|
||||
|
||||
ast_mutex_lock(&timing_thread.lock);
|
||||
if (!timing_thread.stop) {
|
||||
if (ao2_container_count(pthread_timers)) {
|
||||
ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
|
||||
} else {
|
||||
ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
|
||||
}
|
||||
}
|
||||
ast_mutex_unlock(&timing_thread.lock);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int init_timing_thread(void)
|
||||
{
|
||||
ast_mutex_init(&timing_thread.lock);
|
||||
ast_cond_init(&timing_thread.cond, NULL);
|
||||
|
||||
if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
|
||||
ast_log(LOG_ERROR, "Unable to start timing thread.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int load_module(void)
|
||||
{
|
||||
if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
|
||||
pthread_timer_hash, pthread_timer_cmp))) {
|
||||
return AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
|
||||
if (init_timing_thread()) {
|
||||
ao2_ref(pthread_timers, -1);
|
||||
pthread_timers = NULL;
|
||||
return AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
|
||||
return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
|
||||
AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
|
||||
}
|
||||
|
||||
static int unload_module(void)
|
||||
{
|
||||
#if 0
|
||||
/* XXX code to stop the timing thread ... */
|
||||
|
||||
ast_uninstall_timing_functions(timing_funcs_handle);
|
||||
ao2_ref(pthread_timers, -1);
|
||||
#endif
|
||||
|
||||
/* This module can not currently be unloaded. No use count handling is being done. */
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");
|
Reference in New Issue
Block a user