From c82440e2463a97846e21af4c7a1ef20dfd7261f2 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Mon, 30 Mar 2009 21:12:06 +0000 Subject: [PATCH] change blocking rtp to psuedo-blocking to avoid endlessly blocking reads and refactor jitter buffer git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@12846 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- src/switch_apr.c | 8 ++++- src/switch_rtp.c | 91 ++++++++++++++++++++++++++++++++++++------------ 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/switch_apr.c b/src/switch_apr.c index ae4ca68c30..a24dc361e8 100644 --- a/src/switch_apr.c +++ b/src/switch_apr.c @@ -791,7 +791,13 @@ SWITCH_DECLARE(switch_status_t) switch_pollset_add(switch_pollset_t *pollset, co SWITCH_DECLARE(switch_status_t) switch_poll(switch_pollfd_t *aprset, int32_t numsock, int32_t *nsds, switch_interval_time_t timeout) { - return apr_poll((apr_pollfd_t *)aprset, numsock, nsds, timeout); + apr_status_t st = apr_poll((apr_pollfd_t *)aprset, numsock, nsds, timeout); + + if (st == APR_TIMEUP) { + st = SWITCH_STATUS_TIMEOUT; + } + + return st; } SWITCH_DECLARE(switch_status_t) switch_socket_create_pollfd(switch_pollfd_t **poll, switch_socket_t *sock, int16_t flags, switch_memory_pool_t *pool) diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 026f1ecea2..d8905c1324 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -129,7 +129,9 @@ struct switch_rtp { * used. */ switch_socket_t *sock_input, *sock_output; - + switch_pollfd_t *read_pollfd; + switch_pollfd_t *jb_pollfd; + switch_sockaddr_t *local_addr; rtp_msg_t send_msg; @@ -188,7 +190,6 @@ struct switch_rtp { switch_timer_t timer; uint8_t ready; uint8_t cn; - switch_time_t last_time; stfu_instance_t *jb; uint32_t max_missed_packets; uint32_t missed_count; @@ -598,6 +599,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) { switch_socket_opt_set(rtp_session->sock_input, SWITCH_SO_NONBLOCK, TRUE); switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK); + } else { + switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool); } status = SWITCH_STATUS_SUCCESS; @@ -1013,7 +1016,14 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_stun_ping(switch_rtp_t *rtp_ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_jitter_buffer(switch_rtp_t *rtp_session, uint32_t queue_frames) { + if (rtp_session->read_pollfd) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't use jitterbuffer without a timer.\n"); + return SWITCH_STATUS_FALSE; + } + rtp_session->jb = stfu_n_init(queue_frames); + switch_socket_create_pollfd(&rtp_session->jb_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool); + return SWITCH_STATUS_SUCCESS; } @@ -1364,11 +1374,15 @@ static void do_flush(switch_rtp_t *rtp_session) static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_type, switch_frame_flag_t *flags, switch_io_flag_t io_flags) { switch_size_t bytes = 0; - switch_status_t status; + switch_status_t status = SWITCH_STATUS_SUCCESS, poll_status = SWITCH_STATUS_SUCCESS; int check = 0; stfu_frame_t *jb_frame; int ret = -1; int sleep_mss = 1000; + int poll_sec = 5; + int poll_loop = 0; + int fdr = 0; + int from_jb = 0; if (!switch_rtp_ready(rtp_session)) { return -1; @@ -1376,8 +1390,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ if (rtp_session->timer.interval) { sleep_mss = rtp_session->timer.interval * 1000; - } else { - rtp_session->last_time = switch_time_now(); } READ_INC(rtp_session); @@ -1386,13 +1398,54 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ int do_cng = 0; if (rtp_session->timer.interval) { - switch_core_timer_next(&rtp_session->timer); + int do_sleep = 1; + if (rtp_session->jb) { + if (switch_poll(rtp_session->jb_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) { + do_sleep = 0; + } + } + if (do_sleep) { + switch_core_timer_next(&rtp_session->timer); + } } recvfrom: - bytes = sizeof(rtp_msg_t); - status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes); + if (rtp_session->jb) { + if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) { + memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen); + if (jb_frame->plc) { + *flags |= SFF_PLC; + } + bytes = jb_frame->dlen + rtp_header_len; + rtp_session->recv_msg.header.ts = htonl(jb_frame->ts); + rtp_session->recv_msg.header.pt = rtp_session->payload; + from_jb = 1; + goto post_read; + } + } + + + if (rtp_session->read_pollfd) { + poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, poll_sec * 1000000); + } + + + if (poll_status == SWITCH_STATUS_SUCCESS) { + bytes = sizeof(rtp_msg_t); + status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes); + } else { + if (!SWITCH_STATUS_IS_BREAK(poll_status) && poll_status != SWITCH_STATUS_TIMEOUT) { + ret = -1; + goto end; + } + + poll_loop = 1; + rtp_session->missed_count += (poll_sec * 1000 ) / (rtp_session->ms_per_packet / 1000); + bytes = 0; + } + + post_read: if (bytes < 0) { ret = (int) bytes; @@ -1427,6 +1480,10 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ continue; } + if (!bytes && poll_loop) { + goto recvfrom; + } + if (bytes && rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->te) { rtp_flush_read_buffer(rtp_session); } @@ -1517,26 +1574,16 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ goto end; } - if (rtp_session->jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) { + if (rtp_session->jb && !from_jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) { if (bytes) { + if (rtp_session->recv_msg.header.m) { stfu_n_reset(rtp_session->jb); } - stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len); + stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len); bytes = 0; - } - - if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) { - memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen); - if (jb_frame->plc) { - *flags |= SFF_PLC; - } - bytes = jb_frame->dlen + rtp_header_len; - rtp_session->recv_msg.header.ts = htonl(jb_frame->ts); - rtp_session->recv_msg.header.pt = rtp_session->payload; - } else { - goto timer_check; + goto recvfrom; } }