change blocking rtp to psuedo-blocking to avoid endlessly blocking reads and refactor jitter buffer
git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@12846 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
parent
aa2f34e98a
commit
c82440e246
|
@ -791,7 +791,13 @@ SWITCH_DECLARE(switch_status_t) switch_pollset_add(switch_pollset_t *pollset, co
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_status_t) switch_poll(switch_pollfd_t *aprset, int32_t numsock, int32_t *nsds, switch_interval_time_t timeout)
|
SWITCH_DECLARE(switch_status_t) switch_poll(switch_pollfd_t *aprset, int32_t numsock, int32_t *nsds, switch_interval_time_t timeout)
|
||||||
{
|
{
|
||||||
return apr_poll((apr_pollfd_t *)aprset, numsock, nsds, timeout);
|
apr_status_t st = apr_poll((apr_pollfd_t *)aprset, numsock, nsds, timeout);
|
||||||
|
|
||||||
|
if (st == APR_TIMEUP) {
|
||||||
|
st = SWITCH_STATUS_TIMEOUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return st;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_status_t) switch_socket_create_pollfd(switch_pollfd_t **poll, switch_socket_t *sock, int16_t flags, switch_memory_pool_t *pool)
|
SWITCH_DECLARE(switch_status_t) switch_socket_create_pollfd(switch_pollfd_t **poll, switch_socket_t *sock, int16_t flags, switch_memory_pool_t *pool)
|
||||||
|
|
|
@ -129,7 +129,9 @@ struct switch_rtp {
|
||||||
* used.
|
* used.
|
||||||
*/
|
*/
|
||||||
switch_socket_t *sock_input, *sock_output;
|
switch_socket_t *sock_input, *sock_output;
|
||||||
|
switch_pollfd_t *read_pollfd;
|
||||||
|
switch_pollfd_t *jb_pollfd;
|
||||||
|
|
||||||
switch_sockaddr_t *local_addr;
|
switch_sockaddr_t *local_addr;
|
||||||
rtp_msg_t send_msg;
|
rtp_msg_t send_msg;
|
||||||
|
|
||||||
|
@ -188,7 +190,6 @@ struct switch_rtp {
|
||||||
switch_timer_t timer;
|
switch_timer_t timer;
|
||||||
uint8_t ready;
|
uint8_t ready;
|
||||||
uint8_t cn;
|
uint8_t cn;
|
||||||
switch_time_t last_time;
|
|
||||||
stfu_instance_t *jb;
|
stfu_instance_t *jb;
|
||||||
uint32_t max_missed_packets;
|
uint32_t max_missed_packets;
|
||||||
uint32_t missed_count;
|
uint32_t missed_count;
|
||||||
|
@ -598,6 +599,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s
|
||||||
if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) {
|
if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) {
|
||||||
switch_socket_opt_set(rtp_session->sock_input, SWITCH_SO_NONBLOCK, TRUE);
|
switch_socket_opt_set(rtp_session->sock_input, SWITCH_SO_NONBLOCK, TRUE);
|
||||||
switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK);
|
switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK);
|
||||||
|
} else {
|
||||||
|
switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
status = SWITCH_STATUS_SUCCESS;
|
status = SWITCH_STATUS_SUCCESS;
|
||||||
|
@ -1013,7 +1016,14 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_stun_ping(switch_rtp_t *rtp_
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_status_t) switch_rtp_activate_jitter_buffer(switch_rtp_t *rtp_session, uint32_t queue_frames)
|
SWITCH_DECLARE(switch_status_t) switch_rtp_activate_jitter_buffer(switch_rtp_t *rtp_session, uint32_t queue_frames)
|
||||||
{
|
{
|
||||||
|
if (rtp_session->read_pollfd) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't use jitterbuffer without a timer.\n");
|
||||||
|
return SWITCH_STATUS_FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
rtp_session->jb = stfu_n_init(queue_frames);
|
rtp_session->jb = stfu_n_init(queue_frames);
|
||||||
|
switch_socket_create_pollfd(&rtp_session->jb_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
|
||||||
|
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1364,11 +1374,15 @@ static void do_flush(switch_rtp_t *rtp_session)
|
||||||
static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_type, switch_frame_flag_t *flags, switch_io_flag_t io_flags)
|
static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_type, switch_frame_flag_t *flags, switch_io_flag_t io_flags)
|
||||||
{
|
{
|
||||||
switch_size_t bytes = 0;
|
switch_size_t bytes = 0;
|
||||||
switch_status_t status;
|
switch_status_t status = SWITCH_STATUS_SUCCESS, poll_status = SWITCH_STATUS_SUCCESS;
|
||||||
int check = 0;
|
int check = 0;
|
||||||
stfu_frame_t *jb_frame;
|
stfu_frame_t *jb_frame;
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
int sleep_mss = 1000;
|
int sleep_mss = 1000;
|
||||||
|
int poll_sec = 5;
|
||||||
|
int poll_loop = 0;
|
||||||
|
int fdr = 0;
|
||||||
|
int from_jb = 0;
|
||||||
|
|
||||||
if (!switch_rtp_ready(rtp_session)) {
|
if (!switch_rtp_ready(rtp_session)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1376,8 +1390,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
||||||
|
|
||||||
if (rtp_session->timer.interval) {
|
if (rtp_session->timer.interval) {
|
||||||
sleep_mss = rtp_session->timer.interval * 1000;
|
sleep_mss = rtp_session->timer.interval * 1000;
|
||||||
} else {
|
|
||||||
rtp_session->last_time = switch_time_now();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
READ_INC(rtp_session);
|
READ_INC(rtp_session);
|
||||||
|
@ -1386,13 +1398,54 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
||||||
int do_cng = 0;
|
int do_cng = 0;
|
||||||
|
|
||||||
if (rtp_session->timer.interval) {
|
if (rtp_session->timer.interval) {
|
||||||
switch_core_timer_next(&rtp_session->timer);
|
int do_sleep = 1;
|
||||||
|
if (rtp_session->jb) {
|
||||||
|
if (switch_poll(rtp_session->jb_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
do_sleep = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (do_sleep) {
|
||||||
|
switch_core_timer_next(&rtp_session->timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
recvfrom:
|
recvfrom:
|
||||||
|
|
||||||
bytes = sizeof(rtp_msg_t);
|
if (rtp_session->jb) {
|
||||||
status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes);
|
if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
|
||||||
|
memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
|
||||||
|
if (jb_frame->plc) {
|
||||||
|
*flags |= SFF_PLC;
|
||||||
|
}
|
||||||
|
bytes = jb_frame->dlen + rtp_header_len;
|
||||||
|
rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
|
||||||
|
rtp_session->recv_msg.header.pt = rtp_session->payload;
|
||||||
|
from_jb = 1;
|
||||||
|
goto post_read;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (rtp_session->read_pollfd) {
|
||||||
|
poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, poll_sec * 1000000);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (poll_status == SWITCH_STATUS_SUCCESS) {
|
||||||
|
bytes = sizeof(rtp_msg_t);
|
||||||
|
status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes);
|
||||||
|
} else {
|
||||||
|
if (!SWITCH_STATUS_IS_BREAK(poll_status) && poll_status != SWITCH_STATUS_TIMEOUT) {
|
||||||
|
ret = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
poll_loop = 1;
|
||||||
|
rtp_session->missed_count += (poll_sec * 1000 ) / (rtp_session->ms_per_packet / 1000);
|
||||||
|
bytes = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
post_read:
|
||||||
|
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
ret = (int) bytes;
|
ret = (int) bytes;
|
||||||
|
@ -1427,6 +1480,10 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!bytes && poll_loop) {
|
||||||
|
goto recvfrom;
|
||||||
|
}
|
||||||
|
|
||||||
if (bytes && rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->te) {
|
if (bytes && rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->te) {
|
||||||
rtp_flush_read_buffer(rtp_session);
|
rtp_flush_read_buffer(rtp_session);
|
||||||
}
|
}
|
||||||
|
@ -1517,26 +1574,16 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rtp_session->jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) {
|
if (rtp_session->jb && !from_jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) {
|
||||||
if (bytes) {
|
if (bytes) {
|
||||||
|
|
||||||
if (rtp_session->recv_msg.header.m) {
|
if (rtp_session->recv_msg.header.m) {
|
||||||
stfu_n_reset(rtp_session->jb);
|
stfu_n_reset(rtp_session->jb);
|
||||||
}
|
}
|
||||||
|
|
||||||
stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len);
|
stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len);
|
||||||
bytes = 0;
|
bytes = 0;
|
||||||
}
|
goto recvfrom;
|
||||||
|
|
||||||
if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
|
|
||||||
memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
|
|
||||||
if (jb_frame->plc) {
|
|
||||||
*flags |= SFF_PLC;
|
|
||||||
}
|
|
||||||
bytes = jb_frame->dlen + rtp_header_len;
|
|
||||||
rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
|
|
||||||
rtp_session->recv_msg.header.pt = rtp_session->payload;
|
|
||||||
} else {
|
|
||||||
goto timer_check;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue