Add support for using epoll instead of poll. This should increase scalability and is done in such a way that we should be able to add support for other poll() replacements.

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@78683 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Joshua Colp
2007-08-08 21:44:58 +00:00
parent 063c747f3a
commit 22114b509d
21 changed files with 482 additions and 72 deletions

View File

@@ -67,6 +67,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/slinfactory.h"
#include "asterisk/audiohook.h"
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
struct ast_epoll_data {
struct ast_channel *chan;
int which;
};
/* uncomment if you have problems with 'monitoring' synchronized files */
#if 0
#define MONITOR_CONSTANT_DELAY
@@ -631,11 +640,16 @@ struct ast_channel *ast_channel_alloc(int needqueue, int state, const char *cid_
return NULL;
}
/* Don't bother initializing the last two FD here, because they
will *always* be set just a few lines down (AST_TIMING_FD,
AST_ALERT_FD). */
for (x = 0; x < AST_MAX_FDS - 2; x++)
#ifdef HAVE_EPOLL
tmp->epfd = epoll_create(25);
#endif
for (x = 0; x < AST_MAX_FDS; x++) {
tmp->fds[x] = -1;
#ifdef HAVE_EPOLL
tmp->epfd_data[x] = NULL;
#endif
}
#ifdef HAVE_ZAPTEL
tmp->timingfd = open("/dev/zap/timer", O_RDWR);
@@ -666,9 +680,9 @@ struct ast_channel *ast_channel_alloc(int needqueue, int state, const char *cid_
tmp->alertpipe[0] = tmp->alertpipe[1] = -1;
/* Always watch the alertpipe */
tmp->fds[AST_ALERT_FD] = tmp->alertpipe[0];
ast_channel_set_fd(tmp, AST_ALERT_FD, tmp->alertpipe[0]);
/* And timing pipe */
tmp->fds[AST_TIMING_FD] = tmp->timingfd;
ast_channel_set_fd(tmp, AST_TIMING_FD, tmp->timingfd);
ast_string_field_set(tmp, name, "**Unknown**");
/* Initial state */
@@ -1065,6 +1079,9 @@ static void free_cid(struct ast_callerid *cid)
void ast_channel_free(struct ast_channel *chan)
{
int fd;
#ifdef HAVE_EPOLL
int i;
#endif
struct ast_var_t *vardata;
struct ast_frame *f;
struct varshead *headp;
@@ -1116,6 +1133,13 @@ void ast_channel_free(struct ast_channel *chan)
close(fd);
if ((fd = chan->timingfd) > -1)
close(fd);
#ifdef HAVE_EPOLL
for (i = 0; i < AST_MAX_FDS; i++) {
if (chan->epfd_data[i])
free(chan->epfd_data[i]);
}
close(chan->epfd);
#endif
while ((f = AST_LIST_REMOVE_HEAD(&chan->readq, frame_list)))
ast_frfree(f);
@@ -1256,6 +1280,83 @@ struct ast_datastore *ast_channel_datastore_find(struct ast_channel *chan, const
return datastore;
}
/*! Set the file descriptor on the channel */
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
{
#ifdef HAVE_EPOLL
struct epoll_event ev;
struct ast_epoll_data *aed = NULL;
if (chan->fds[which] > -1) {
epoll_ctl(chan->epfd, EPOLL_CTL_DEL, chan->fds[which], &ev);
aed = chan->epfd_data[which];
}
/* If this new fd is valid, add it to the epoll */
if (fd > -1) {
if (!aed && (!(aed = ast_calloc(1, sizeof(*aed)))))
return;
chan->epfd_data[which] = aed;
aed->chan = chan;
aed->which = which;
ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
ev.data.ptr = aed;
epoll_ctl(chan->epfd, EPOLL_CTL_ADD, fd, &ev);
} else if (aed) {
/* We don't have to keep around this epoll data structure now */
free(aed);
chan->epfd_data[which] = NULL;
}
#endif
chan->fds[which] = fd;
return;
}
/*! Add a channel to an optimized waitfor */
void ast_poll_channel_add(struct ast_channel *chan0, struct ast_channel *chan1)
{
#ifdef HAVE_EPOLL
struct epoll_event ev;
int i = 0;
if (chan0->epfd == -1)
return;
/* Iterate through the file descriptors on chan1, adding them to chan0 */
for (i = 0; i < AST_MAX_FDS; i++) {
if (chan1->fds[i] == -1)
continue;
ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
ev.data.ptr = chan1->epfd_data[i];
epoll_ctl(chan0->epfd, EPOLL_CTL_ADD, chan1->fds[i], &ev);
}
#endif
return;
}
/*! Delete a channel from an optimized waitfor */
void ast_poll_channel_del(struct ast_channel *chan0, struct ast_channel *chan1)
{
#ifdef HAVE_EPOLL
struct epoll_event ev;
int i = 0;
if (chan0->epfd == -1)
return;
for (i = 0; i < AST_MAX_FDS; i++) {
if (chan1->fds[i] == -1)
continue;
epoll_ctl(chan0->epfd, EPOLL_CTL_DEL, chan1->fds[i], &ev);
}
#endif
return;
}
/*! \brief Softly hangup a channel, don't lock */
int ast_softhangup_nolock(struct ast_channel *chan, int cause)
{
@@ -1437,7 +1538,7 @@ void ast_deactivate_generator(struct ast_channel *chan)
chan->generator->release(chan, chan->generatordata);
chan->generatordata = NULL;
chan->generator = NULL;
chan->fds[AST_GENERATOR_FD] = -1;
ast_channel_set_fd(chan, AST_GENERATOR_FD, -1);
ast_clear_flag(chan, AST_FLAG_WRITE_INT);
ast_settimeout(chan, 0, NULL, NULL);
}
@@ -1499,8 +1600,13 @@ int ast_waitfor_n_fd(int *fds, int n, int *ms, int *exception)
}
/*! \brief Wait for x amount of time on a file descriptor to have input. */
#ifdef HAVE_EPOLL
static struct ast_channel *ast_waitfor_nandfds_classic(struct ast_channel **c, int n, int *fds, int nfds,
int *exception, int *outfd, int *ms)
#else
struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds, int nfds,
int *exception, int *outfd, int *ms)
int *exception, int *outfd, int *ms)
#endif
{
struct timeval start = { 0 , 0 };
struct pollfd *pfds;
@@ -1526,15 +1632,13 @@ struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds,
*exception = 0;
/* Perform any pending masquerades */
for (x=0; x < n; x++) {
for (x = 0; x < n; x++) {
ast_channel_lock(c[x]);
if (c[x]->masq) {
if (ast_do_masquerade(c[x])) {
ast_log(LOG_WARNING, "Masquerade failed\n");
*ms = -1;
ast_channel_unlock(c[x]);
return NULL;
}
if (c[x]->masq && ast_do_masquerade(c[x])) {
ast_log(LOG_WARNING, "Masquerade failed\n");
*ms = -1;
ast_channel_unlock(c[x]);
return NULL;
}
if (c[x]->whentohangup) {
if (!whentohangup)
@@ -1564,8 +1668,8 @@ struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds,
* individual fd's must have priority over channel fds.
*/
max = 0;
for (x=0; x<n; x++) {
for (y=0; y<AST_MAX_FDS; y++) {
for (x = 0; x < n; x++) {
for (y = 0; y < AST_MAX_FDS; y++) {
fdmap[max].fdno = y; /* fd y is linked to this pfds */
fdmap[max].chan = x; /* channel x is linked to this pfds */
max += ast_add_fd(&pfds[max], c[x]->fds[y]);
@@ -1573,7 +1677,7 @@ struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds,
CHECK_BLOCKING(c[x]);
}
/* Add the individual fds */
for (x=0; x<nfds; x++) {
for (x = 0; x < nfds; x++) {
fdmap[max].chan = -1;
max += ast_add_fd(&pfds[max], fds[x]);
}
@@ -1593,7 +1697,7 @@ struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds,
} else {
res = poll(pfds, max, rms);
}
for (x=0; x<n; x++)
for (x = 0; x < n; x++)
ast_clear_flag(c[x], AST_FLAG_BLOCKING);
if (res < 0) { /* Simulate a timeout if we were interrupted */
if (errno != EINTR)
@@ -1602,7 +1706,7 @@ struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds,
}
if (whentohangup) { /* if we have a timeout, check who expired */
time(&now);
for (x=0; x<n; x++) {
for (x = 0; x < n; x++) {
if (c[x]->whentohangup && now >= c[x]->whentohangup) {
c[x]->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
if (winner == NULL)
@@ -1646,6 +1750,200 @@ struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds,
return winner;
}
#ifdef HAVE_EPOLL
static struct ast_channel *ast_waitfor_nandfds_simple(struct ast_channel *chan, int *ms)
{
struct timeval start = { 0 , 0 };
int res = 0;
struct epoll_event ev[1];
long whentohangup = 0, rms = *ms;
time_t now;
struct ast_channel *winner = NULL;
struct ast_epoll_data *aed = NULL;
ast_channel_lock(chan);
/* See if this channel needs to be masqueraded */
if (chan->masq && ast_do_masquerade(chan)) {
ast_log(LOG_WARNING, "Failed to perform masquerade on %s\n", chan->name);
*ms = -1;
ast_channel_unlock(chan);
return NULL;
}
/* Figure out their timeout */
if (chan->whentohangup) {
time(&now);
if ((whentohangup = chan->whentohangup - now) < 1) {
/* They should already be hungup! */
chan->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
ast_channel_unlock(chan);
return NULL;
}
/* If this value is smaller then the current one... make it priority */
whentohangup *= 1000;
if (rms > whentohangup)
rms = whentohangup;
}
ast_channel_unlock(chan);
/* Time to make this channel block... */
CHECK_BLOCKING(chan);
if (*ms > 0)
start = ast_tvnow();
/* We don't have to add any file descriptors... they are already added, we just have to wait! */
res = epoll_wait(chan->epfd, ev, 1, rms);
/* Stop blocking */
ast_clear_flag(chan, AST_FLAG_BLOCKING);
/* Simulate a timeout if we were interrupted */
if (res < 0) {
if (errno != EINTR)
*ms = -1;
return NULL;
}
/* If this channel has a timeout see if it expired */
if (chan->whentohangup) {
time(&now);
if (now >= chan->whentohangup) {
chan->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
winner = chan;
}
}
/* No fd ready, reset timeout and be done for now */
if (!res) {
*ms = 0;
return winner;
}
/* See what events are pending */
aed = ev[0].data.ptr;
chan->fdno = aed->which;
if (ev[0].events & EPOLLPRI)
ast_set_flag(chan, AST_FLAG_EXCEPTION);
else
ast_clear_flag(chan, AST_FLAG_EXCEPTION);
if (*ms > 0) {
*ms -= ast_tvdiff_ms(ast_tvnow(), start);
if (*ms < 0)
*ms = 0;
}
return chan;
}
static struct ast_channel *ast_waitfor_nandfds_complex(struct ast_channel **c, int n, int *ms)
{
struct timeval start = { 0 , 0 };
int res = 0, i;
struct epoll_event ev[25] = { { 0, } };
long whentohangup = 0, diff, rms = *ms;
time_t now;
struct ast_channel *winner = NULL;
for (i = 0; i < n; i++) {
ast_channel_lock(c[i]);
if (c[i]->masq && ast_do_masquerade(c[i])) {
ast_log(LOG_WARNING, "Masquerade failed\n");
*ms = -1;
ast_channel_unlock(c[i]);
return NULL;
}
if (c[i]->whentohangup) {
if (!whentohangup)
time(&now);
if ((diff = c[i]->whentohangup - now) < 1) {
c[i]->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
ast_channel_unlock(c[i]);
return c[i];
}
if (!whentohangup || (diff < whentohangup))
whentohangup = diff;
}
ast_channel_unlock(c[i]);
CHECK_BLOCKING(c[i]);
}
rms = *ms;
if (whentohangup) {
rms = whentohangup * 1000;
if (*ms >= 0 && *ms < rms)
rms = *ms;
}
if (*ms > 0)
start = ast_tvnow();
res = epoll_wait(c[0]->epfd, ev, 25, rms);
for (i = 0; i < n; i++)
ast_clear_flag(c[i], AST_FLAG_BLOCKING);
if (res < 0) {
if (errno != EINTR)
*ms = -1;
return NULL;
}
if (whentohangup) {
time(&now);
for (i = 0; i < n; i++) {
if (c[i]->whentohangup && now >= c[i]->whentohangup) {
c[i]->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
if (!winner)
winner = c[i];
}
}
}
if (!res) {
*ms = 0;
return winner;
}
for (i = 0; i < 25; i++) {
struct ast_epoll_data *aed = ev[i].data.ptr;
if (!ev[i].events || !aed)
continue;
winner = aed->chan;
if (ev[i].events & EPOLLPRI)
ast_set_flag(winner, AST_FLAG_EXCEPTION);
else
ast_clear_flag(winner, AST_FLAG_EXCEPTION);
winner->fdno = aed->which;
}
if (*ms > 0) {
*ms -= ast_tvdiff_ms(ast_tvnow(), start);
if (*ms < 0)
*ms = 0;
}
return winner;
}
struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds, int nfds,
int *exception, int *outfd, int *ms)
{
/* If no epoll file descriptor is available resort to classic nandfds */
if (!n || nfds || c[0]->epfd == -1)
return ast_waitfor_nandfds_classic(c, n, fds, nfds, exception, outfd, ms);
else if (!nfds && n == 1)
return ast_waitfor_nandfds_simple(c[0], ms);
else
return ast_waitfor_nandfds_complex(c, n, ms);
}
#endif
struct ast_channel *ast_waitfor_n(struct ast_channel **c, int n, int *ms)
{
return ast_waitfor_nandfds(c, n, NULL, 0, NULL, NULL, ms);
@@ -3259,7 +3557,7 @@ int ast_do_masquerade(struct ast_channel *original)
/* Copy the FD's other than the generator fd */
for (x = 0; x < AST_MAX_FDS; x++) {
if (x != AST_GENERATOR_FD)
original->fds[x] = clone->fds[x];
ast_channel_set_fd(original, x, clone->fds[x]);
}
ast_app_group_update(clone, original);
@@ -3290,7 +3588,7 @@ int ast_do_masquerade(struct ast_channel *original)
clone->cid = tmpcid;
/* Restore original timing file descriptor */
original->fds[AST_TIMING_FD] = original->timingfd;
ast_channel_set_fd(original, AST_TIMING_FD, original->timingfd);
/* Our native formats are different now */
original->nativeformats = clone->nativeformats;
@@ -3487,6 +3785,8 @@ static enum ast_bridge_result ast_generic_bridge(struct ast_channel *c0, struct
/* Check the need of a jitterbuffer for each channel */
jb_in_use = ast_jb_do_usecheck(c0, c1);
ast_poll_channel_add(c0, c1);
for (;;) {
struct ast_channel *who, *other;
@@ -3591,11 +3891,16 @@ static enum ast_bridge_result ast_generic_bridge(struct ast_channel *c0, struct
/* XXX do we want to pass on also frames not matched above ? */
ast_frfree(f);
#ifndef HAVE_EPOLL
/* Swap who gets priority */
cs[2] = cs[0];
cs[0] = cs[1];
cs[1] = cs[2];
#endif
}
ast_poll_channel_del(c0, c1);
return res;
}