diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c index 969fc2d628..bdcce1f475 100644 --- a/res/res_rtp_asterisk.c +++ b/res/res_rtp_asterisk.c @@ -103,8 +103,13 @@ #define TURN_STATE_WAIT_TIME 2000 -#define DEFAULT_RTP_SEND_BUFFER_SIZE 250 -#define DEFAULT_RTP_RECV_BUFFER_SIZE 20 +#define DEFAULT_RTP_SEND_BUFFER_SIZE 250 /*!< The initial size of the RTP send buffer */ +#define MAXIMUM_RTP_SEND_BUFFER_SIZE (DEFAULT_RTP_SEND_BUFFER_SIZE + 200) /*!< Maximum RTP send buffer size */ +#define DEFAULT_RTP_RECV_BUFFER_SIZE 20 /*!< The initial size of the RTP receiver buffer */ +#define MAXIMUM_RTP_RECV_BUFFER_SIZE (DEFAULT_RTP_RECV_BUFFER_SIZE + 20) /*!< Maximum RTP receive buffer size */ +#define OLD_PACKET_COUNT 1000 /*!< The number of previous packets that are considered old */ +#define MISSING_SEQNOS_ADDED_TRIGGER 2 /*!< The number of immediate missing packets that will trigger an immediate NACK */ +#define SEQNO_CYCLE_OVER 65536 /*!< The number after the maximum allowed sequence number */ /*! Full INTRA-frame Request / Fast Update Request (From RFC2032) */ #define RTCP_PT_FUR 192 @@ -4587,10 +4592,10 @@ static int ast_rtcp_generate_nack(struct ast_rtp_instance *instance, unsigned ch { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); int packet_len; - int blp_index; + int blp_index = -1; int current_seqno; - int seqno; - unsigned int fci; + unsigned int fci = 0; + size_t remaining_missing_seqno; if (!rtp || !rtp->rtcp) { return 0; @@ -4601,32 +4606,50 @@ static int ast_rtcp_generate_nack(struct ast_rtp_instance *instance, unsigned ch } current_seqno = rtp->expectedrxseqno; - seqno = rtp->lastrxseqno; + remaining_missing_seqno = AST_VECTOR_SIZE(&rtp->missing_seqno); packet_len = 12; /* The header length is 12 (version line, packet source SSRC, media source SSRC) */ - /* Get the missing sequence numbers for the FCI section of the NACK request */ - for (blp_index = 0, fci = 0; current_seqno < seqno; current_seqno++, blp_index++) { + /* If there are no missing sequence numbers then don't bother sending a NACK needlessly */ + if (!remaining_missing_seqno) { + return 0; + } + + /* This iterates through the possible forward sequence numbers seeing which ones we + * have no packet for, adding it to the NACK until we are out of missing packets. + */ + while (remaining_missing_seqno) { int *missing_seqno; + /* On the first entry to this loop blp_index will be -1, so this will become 0 + * and the sequence number will be placed into the packet as the PID. + */ + blp_index++; + missing_seqno = AST_VECTOR_GET_CMP(&rtp->missing_seqno, current_seqno, find_by_value); + if (missing_seqno) { + /* We hit the max blp size, reset */ + if (blp_index >= 17) { + put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); + fci = 0; + blp_index = 0; + packet_len += 4; + } - if (!missing_seqno) { - continue; + if (blp_index == 0) { + fci |= (current_seqno << 16); + } else { + fci |= (1 << (blp_index - 1)); + } + + /* Since we've used a missing sequence number, we're down one */ + remaining_missing_seqno--; } - /* We hit the max blp size, reset */ - if (blp_index >= 17) { - put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); - fci = 0; - blp_index = 0; - packet_len += 4; - } - - if (blp_index == 0) { - fci |= (current_seqno << 16); - } else { - fci |= (1 << (blp_index - 1)); + /* Handle cycling of the sequence number */ + current_seqno++; + if (current_seqno == SEQNO_CYCLE_OVER) { + current_seqno = 0; } } @@ -5793,6 +5816,7 @@ static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned int abs_send_time_id; unsigned int now_msw = 0; unsigned int now_lsw = 0; + unsigned int packets_not_found = 0; if (!rtp->send_buffer) { ast_debug(1, "Tried to handle NACK request, but we don't have a RTP packet storage!\n"); @@ -5825,6 +5849,7 @@ static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned res += rtp_sendto(instance, payload->buf, payload->size, 0, &remote_address, &ice); } else { ast_debug(1, "Received NACK request for RTP packet with seqno %d, but we don't have it\n", pid); + packets_not_found++; } /* * The bitmask. Denoting the least significant bit as 1 and its most significant bit @@ -5846,6 +5871,7 @@ static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned res += rtp_sendto(instance, payload->buf, payload->size, 0, &remote_address, &ice); } else { ast_debug(1, "Remote end also requested RTP packet with seqno %d, but we don't have it\n", seqno); + packets_not_found++; } } blp >>= 1; @@ -5853,6 +5879,15 @@ static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned } } + if (packets_not_found) { + /* Grow the send buffer based on how many packets were not found in the buffer, but + * enforce a maximum. + */ + ast_data_buffer_resize(rtp->send_buffer, MIN(MAXIMUM_RTP_SEND_BUFFER_SIZE, + ast_data_buffer_max(rtp->send_buffer) + packets_not_found)); + ast_debug(2, "Send buffer on RTP instance '%p' is now at maximum of %zu\n", instance, ast_data_buffer_max(rtp->send_buffer)); + } + return res; } @@ -7625,6 +7660,11 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc } else if (rtp->expectedrxseqno == -1 || seqno == rtp->expectedrxseqno) { rtp->expectedrxseqno = seqno + 1; + /* We've cycled over, so go back to 0 */ + if (rtp->expectedrxseqno == SEQNO_CYCLE_OVER) { + rtp->expectedrxseqno = 0; + } + /* If there are no buffered packets that will be placed after this frame then we can * return it directly without duplicating it. */ @@ -7643,7 +7683,7 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc /* If we don't have the next packet after this we can directly return the frame, as there is no * chance it will be overwritten. */ - if (!ast_data_buffer_get(rtp->recv_buffer, seqno + 1)) { + if (!ast_data_buffer_get(rtp->recv_buffer, rtp->expectedrxseqno)) { frame = ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); AST_LIST_INSERT_TAIL(&frames, frame, frame_list); return AST_LIST_FIRST(&frames); @@ -7654,9 +7694,10 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc * to the head of the frames list and avoid having to duplicate it but this would result in out * of order packet processing by libsrtp which we are trying to avoid. */ - frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, seqno - 1)); + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno)); if (frame) { AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + prev_seqno = seqno; } /* Add any additional packets that we have buffered and that are available */ @@ -7668,7 +7709,7 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc break; } - frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, rtp->expectedrxseqno - 1)); + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, prev_seqno)); ast_free(payload); if (!frame) { @@ -7681,11 +7722,15 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc ast_debug(2, "Pulled buffered packet with sequence number '%d' to additionally return on RTP instance '%p'\n", frame->seqno, instance); AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + prev_seqno = rtp->expectedrxseqno; rtp->expectedrxseqno++; + if (rtp->expectedrxseqno == SEQNO_CYCLE_OVER) { + rtp->expectedrxseqno = 0; + } } return AST_LIST_FIRST(&frames); - } else if ((abs(seqno - rtp->expectedrxseqno) > 100) || + } else if (((abs(seqno - rtp->expectedrxseqno) > 100) && timestamp > rtp->lastividtimestamp) || ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer)) { int inserted = 0; @@ -7701,43 +7746,46 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc rtp_write_rtcp_fir(instance, rtp, &remote_address); } + /* This works by going through the progression of the sequence number retrieving buffered packets + * or inserting the current received packet until we've run out of packets. This ensures that the + * packets are in the correct sequence number order. + */ while (ast_data_buffer_count(rtp->recv_buffer)) { struct ast_rtp_rtcp_nack_payload *payload; - payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_remove_head(rtp->recv_buffer); - if (!payload) { + /* If the packet we received is the one we are expecting at this point then add it in */ + if (rtp->expectedrxseqno == seqno) { + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + prev_seqno = seqno; + ast_debug(2, "Inserted just received packet with sequence number '%d' in correct order on RTP instance '%p'\n", + seqno, instance); + } + rtp->expectedrxseqno++; + if (rtp->expectedrxseqno == SEQNO_CYCLE_OVER) { + rtp->expectedrxseqno = 0; + } + inserted = 1; continue; } - /* Even when dumping the receive buffer we do our best to order things, so we ensure that the - * packet we just received is processed in the correct order, so see if we need to insert it now. - */ - if (!inserted) { - int buffer_seqno; - - buffer_seqno = ntohl(payload->buf[0]) & 0xffff; - if (seqno < buffer_seqno) { - frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno)); - if (frame) { - AST_LIST_INSERT_TAIL(&frames, frame, frame_list); - rtp->expectedrxseqno = seqno + 1; - prev_seqno = seqno; - ast_debug(2, "Inserted just received packet with sequence number '%d' in correct order on RTP instance '%p'\n", - seqno, instance); - } - inserted = 1; + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_remove(rtp->recv_buffer, rtp->expectedrxseqno); + if (payload) { + frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, prev_seqno)); + if (frame) { + AST_LIST_INSERT_TAIL(&frames, frame, frame_list); + prev_seqno = rtp->expectedrxseqno; + ast_debug(2, "Emptying queue and returning packet with sequence number '%d' from RTP instance '%p'\n", + frame->seqno, instance); } + ast_free(payload); } - frame = ast_frdup(ast_rtp_interpret(instance, srtp, &addr, payload->buf, payload->size, prev_seqno)); - if (frame) { - AST_LIST_INSERT_TAIL(&frames, frame, frame_list); - prev_seqno = frame->seqno; - ast_debug(2, "Emptying queue and returning packet with sequence number '%d' from RTP instance '%p'\n", - frame->seqno, instance); + rtp->expectedrxseqno++; + if (rtp->expectedrxseqno == SEQNO_CYCLE_OVER) { + rtp->expectedrxseqno = 0; } - - ast_free(payload); } if (!inserted) { @@ -7749,11 +7797,21 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc frame = ast_rtp_interpret(instance, srtp, &addr, read_area, res, prev_seqno); AST_LIST_INSERT_TAIL(&frames, frame, frame_list); rtp->expectedrxseqno = seqno + 1; + if (rtp->expectedrxseqno == SEQNO_CYCLE_OVER) { + rtp->expectedrxseqno = 0; + } ast_debug(2, "Adding just received packet with sequence number '%d' to end of dumped queue on RTP instance '%p'\n", seqno, instance); } + /* When we flush increase our chance for next time by growing the receive buffer when possible + * by how many packets we missed, to give ourselves a bit more breathing room. + */ + ast_data_buffer_resize(rtp->recv_buffer, MIN(MAXIMUM_RTP_RECV_BUFFER_SIZE, + ast_data_buffer_max(rtp->recv_buffer) + AST_VECTOR_SIZE(&rtp->missing_seqno))); + ast_debug(2, "Receive buffer on RTP instance '%p' is now at maximum of %zu\n", instance, ast_data_buffer_max(rtp->recv_buffer)); + /* As there is such a large gap we don't want to flood the order side with missing packets, so we * give up and start anew. */ @@ -7765,7 +7823,18 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc /* We're finished with the frames list */ ast_frame_free(AST_LIST_FIRST(&frames), 0); - if (seqno < rtp->expectedrxseqno) { + /* Determine if the received packet is from the last OLD_PACKET_COUNT (1000 by default) packets or not. + * For the case where the received sequence number exceeds that of the expected sequence number we calculate + * the past sequence number that would be 1000 sequence numbers ago. If the received sequence number + * exceeds or meets that then it is within OLD_PACKET_COUNT packets ago. For example if the expected + * sequence number is 100 and we receive 65530, then it would be considered old. This is because + * 65535 - 1000 + 100 = 64635 which gives us the sequence number at which we would consider the packets + * old. Since 65530 is above that, it would be considered old. + * For the case where the received sequence number is less than the expected sequence number we can do + * a simple subtraction to see if it is 1000 packets ago or not. + */ + if ((seqno < rtp->expectedrxseqno && ((rtp->expectedrxseqno - seqno) <= OLD_PACKET_COUNT)) || + (seqno > rtp->expectedrxseqno && (seqno >= (65535 - OLD_PACKET_COUNT + rtp->expectedrxseqno)))) { /* If this is a packet from the past then we have received a duplicate packet, so just drop it */ ast_debug(2, "Received an old packet with sequence number '%d' on RTP instance '%p', dropping it\n", seqno, instance); @@ -7778,10 +7847,12 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc } else { /* This is an out of order packet from the future */ struct ast_rtp_rtcp_nack_payload *payload; - int difference; + int missing_seqno; + int remove_failed; + unsigned int missing_seqnos_added = 0; - ast_debug(2, "Received an out of order packet with sequence number '%d' from the future on RTP instance '%p'\n", - seqno, instance); + ast_debug(2, "Received an out of order packet with sequence number '%d' while expecting '%d' from the future on RTP instance '%p'\n", + seqno, rtp->expectedrxseqno, instance); payload = ast_malloc(sizeof(*payload) + res); if (!payload) { @@ -7797,11 +7868,38 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc payload->size = res; memcpy(payload->buf, rtpheader, res); ast_data_buffer_put(rtp->recv_buffer, seqno, payload); - AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value, - AST_VECTOR_ELEM_CLEANUP_NOOP); - difference = seqno - (prev_seqno + 1); - while (difference > 0) { + /* If this sequence number is removed that means we had a gap and this packet has filled it in + * some. Since it was part of the gap we will have already added any other missing sequence numbers + * before it (and possibly after it) to the vector so we don't need to do that again. Note that + * remove_failed will be set to -1 if the sequence number isn't removed, and 0 if it is. + */ + remove_failed = AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value, + AST_VECTOR_ELEM_CLEANUP_NOOP); + if (!remove_failed) { + ast_debug(2, "Packet with sequence number '%d' on RTP instance '%p' is no longer missing\n", + seqno, instance); + } + + /* The missing sequence number code works by taking the sequence number of the + * packet we've just received and going backwards until we hit the sequence number + * of the last packet we've received. While doing so we check to make sure that the + * sequence number is not already missing and that it is not already buffered. + */ + missing_seqno = seqno; + while (remove_failed) { + missing_seqno -= 1; + + /* If we've cycled backwards then start back at the top */ + if (missing_seqno < 0) { + missing_seqno = 65535; + } + + /* We've gone backwards enough such that we've hit the previous sequence number */ + if (missing_seqno == prev_seqno) { + break; + } + /* We don't want missing sequence number duplicates. If, for some reason, * packets are really out of order, we could end up in this scenario: * @@ -7814,24 +7912,34 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc * * This will prevent the duplicate from being added. */ - if (AST_VECTOR_GET_CMP(&rtp->missing_seqno, seqno - difference, + if (AST_VECTOR_GET_CMP(&rtp->missing_seqno, missing_seqno, find_by_value)) { - difference--; + continue; + } + + /* If this packet has been buffered already then don't count it amongst the + * missing. + */ + if (ast_data_buffer_get(rtp->recv_buffer, missing_seqno)) { continue; } ast_debug(2, "Added missing sequence number '%d' to RTP instance '%p'\n", - seqno - difference, instance); - AST_VECTOR_ADD_SORTED(&rtp->missing_seqno, seqno - difference, + missing_seqno, instance); + AST_VECTOR_ADD_SORTED(&rtp->missing_seqno, missing_seqno, compare_by_value); - difference--; + missing_seqnos_added++; } - /* When our data buffer is half full we assume that the packets aren't just out of order but - * have actually been lost. To get them back we construct and send a NACK causing the sender to - * retransmit them. + /* When we add a large number of missing sequence numbers we assume there was a substantial + * gap in reception so we trigger an immediate NACK. When our data buffer is 1/4 full we + * assume that the packets aren't just out of order but have actually been lost. At 1/2 + * full we get more aggressive and ask for retransmission when we get a new packet. + * To get them back we construct and send a NACK causing the sender to retransmit them. */ - if (ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer) / 2) { + if (missing_seqnos_added >= MISSING_SEQNOS_ADDED_TRIGGER || + ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer) / 4 || + ast_data_buffer_count(rtp->recv_buffer) >= ast_data_buffer_max(rtp->recv_buffer) / 2) { int packet_len = 0; int res = 0; int ice;