Merge team/russell/frame_caching

There are some situations in Asterisk where ast_frame and/or iax_frame
structures are rapidly allocatted and freed (at least 50 times per second
for one call).

This code significantly improves the performance of ast_frame_header_new(), 
ast_frdup(), ast_frfree(), iax_frame_new(), and iax_frame_free() by keeping
a thread-local cache of these structures and using frames from the cache 
whenever possible instead of calling malloc/free every time.

This commit also converts the ast_frame and iax_frame structures to use the
linked list macros.


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@41278 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Russell Bryant
2006-08-29 20:50:36 +00:00
parent d22476348a
commit f7e7161607
16 changed files with 345 additions and 313 deletions

View File

@@ -621,11 +621,11 @@ struct chan_iax2_pvt {
};
static struct ast_iax2_queue {
struct iax_frame *head;
struct iax_frame *tail;
AST_LIST_HEAD(, iax_frame) queue;
int count;
ast_mutex_t lock;
} iaxq;
} iaxq = {
.queue = AST_LIST_HEAD_INIT_VALUE
};
static AST_LIST_HEAD_STATIC(users, iax2_user);
@@ -1071,11 +1071,12 @@ static struct chan_iax2_pvt *new_iax(struct sockaddr_in *sin, int lockpeer, cons
static struct iax_frame *iaxfrdup2(struct iax_frame *fr)
{
/* Malloc() a copy of a frame */
struct iax_frame *new = iax_frame_new(DIRECTION_INGRESS, fr->af.datalen);
if (new) {
memcpy(new, fr, sizeof(struct iax_frame));
size_t mallocd_datalen = new->mallocd_datalen;
memcpy(new, fr, sizeof(*new));
iax_frame_wrap(new, &fr->af);
new->mallocd_datalen = mallocd_datalen;
new->data = NULL;
new->datalen = 0;
new->direction = DIRECTION_INGRESS;
@@ -1754,7 +1755,7 @@ retry:
ast_queue_hangup(owner);
}
for (cur = iaxq.head; cur ; cur = cur->next) {
AST_LIST_TRAVERSE(&iaxq.queue, cur, list) {
/* Cancel any pending transmissions */
if (cur->callno == pvt->callno)
cur->retries = -1;
@@ -1875,17 +1876,10 @@ static void __attempt_transmit(void *data)
/* Do not try again */
if (freeme) {
/* Don't attempt delivery, just remove it from the queue */
ast_mutex_lock(&iaxq.lock);
if (f->prev)
f->prev->next = f->next;
else
iaxq.head = f->next;
if (f->next)
f->next->prev = f->prev;
else
iaxq.tail = f->prev;
AST_LIST_LOCK(&iaxq.queue);
AST_LIST_REMOVE(&iaxq.queue, f, list);
iaxq.count--;
ast_mutex_unlock(&iaxq.lock);
AST_LIST_UNLOCK(&iaxq.queue);
f->retrans = -1;
/* Free the IAX frame */
iax2_frame_free(f);
@@ -2082,7 +2076,7 @@ static int iax2_show_stats(int fd, int argc, char *argv[])
int cnt = 0, dead=0, final=0;
if (argc != 3)
return RESULT_SHOWUSAGE;
for (cur = iaxq.head; cur ; cur = cur->next) {
AST_LIST_TRAVERSE(&iaxq.queue, cur, list) {
if (cur->retries < 0)
dead++;
if (cur->final)
@@ -2092,7 +2086,8 @@ static int iax2_show_stats(int fd, int argc, char *argv[])
ast_cli(fd, " IAX Statistics\n");
ast_cli(fd, "---------------------\n");
ast_cli(fd, "Outstanding frames: %d (%d ingress, %d egress)\n", iax_get_frames(), iax_get_iframes(), iax_get_oframes());
ast_cli(fd, "Packets in transmit queue: %d dead, %d final, %d total\n", dead, final, cnt);
ast_cli(fd, "Packets in transmit queue: %d dead, %d final, %d total\n\n", dead, final, cnt);
return RESULT_SUCCESS;
}
@@ -2369,24 +2364,13 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr
static int iax2_transmit(struct iax_frame *fr)
{
/* Lock the queue and place this packet at the end */
fr->next = NULL;
fr->prev = NULL;
/* By setting this to 0, the network thread will send it for us, and
queue retransmission if necessary */
fr->sentyet = 0;
ast_mutex_lock(&iaxq.lock);
if (!iaxq.head) {
/* Empty queue */
iaxq.head = fr;
iaxq.tail = fr;
} else {
/* Double link */
iaxq.tail->next = fr;
fr->prev = iaxq.tail;
iaxq.tail = fr;
}
AST_LIST_LOCK(&iaxq.queue);
AST_LIST_INSERT_TAIL(&iaxq.queue, fr, list);
iaxq.count++;
ast_mutex_unlock(&iaxq.lock);
AST_LIST_UNLOCK(&iaxq.queue);
/* Wake up the network and scheduler thread */
pthread_kill(netthreadid, SIGURG);
signal_condition(&sched_lock, &sched_cond);
@@ -5330,15 +5314,15 @@ static int complete_transfer(int callno, struct iax_ies *ies)
pvt->lastsent = 0;
pvt->nextpred = 0;
pvt->pingtime = DEFAULT_RETRY_TIME;
ast_mutex_lock(&iaxq.lock);
for (cur = iaxq.head; cur ; cur = cur->next) {
AST_LIST_LOCK(&iaxq.queue);
AST_LIST_TRAVERSE(&iaxq.queue, cur, list) {
/* We must cancel any packets that would have been transmitted
because now we're talking to someone new. It's okay, they
were transmitted to someone that didn't care anyway. */
if (callno == cur->callno)
cur->retries = -1;
}
ast_mutex_unlock(&iaxq.lock);
AST_LIST_UNLOCK(&iaxq.queue);
return 0;
}
@@ -5835,17 +5819,16 @@ static int iax2_vnak(int callno)
static void vnak_retransmit(int callno, int last)
{
struct iax_frame *f;
ast_mutex_lock(&iaxq.lock);
f = iaxq.head;
while(f) {
AST_LIST_LOCK(&iaxq.queue);
AST_LIST_TRAVERSE(&iaxq.queue, f, list) {
/* Send a copy immediately */
if ((f->callno == callno) && iaxs[f->callno] &&
(f->oseqno >= last)) {
send_packet(f);
}
f = f->next;
}
ast_mutex_unlock(&iaxq.lock);
AST_LIST_UNLOCK(&iaxq.queue);
}
static void __iax2_poke_peer_s(void *data)
@@ -6570,8 +6553,8 @@ static int socket_process(struct iax2_thread *thread)
/* Ack the packet with the given timestamp */
if (option_debug && iaxdebug)
ast_log(LOG_DEBUG, "Cancelling transmission of packet %d\n", x);
ast_mutex_lock(&iaxq.lock);
for (cur = iaxq.head; cur ; cur = cur->next) {
AST_LIST_LOCK(&iaxq.queue);
AST_LIST_TRAVERSE(&iaxq.queue, cur, list) {
/* If it's our call, and our timestamp, mark -1 retries */
if ((fr->callno == cur->callno) && (x == cur->oseqno)) {
cur->retries = -1;
@@ -6583,7 +6566,7 @@ static int socket_process(struct iax2_thread *thread)
}
}
}
ast_mutex_unlock(&iaxq.lock);
AST_LIST_UNLOCK(&iaxq.queue);
}
/* Note how much we've received acknowledgement for */
if (iaxs[fr->callno])
@@ -6723,13 +6706,13 @@ retryowner:
case IAX_COMMAND_TXACC:
if (iaxs[fr->callno]->transferring == TRANSFER_BEGIN) {
/* Ack the packet with the given timestamp */
ast_mutex_lock(&iaxq.lock);
for (cur = iaxq.head; cur ; cur = cur->next) {
AST_LIST_LOCK(&iaxq.queue);
AST_LIST_TRAVERSE(&iaxq.queue, cur, list) {
/* Cancel any outstanding txcnt's */
if ((fr->callno == cur->callno) && (cur->transfer))
cur->retries = -1;
}
ast_mutex_unlock(&iaxq.lock);
AST_LIST_UNLOCK(&iaxq.queue);
memset(&ied1, 0, sizeof(ied1));
iax_ie_append_short(&ied1, IAX_IE_CALLNO, iaxs[fr->callno]->callno);
send_command(iaxs[fr->callno], AST_FRAME_IAX, IAX_COMMAND_TXREADY, 0, ied1.buf, ied1.pos, -1);
@@ -8031,49 +8014,42 @@ static void *network_thread(void *ignore)
/* Our job is simple: Send queued messages, retrying if necessary. Read frames
from the network, and queue them for delivery to the channels */
int res, count;
struct iax_frame *f, *freeme;
struct iax_frame *f;
if (timingfd > -1)
ast_io_add(io, timingfd, timing_read, AST_IO_IN | AST_IO_PRI, NULL);
for(;;) {
/* Go through the queue, sending messages which have not yet been
sent, and scheduling retransmissions if appropriate */
ast_mutex_lock(&iaxq.lock);
f = iaxq.head;
AST_LIST_LOCK(&iaxq.queue);
count = 0;
while(f) {
freeme = NULL;
if (!f->sentyet) {
f->sentyet++;
/* Send a copy immediately -- errors here are ok, so don't bother locking */
if (iaxs[f->callno]) {
send_packet(f);
count++;
}
if (f->retries < 0) {
/* This is not supposed to be retransmitted */
if (f->prev)
f->prev->next = f->next;
else
iaxq.head = f->next;
if (f->next)
f->next->prev = f->prev;
else
iaxq.tail = f->prev;
iaxq.count--;
/* Free the iax frame */
freeme = f;
} else {
/* We need reliable delivery. Schedule a retransmission */
f->retries++;
f->retrans = ast_sched_add(sched, f->retrytime, attempt_transmit, f);
signal_condition(&sched_lock, &sched_cond);
}
AST_LIST_TRAVERSE_SAFE_BEGIN(&iaxq.queue, f, list) {
if (f->sentyet)
continue;
f->sentyet++;
/* Send a copy immediately -- errors here are ok, so don't bother locking */
if (iaxs[f->callno]) {
send_packet(f);
count++;
}
if (f->retries < 0) {
/* This is not supposed to be retransmitted */
AST_LIST_REMOVE(&iaxq.queue, f, list);
iaxq.count--;
/* Free the iax frame */
iax_frame_free(f);
} else {
/* We need reliable delivery. Schedule a retransmission */
f->retries++;
f->retrans = ast_sched_add(sched, f->retrytime, attempt_transmit, f);
signal_condition(&sched_lock, &sched_cond);
}
f = f->next;
if (freeme)
iax_frame_free(freeme);
}
ast_mutex_unlock(&iaxq.lock);
AST_LIST_TRAVERSE_SAFE_END
AST_LIST_UNLOCK(&iaxq.queue);
if (count >= 20)
ast_log(LOG_DEBUG, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
@@ -9751,7 +9727,6 @@ static int __unload_module(void)
static int unload_module(void)
{
ast_mutex_destroy(&iaxq.lock);
ast_mutex_destroy(&waresl.lock);
ast_custom_function_unregister(&iaxpeer_function);
return __unload_module();
@@ -9803,7 +9778,6 @@ static int load_module(void)
}
ast_netsock_init(netsock);
ast_mutex_init(&iaxq.lock);
ast_mutex_init(&waresl.lock);
ast_cli_register_multiple(iax2_cli, sizeof(iax2_cli) / sizeof(iax2_cli[0]));

View File

@@ -226,7 +226,7 @@ static void check_bridge(struct local_pvt *p, int isoutbound)
frames on the owner channel (because they would be transferred to the
outbound channel during the masquerade)
*/
if (isoutbound && p->chan->_bridge /* Not ast_bridged_channel! Only go one step! */ && !p->owner->readq) {
if (isoutbound && p->chan->_bridge /* Not ast_bridged_channel! Only go one step! */ && AST_LIST_EMPTY(&p->owner->readq)) {
/* Masquerade bridged channel into owner */
/* Lock everything we need, one by one, and give up if
we can't get everything. Remember, we'll get another
@@ -248,7 +248,7 @@ static void check_bridge(struct local_pvt *p, int isoutbound)
when the local channels go away.
*/
#if 0
} else if (!isoutbound && p->owner && p->owner->_bridge && p->chan && !p->chan->readq) {
} else if (!isoutbound && p->owner && p->owner->_bridge && p->chan && AST_LIST_EMPTY(&p->chan->readq)) {
/* Masquerade bridged channel into chan */
if (!ast_mutex_trylock(&(p->owner->_bridge)->lock)) {
if (!p->owner->_bridge->_softhangup) {

View File

@@ -3516,7 +3516,6 @@ static struct ast_frame *zt_handle_event(struct ast_channel *ast)
pthread_t threadid;
pthread_attr_t attr;
struct ast_channel *chan;
struct ast_frame dtmf_frame = { .frametype = AST_FRAME_DTMF };
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
@@ -3560,8 +3559,6 @@ static struct ast_frame *zt_handle_event(struct ast_channel *ast)
*/
p->subs[index].f.frametype = AST_FRAME_DTMF_BEGIN;
p->subs[index].f.subclass = res & 0xff;
dtmf_frame.subclass = res & 0xff;
p->subs[index].f.next = ast_frdup(&dtmf_frame);
#ifdef HAVE_PRI
}
#endif

View File

@@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/utils.h"
#include "asterisk/unaligned.h"
#include "asterisk/lock.h"
#include "asterisk/threadstorage.h"
#include "iax2.h"
#include "iax2-parser.h"
@@ -49,6 +50,15 @@ static int frames = 0;
static int iframes = 0;
static int oframes = 0;
static void frame_cache_cleanup(void *data);
/*! \brief A per-thread cache of iax_frame structures */
AST_THREADSTORAGE_CUSTOM(frame_cache, frame_cache_init, frame_cache_cleanup);
/*! \brief This is just so iax_frames, a list head struct for holding a list of
* iax_frame structures, is defined. */
AST_LIST_HEAD_NOLOCK(iax_frames, iax_frame);
static void internaloutput(const char *str)
{
fputs(str, stdout);
@@ -926,22 +936,44 @@ void iax_frame_wrap(struct iax_frame *fr, struct ast_frame *f)
struct iax_frame *iax_frame_new(int direction, int datalen)
{
struct iax_frame *fr;
fr = malloc((int)sizeof(struct iax_frame) + datalen);
if (fr) {
fr->direction = direction;
fr->retrans = -1;
ast_atomic_fetchadd_int(&frames, 1);
if (fr->direction == DIRECTION_INGRESS)
ast_atomic_fetchadd_int(&iframes, 1);
else
ast_atomic_fetchadd_int(&oframes, 1);
struct iax_frame *fr = NULL;
struct iax_frames *iax_frames;
/* Attempt to get a frame from this thread's cache */
if ((iax_frames = ast_threadstorage_get(&frame_cache, sizeof(*iax_frames)))) {
AST_LIST_TRAVERSE_SAFE_BEGIN(iax_frames, fr, list) {
if (fr->mallocd_datalen >= datalen) {
size_t mallocd_datalen = fr->mallocd_datalen;
AST_LIST_REMOVE_CURRENT(iax_frames, list);
memset(fr, 0, sizeof(*fr));
fr->mallocd_datalen = mallocd_datalen;
break;
}
}
AST_LIST_TRAVERSE_SAFE_END
}
if (!fr) {
if (!(fr = ast_calloc(1, sizeof(*fr) + datalen)))
return NULL;
fr->mallocd_datalen = datalen;
}
fr->direction = direction;
fr->retrans = -1;
if (fr->direction == DIRECTION_INGRESS)
ast_atomic_fetchadd_int(&iframes, 1);
else
ast_atomic_fetchadd_int(&oframes, 1);
return fr;
}
void iax_frame_free(struct iax_frame *fr)
static void __iax_frame_free(struct iax_frame *fr, int cache)
{
struct iax_frames *iax_frames;
/* Note: does not remove from scheduler! */
if (fr->direction == DIRECTION_INGRESS)
ast_atomic_fetchadd_int(&iframes, -1);
@@ -952,8 +984,34 @@ void iax_frame_free(struct iax_frame *fr)
return;
}
fr->direction = 0;
free(fr);
ast_atomic_fetchadd_int(&frames, -1);
if (!cache) {
free(fr);
return;
}
if (!(iax_frames = ast_threadstorage_get(&frame_cache, sizeof(*iax_frames)))) {
free(fr);
return;
}
AST_LIST_INSERT_HEAD(iax_frames, fr, list);
}
static void frame_cache_cleanup(void *data)
{
struct iax_frames *frames = data;
struct iax_frame *cur;
while ((cur = AST_LIST_REMOVE_HEAD(frames, list)))
__iax_frame_free(cur, 0);
free(frames);
}
void iax_frame_free(struct iax_frame *fr)
{
__iax_frame_free(fr, 1);
}
int iax_get_frames(void) { return frames; }

View File

@@ -18,6 +18,8 @@
#ifndef _IAX2_PARSER_H
#define _IAX2_PARSER_H
#include "asterisk/linkedlists.h"
struct iax_ies {
char *called_number;
char *calling_number;
@@ -115,10 +117,11 @@ struct iax_frame {
/* Retransmission ID */
int retrans;
/* Easy linking */
struct iax_frame *next;
struct iax_frame *prev;
AST_LIST_ENTRY(iax_frame) list;
/* Actual, isolated frame header */
struct ast_frame af;
/*! Amount of space _allocated_ for data */
size_t mallocd_datalen;
unsigned char unused[AST_FRIENDLY_OFFSET];
unsigned char afdata[0]; /* Data for frame */
};