From 70964c3a2c137f608cf0e756377c8471f7d5d82b Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Tue, 12 Oct 2010 09:31:05 -0400 Subject: [PATCH] mod_sangoma_codec: add some debug code --- .../mod_sangoma_codec/mod_sangoma_codec.c | 214 ++++++++++++++++-- 1 file changed, 194 insertions(+), 20 deletions(-) diff --git a/src/mod/codecs/mod_sangoma_codec/mod_sangoma_codec.c b/src/mod/codecs/mod_sangoma_codec/mod_sangoma_codec.c index c84dac4509..60ac3ef0be 100644 --- a/src/mod/codecs/mod_sangoma_codec/mod_sangoma_codec.c +++ b/src/mod/codecs/mod_sangoma_codec/mod_sangoma_codec.c @@ -108,6 +108,12 @@ static char g_codec_register_list[1024] = "all"; /* default codec list to NOT load, users may override */ static char g_codec_noregister_list[1024] = ""; +#define SANGOMA_RTP_QUEUE_SIZE 4 +struct sangoma_rtp_payload { + uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; + int32_t datalen; +}; + struct codec_data { /* sngtc request and reply */ sngtc_codec_request_t request; @@ -132,6 +138,13 @@ struct codec_data { switch_time_t avgrxus; switch_time_t last_rx_time; + /* RTP queue. The bigger the queue, the bigger the possible delay */ + struct sangoma_rtp_payload rtp_queue[SANGOMA_RTP_QUEUE_SIZE]; + uint8_t queue_windex; + uint8_t queue_rindex; + uint8_t queue_size; + uint8_t queue_max_ever; + unsigned debug_timing:1; }; struct sangoma_transcoding_session { @@ -352,6 +365,12 @@ static switch_status_t switch_sangoma_init_ilbc(switch_codec_t *codec, switch_co return switch_sangoma_init(codec, flags, codec_settings); } +#define SAFE_INDEX_INC(array, index) \ + (index)++; \ + if ((index) == switch_arraylen((array))) { \ + (index) = 0; \ + } + static switch_status_t switch_sangoma_encode(switch_codec_t *codec, switch_codec_t *other_codec, /* codec that was used by the other side */ void *decoded_data, /* decoded data that we must encode */ uint32_t decoded_data_len /* decoded data length */ , @@ -367,12 +386,17 @@ static switch_status_t switch_sangoma_encode(switch_codec_t *codec, switch_codec switch_frame_t encoded_frame; switch_status_t sres; switch_time_t now_time, difftime; + switch_time_t prevread_time, afterread_time; unsigned char ebuf_ulaw[decoded_data_len / 2]; short *dbuf_linear; int i = 0; int res = 0; struct sangoma_transcoding_session *sess = codec->private_info; + if (sess->encoder.debug_timing) { + prevread_time = switch_micro_time_now(); + } + /* start assuming we will not encode anything */ *encoded_data_len = 0; @@ -392,6 +416,16 @@ static switch_status_t switch_sangoma_encode(switch_codec_t *codec, switch_codec switch_mutex_unlock(g_sessions_lock); } + if (sess->encoder.debug_timing) { + now_time = switch_micro_time_now(); + if (sess->encoder.last_rx_time) { + difftime = now_time - sess->encoder.last_rx_time; + if (difftime > 25000 || difftime < 15000) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%ldus since last read on encoding session %lu.\n", (long)difftime, sess->sessid); + } + } + } + /* transcode to ulaw first */ dbuf_linear = decoded_data; @@ -422,12 +456,22 @@ static switch_status_t switch_sangoma_encode(switch_codec_t *codec, switch_codec /* do the reading */ memset(&encoded_frame, 0, sizeof(encoded_frame)); for ( ; ; ) { +#if 0 + prevread_time = switch_micro_time_now(); +#endif sres = switch_rtp_zerocopy_read_frame(sess->encoder.rxrtp, &encoded_frame, SWITCH_IO_FLAG_NOBLOCK); if (sres == SWITCH_STATUS_GENERR) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to read on Sangoma encoder RTP session: %d\n", sres); return SWITCH_STATUS_FALSE; } +#if 0 + afterread_time = switch_micro_time_now(); + difftime = afterread_time - prevread_time; + if (difftime > 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%ldus to read on encoding session %lu.\n", (long)difftime, sess->sessid); + } +#endif if (0 == encoded_frame.datalen) { break; } @@ -439,19 +483,32 @@ static switch_status_t switch_sangoma_encode(switch_codec_t *codec, switch_codec break; } - if (*encoded_data_len) { - sess->encoder.rxdiscarded++; + if (sess->encoder.queue_windex == sess->encoder.queue_rindex) { + if (sess->encoder.rtp_queue[sess->encoder.queue_rindex].datalen) { + /* if there is something where we want to write, we're dropping it */ + sess->encoder.rxdiscarded++; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Discarding encoded frame of %d bytes from RTP session %lu, windex = %d, rindex = %d\n", + sess->encoder.rtp_queue[sess->encoder.queue_rindex].datalen, sess->sessid, sess->encoder.queue_windex, sess->encoder.queue_rindex); + SAFE_INDEX_INC(sess->encoder.rtp_queue, sess->encoder.queue_rindex); + sess->encoder.queue_size--; + } } - memcpy(encoded_data, encoded_frame.data, encoded_frame.datalen); - *encoded_data_len = encoded_frame.datalen; + memcpy(sess->encoder.rtp_queue[sess->encoder.queue_windex].data, encoded_frame.data, encoded_frame.datalen); + sess->encoder.rtp_queue[sess->encoder.queue_windex].datalen = encoded_frame.datalen; + SAFE_INDEX_INC(sess->encoder.rtp_queue, sess->encoder.queue_windex); + + /* monitor the queue size */ + sess->encoder.queue_size++; + if (sess->encoder.queue_size > sess->encoder.queue_max_ever) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Encoder Rx queue for RTP session %lu is now %d, windex = %d, rindex = %d\n", sess->sessid, sess->encoder.queue_size, + sess->encoder.queue_windex, sess->encoder.queue_rindex); + sess->encoder.queue_max_ever = sess->encoder.queue_size; + } } - /* update encoding stats if we received a frame */ - if (*encoded_data_len) { - if (*encoded_data_len != codec->implementation->encoded_bytes_per_packet) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Returning odd encoded frame of %d bytes intead of %d bytes\n", *encoded_data_len, codec->implementation->encoded_bytes_per_packet); - } + /* update encoding stats if we have a frame to give */ + if (sess->encoder.rtp_queue[sess->encoder.queue_rindex].datalen) { sess->encoder.rx++; now_time = switch_micro_time_now(); if (!sess->encoder.last_rx_time) { @@ -469,10 +526,27 @@ static switch_status_t switch_sangoma_encode(switch_codec_t *codec, switch_codec } } sess->encoder.lastrxseqno = encoded_frame.seq; + + /* pop the data from the queue */ + *encoded_data_len = sess->encoder.rtp_queue[sess->encoder.queue_rindex].datalen; + memcpy(encoded_data, sess->encoder.rtp_queue[sess->encoder.queue_rindex].data, *encoded_data_len); + sess->encoder.rtp_queue[sess->encoder.queue_rindex].datalen = 0; + SAFE_INDEX_INC(sess->encoder.rtp_queue, sess->encoder.queue_rindex); + sess->encoder.queue_size--; + if (*encoded_data_len != codec->implementation->encoded_bytes_per_packet) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Returning odd encoded frame of %d bytes intead of %d bytes\n", *encoded_data_len, codec->implementation->encoded_bytes_per_packet); + } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No output from sangoma encoder\n"); } + if (sess->encoder.debug_timing) { + afterread_time = switch_micro_time_now(); + difftime = afterread_time - prevread_time; + if (difftime > 5000) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%ldus to execute encoding function in session %lu.\n", (long)difftime, sess->sessid); + } + } return SWITCH_STATUS_SUCCESS; } @@ -492,11 +566,16 @@ static switch_status_t switch_sangoma_decode(switch_codec_t *codec, /* codec ses switch_frame_t ulaw_frame; switch_status_t sres; switch_time_t now_time, difftime; + switch_time_t prevread_time, afterread_time; short *dbuf_linear; int i = 0; int res = 0; struct sangoma_transcoding_session *sess = codec->private_info; + if (sess->decoder.debug_timing) { + prevread_time = switch_micro_time_now(); + } + dbuf_linear = decoded_data; /* start assuming we will not decode anything */ @@ -524,6 +603,16 @@ static switch_status_t switch_sangoma_decode(switch_codec_t *codec, /* codec ses switch_mutex_unlock(g_sessions_lock); } + if (sess->decoder.debug_timing) { + now_time = switch_micro_time_now(); + if (sess->decoder.last_rx_time) { + difftime = now_time - sess->decoder.last_rx_time; + if (difftime > 25000 || difftime < 15000) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%ldms since last read on decoding session %lu.\n", (long)difftime, sess->sessid); + } + } + } + /* do the writing */ memset(&encoded_frame, 0, sizeof(encoded_frame)); encoded_frame.source = __FUNCTION__; @@ -548,12 +637,22 @@ static switch_status_t switch_sangoma_decode(switch_codec_t *codec, /* codec ses /* do the reading */ memset(&ulaw_frame, 0, sizeof(ulaw_frame)); for ( ; ; ) { +#if 0 + prevread_time = switch_micro_time_now(); +#endif sres = switch_rtp_zerocopy_read_frame(sess->decoder.rxrtp, &ulaw_frame, SWITCH_IO_FLAG_NOBLOCK); if (sres == SWITCH_STATUS_GENERR) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to read on Sangoma decoder RTP session: %d\n", sres); return SWITCH_STATUS_FALSE; } +#if 0 + afterread_time = switch_micro_time_now(); + difftime = afterread_time - prevread_time; + if (difftime > 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%ldus to read on decoding session %lu.\n", (long)difftime, sess->sessid); + } +#endif if (0 == ulaw_frame.datalen) { break; } @@ -565,21 +664,31 @@ static switch_status_t switch_sangoma_decode(switch_codec_t *codec, /* codec ses break; } - if (*decoded_data_len) { - sess->decoder.rxdiscarded++; + if (sess->decoder.queue_windex == sess->decoder.queue_rindex) { + if (sess->decoder.rtp_queue[sess->decoder.queue_rindex].datalen) { + /* if there is something where we want to write, we're dropping it */ + sess->decoder.rxdiscarded++; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Discarding decoded frame of %d bytes from RTP session %lu, windex = %d, rindex = %d\n", + sess->decoder.rtp_queue[sess->decoder.queue_rindex].datalen, sess->sessid, sess->decoder.queue_windex, sess->decoder.queue_rindex); + SAFE_INDEX_INC(sess->decoder.rtp_queue, sess->decoder.queue_rindex); + sess->decoder.queue_size--; + } } - /* transcode to linear */ - for (i = 0; i < ulaw_frame.datalen; i++) { - dbuf_linear[i] = ulaw_to_linear(((char *)ulaw_frame.data)[i]); + memcpy(sess->decoder.rtp_queue[sess->decoder.queue_windex].data, ulaw_frame.data, ulaw_frame.datalen); + sess->decoder.rtp_queue[sess->decoder.queue_windex].datalen = ulaw_frame.datalen; + SAFE_INDEX_INC(sess->decoder.rtp_queue, sess->decoder.queue_windex); + + /* monitor the queue size */ + sess->decoder.queue_size++; + if (sess->decoder.queue_size > sess->decoder.queue_max_ever) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Encoder Rx queue for RTP session %lu is now %d, windex = %d, rindex = %d\n", sess->sessid, sess->decoder.queue_size, + sess->decoder.queue_windex, sess->decoder.queue_rindex); + sess->decoder.queue_max_ever = sess->decoder.queue_size; } - *decoded_data_len = i * 2; } - if (*decoded_data_len) { - if (*decoded_data_len != codec->implementation->decoded_bytes_per_packet) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Returning odd decoded frame of %d bytes intead of %d bytes\n", *decoded_data_len, codec->implementation->decoded_bytes_per_packet); - } + if (sess->decoder.rtp_queue[sess->decoder.queue_rindex].datalen) { /* update decoding stats */ sess->decoder.rx++; @@ -599,12 +708,33 @@ static switch_status_t switch_sangoma_decode(switch_codec_t *codec, /* codec ses } } sess->decoder.lastrxseqno = ulaw_frame.seq; + + /* pop the data from the queue and transcode it */ + for (i = 0; i < sess->decoder.rtp_queue[sess->decoder.queue_rindex].datalen; i++) { + dbuf_linear[i] = ulaw_to_linear(((char *)sess->decoder.rtp_queue[sess->decoder.queue_rindex].data)[i]); + } + *decoded_data_len = i * 2; + sess->decoder.rtp_queue[sess->decoder.queue_rindex].datalen = 0; + SAFE_INDEX_INC(sess->decoder.rtp_queue, sess->decoder.queue_rindex); + sess->decoder.queue_size--; + + if (*decoded_data_len != codec->implementation->decoded_bytes_per_packet) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Returning odd decoded frame of %d bytes intead of %d bytes\n", *decoded_data_len, codec->implementation->decoded_bytes_per_packet); + } } else { *decoded_data_len = codec->implementation->decoded_bytes_per_packet; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No output from sangoma decoder, returning silent frame of %d bytes\n", *decoded_data_len); memset(dbuf_linear, 0, *decoded_data_len); } + if (sess->decoder.debug_timing) { + afterread_time = switch_micro_time_now(); + difftime = afterread_time - prevread_time; + if (difftime > 5000) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%ldus to execute decoding function in session %lu.\n", (long)difftime, sess->sessid); + } + } + return SWITCH_STATUS_SUCCESS; } @@ -652,7 +782,7 @@ static void sangoma_print_stats(switch_stream_handle_t *stream, switch_rtp_numbe stream->write_function(stream, "Flush packet count: %lu\n\n\n", stats->flush_packet_count); } -#define SANGOMA_SYNTAX "settings|sessions|stats " +#define SANGOMA_SYNTAX "settings|sessions|stats |debug |nodebug " SWITCH_STANDARD_API(sangoma_function) { char *argv[10] = { 0 }; @@ -788,6 +918,48 @@ SWITCH_STANDARD_API(sangoma_function) } else { stream->write_function(stream, "\n=== No Decoder ===\n\n"); } + } else if (!strcasecmp(argv[0], "debug")) { + struct sangoma_transcoding_session *sess; + unsigned long sessid = 0; + int ret = 0; + if (argc < 2) { + stream->write_function(stream, "%s", SANGOMA_SYNTAX); + goto done; + } + ret = sscanf(argv[1], "%lu", &sessid); + if (ret != 1) { + stream->write_function(stream, "%s", SANGOMA_SYNTAX); + goto done; + } + sess = sangoma_find_session(sessid); + if (!sess) { + stream->write_function(stream, "Failed to find session %lu\n", sessid); + goto done; + } + sess->encoder.debug_timing = 1; + sess->decoder.debug_timing = 1; + stream->write_function(stream, "Debug enabled for transcoding session: %lu\n", sessid); + } else if (!strcasecmp(argv[0], "nodebug")) { + struct sangoma_transcoding_session *sess; + unsigned long sessid = 0; + int ret = 0; + if (argc < 2) { + stream->write_function(stream, "%s", SANGOMA_SYNTAX); + goto done; + } + ret = sscanf(argv[1], "%lu", &sessid); + if (ret != 1) { + stream->write_function(stream, "%s", SANGOMA_SYNTAX); + goto done; + } + sess = sangoma_find_session(sessid); + if (!sess) { + stream->write_function(stream, "Failed to find session %lu\n", sessid); + goto done; + } + sess->encoder.debug_timing = 0; + sess->decoder.debug_timing = 0; + stream->write_function(stream, "Debug disabled for transcoding session: %lu\n", sessid); } else { stream->write_function(stream, "Unknown Command [%s]\n", argv[0]); } @@ -1103,6 +1275,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sangoma_codec_load) switch_console_set_complete("add sangoma_codec settings"); switch_console_set_complete("add sangoma_codec sessions"); switch_console_set_complete("add sangoma_codec stats"); + switch_console_set_complete("add sangoma_codec debug"); + switch_console_set_complete("add sangoma_codec nodebug"); return SWITCH_STATUS_SUCCESS; }