From 33c6f7a22eaf8b73c6d59c7b133ca76b0a3f412b Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Thu, 5 Aug 2010 17:56:18 -0500 Subject: [PATCH] revert FSCORE-639 --- src/switch_rtp.c | 165 +++++++++++------------------------------------ 1 file changed, 37 insertions(+), 128 deletions(-) diff --git a/src/switch_rtp.c b/src/switch_rtp.c index efe7a2b8a3..0d4420eabe 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -135,17 +135,6 @@ struct switch_rtp_rfc2833_data { switch_mutex_t *dtmf_mutex; }; - -#define FLUSH_MAX 5 -#define MAX_MSG 6 - -struct rtp_packet { - rtp_msg_t recv_msg; - switch_size_t bytes; -}; - -typedef struct rtp_packet rtp_packet_t; - struct switch_rtp { /* * Two sockets are needed because we might be transcoding protocol families @@ -162,12 +151,7 @@ struct switch_rtp { rtcp_msg_t rtcp_send_msg; switch_sockaddr_t *remote_addr, *rtcp_remote_addr; - rtp_msg_t recv_msg; - rtp_packet_t recv_msg_array[MAX_MSG]; - int recv_msg_idx; - - rtcp_msg_t rtcp_recv_msg; switch_sockaddr_t *remote_stun_addr; @@ -241,7 +225,7 @@ struct switch_rtp { uint32_t cng_count; switch_rtp_bug_flag_t rtp_bugs; switch_rtp_stats_t stats; - //uint32_t hot_hits; + uint32_t hot_hits; uint32_t sync_packets; int rtcp_interval; switch_bool_t rtcp_fresh_frame; @@ -255,7 +239,6 @@ struct switch_rtp { #endif switch_time_t send_time; - //int more_data; }; struct switch_rtcp_senderinfo { @@ -2074,66 +2057,13 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t { switch_status_t status = SWITCH_STATUS_FALSE; stfu_frame_t *jb_frame; - int i = 0; - switch_assert(bytes); - *bytes = 0; - - top: - - if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH))) { - if (rtp_session->recv_msg_idx) { - rtp_session->recv_msg = rtp_session->recv_msg_array[0].recv_msg; - *bytes = rtp_session->recv_msg_array[0].bytes; - - for (i = 1; i < MAX_MSG - 1; i++) { - rtp_session->recv_msg_array[i-1] = rtp_session->recv_msg_array[i]; - } - rtp_session->recv_msg_idx--; - status = SWITCH_STATUS_SUCCESS; - goto got_data; - } - - - while(rtp_session->recv_msg_idx < MAX_MSG) { - switch_status_t rstatus; - switch_size_t rb = sizeof(rtp_msg_t); - - rstatus = switch_socket_recvfrom(rtp_session->from_addr, - rtp_session->sock_input, 0, - (void *) &rtp_session->recv_msg_array[rtp_session->recv_msg_idx].recv_msg, - &rb); - - if ((rstatus != SWITCH_STATUS_SUCCESS && rstatus != SWITCH_STATUS_BREAK) || rb < 0) { - if (rtp_session->recv_msg_idx) { - /* Handle the data we have queued up */ - break; - } else { - *bytes = rb; - return rstatus; - } - } - - if (!rb) break; - - rtp_session->recv_msg_array[rtp_session->recv_msg_idx].bytes = rb; - rtp_session->recv_msg_idx++; - - switch_cond_next(); /* Relax just a bit */ - } - - if (!*bytes && rtp_session->recv_msg_idx) goto top; - } else { - *bytes = sizeof(rtp_msg_t); - status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes); - } - - got_data: + *bytes = sizeof(rtp_msg_t); + status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes); if (*bytes) { - rtp_session->stats.inbound.raw_bytes += *bytes; if (rtp_session->recv_te && rtp_session->recv_msg.header.pt == rtp_session->recv_te) { rtp_session->stats.inbound.dtmf_packet_count++; @@ -2147,12 +2077,10 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t rtp_session->stats.inbound.packet_count++; } - if ((rtp_session->recv_te && rtp_session->recv_msg.header.pt == rtp_session->recv_te) || - *bytes < rtp_header_len || - switch_test_flag(rtp_session, SWITCH_RTP_FLAG_PROXY_MEDIA) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_UDPTL)) { + if (rtp_session->recv_te && rtp_session->recv_msg.header.pt == rtp_session->recv_te) { return SWITCH_STATUS_SUCCESS; } - + if (rtp_session->jb && rtp_session->recv_msg.header.version == 2 && *bytes) { if (rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->recv_te && @@ -2305,7 +2233,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ int fdr = 0; int rtcp_fdr = 0; int hot_socket = 0; - int read_loops = 0; if (session) { channel = switch_core_session_get_channel(session); @@ -2324,36 +2251,36 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ while (switch_rtp_ready(rtp_session)) { int do_cng = 0; bytes = 0; - read_loops++; if (rtp_session->timer.interval) { - if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH)) && - rtp_session->recv_msg_idx > FLUSH_MAX) { - hot_socket = 1; - } else { - hot_socket = 0; + if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH)) && + rtp_session->read_pollfd) { + if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) { + rtp_session->hot_hits += rtp_session->samples_per_interval; + + if (rtp_session->hot_hits >= rtp_session->samples_per_second * 5) { + switch_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH); + hot_socket = 1; + } + } else { + rtp_session->hot_hits = 0; + } } - + if (hot_socket) { rtp_session->sync_packets++; switch_core_timer_sync(&rtp_session->timer); } else { if (rtp_session->sync_packets) { #if 0 - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Auto-Flush catching up %d packets (%d)ms.\n", rtp_session->sync_packets, (rtp_session->ms_per_packet * rtp_session->sync_packets) / 1000); #endif rtp_session->sync_packets = 0; } - switch_core_timer_next(&rtp_session->timer); } - - - - - } recvfrom: @@ -2476,13 +2403,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ goto end; } - if (!bytes && (io_flags & SWITCH_IO_FLAG_NOBLOCK)) { - rtp_session->missed_count = 0; - ret = 0; - goto end; - } - - if (rtp_session->max_missed_packets && read_loops == 1) { + if (rtp_session->max_missed_packets) { if (bytes) { rtp_session->missed_count = 0; } else if (++rtp_session->missed_count >= rtp_session->max_missed_packets) { @@ -2611,7 +2532,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_UDPTL)) { *flags |= SFF_UDPTL_PACKET; } - + ret = (int) bytes; goto end; } @@ -2619,17 +2540,15 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ if (bytes) { rtp_session->missed_count = 0; - if (bytes < rtp_header_len) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid RTP packet size of %ld bytes.\n", (long)bytes); - bytes = 0; - goto do_continue; - } - if (rtp_session->recv_msg.header.pt && (rtp_session->recv_msg.header.pt == rtp_session->cng_pt || rtp_session->recv_msg.header.pt == 13)) { return_cng_frame(); } } + if (!bytes && (io_flags & SWITCH_IO_FLAG_NOBLOCK)) { + return_cng_frame(); + } + if (check && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTO_CNG) && rtp_session->timer.samplecount >= (rtp_session->last_write_samplecount + (rtp_session->samples_per_interval * 50))) { @@ -2748,7 +2667,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ we put up with as much as we can so we don't have to deal with being punished for doing it right. Nice guys finish last! */ - if (bytes > rtp_header_len && !switch_test_flag(rtp_session, SWITCH_RTP_FLAG_PROXY_MEDIA) && + if (bytes && !switch_test_flag(rtp_session, SWITCH_RTP_FLAG_PROXY_MEDIA) && !switch_test_flag(rtp_session, SWITCH_RTP_FLAG_PASS_RFC2833) && rtp_session->recv_msg.header.pt == rtp_session->recv_te) { switch_size_t len = bytes - rtp_header_len; unsigned char *packet = (unsigned char *) rtp_session->recv_msg.body; @@ -2887,16 +2806,17 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ switch_cond_next(); continue; } - + return_cng_frame(); } } - + if (status == SWITCH_STATUS_BREAK || bytes == 0) { - if (!(io_flags & SWITCH_IO_FLAG_SINGLE_READ) && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) { + if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) { goto do_continue; } - return_cng_frame(); + ret = 0; + goto end; } if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_GOOGLEHACK) && rtp_session->recv_msg.header.pt == 102) { @@ -2928,7 +2848,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ } end: - + READ_DEC(rtp_session); return ret; @@ -3032,7 +2952,6 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_read(switch_rtp_t *rtp_session, void bytes = rtp_common_read(rtp_session, payload_type, flags, io_flags); - if (bytes < 0) { *datalen = 0; return bytes == -2 ? SWITCH_STATUS_TIMEOUT : SWITCH_STATUS_GENERR; @@ -3040,9 +2959,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_read(switch_rtp_t *rtp_session, void *datalen = 0; return SWITCH_STATUS_BREAK; } else { - if (bytes > rtp_header_len) { - bytes -= rtp_header_len; - } + bytes -= rtp_header_len; } *datalen = bytes; @@ -3152,7 +3069,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_zerocopy_read_frame(switch_rtp_t *rtp if (bytes < 0) { frame->datalen = 0; return bytes == -2 ? SWITCH_STATUS_TIMEOUT : SWITCH_STATUS_GENERR; - } else if (bytes < rtp_header_len) { + } else if (bytes == 0) { frame->datalen = 0; return SWITCH_STATUS_BREAK; } else { @@ -3181,9 +3098,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_zerocopy_read(switch_rtp_t *rtp_sessi *datalen = 0; return SWITCH_STATUS_GENERR; } else { - if (bytes > rtp_header_len) { - bytes -= rtp_header_len; - } + bytes -= rtp_header_len; } *datalen = bytes; @@ -3211,9 +3126,7 @@ static int rtp_common_write(switch_rtp_t *rtp_session, send_msg->header.pt = rtp_session->te; } data = send_msg->body; - if (datalen > rtp_header_len) { - datalen -= rtp_header_len; - } + datalen -= rtp_header_len; } else { uint8_t m = 0; @@ -3226,14 +3139,10 @@ static int rtp_common_write(switch_rtp_t *rtp_session, if (timestamp) { rtp_session->ts = (uint32_t) timestamp; - /* Send marker bit if timestamp is lower/same as before (resetted/new timer) */ - if (rtp_session->ts <= rtp_session->last_write_ts) { - m++; - } } else if (rtp_session->timer.timer_interface) { rtp_session->ts = rtp_session->timer.samplecount; - if (rtp_session->ts <= rtp_session->last_write_ts && rtp_session->ts > 0) { + if (rtp_session->ts <= rtp_session->last_write_ts) { rtp_session->ts = rtp_session->last_write_ts + rtp_session->samples_per_interval; } } else {