From 71a902d258cbea2302faf75b3bfa058567c49200 Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Wed, 8 Dec 2010 09:09:14 -0500 Subject: [PATCH] freetdm: Added core rx and tx drops Added sig_write callback ftmod_r2 - Added IO stats flag during call setup - Disable user read and writes during call setup --- libs/freetdm/src/ftdm_io.c | 61 +++++++++++++-- libs/freetdm/src/ftmod/ftmod_r2/ftmod_r2.c | 75 ++++++++++++++++--- libs/freetdm/src/include/private/ftdm_core.h | 3 + libs/freetdm/src/include/private/ftdm_types.h | 3 + 4 files changed, 127 insertions(+), 15 deletions(-) diff --git a/libs/freetdm/src/ftdm_io.c b/libs/freetdm/src/ftdm_io.c index b7f8d7ee26..3206ee4313 100644 --- a/libs/freetdm/src/ftdm_io.c +++ b/libs/freetdm/src/ftdm_io.c @@ -2254,7 +2254,7 @@ static ftdm_status_t call_hangup(ftdm_channel_t *chan, const char *file, const c } else { /* the signaling stack did not touch the state, * core is responsible from clearing flags and stuff */ - ftdm_channel_done(chan); + ftdm_channel_close(&chan); } return FTDM_SUCCESS; } @@ -2557,8 +2557,14 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_done(ftdm_channel_t *ftdmchan) ftdm_span_send_signal(ftdmchan->span, &sigmsg); } + if (ftdmchan->txdrops || ftdmchan->rxdrops) { + ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "channel dropped data: txdrops = %d, rxdrops = %d\n", + ftdmchan->txdrops, ftdmchan->rxdrops); + } + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "channel done\n"); + ftdm_mutex_unlock(ftdmchan->mutex); return FTDM_SUCCESS; @@ -3385,6 +3391,16 @@ skipdebug: static FIO_WRITE_FUNCTION(ftdm_raw_write) { int dlen = (int) *datalen; + if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_TX_DISABLED)) { + ftdmchan->txdrops++; + if (ftdmchan->txdrops <= 10) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "cannot write in channel with tx disabled\n"); + } + if (ftdmchan->txdrops == 10) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "Too many tx drops, not printing anymore\n"); + } + return FTDM_FAIL; + } if (ftdmchan->fds[FTDM_WRITE_TRACE_INDEX] > -1) { if ((write(ftdmchan->fds[FTDM_WRITE_TRACE_INDEX], data, dlen)) != dlen) { ftdm_log(FTDM_LOG_WARNING, "Raw output trace failed to write all of the %zd bytes\n", dlen); @@ -3557,6 +3573,18 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_read(ftdm_channel_t *ftdmchan, void *data goto done; } + if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_RX_DISABLED)) { + ftdmchan->rxdrops++; + if (ftdmchan->rxdrops <= 10) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "cannot read from channel with rx disabled\n"); + } + if (ftdmchan->rxdrops == 10) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "too many rx drops, not logging anymore\n"); + } + status = FTDM_FAIL; + goto done; + } + status = ftdm_raw_read(ftdmchan, data, datalen); if (status != FTDM_SUCCESS) { @@ -3756,7 +3784,7 @@ done: FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *data, ftdm_size_t datasize, ftdm_size_t *datalen) { - ftdm_status_t status = FTDM_FAIL; + ftdm_status_t status = FTDM_SUCCESS; fio_codec_t codec_func = NULL; ftdm_size_t max = datasize; unsigned int i = 0; @@ -3764,22 +3792,28 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *dat ftdm_assert_return(ftdmchan != NULL, FTDM_FAIL, "null channel on write!\n"); ftdm_assert_return(ftdmchan->fio != NULL, FTDM_FAIL, "null I/O on write!\n"); + ftdm_channel_lock(ftdmchan); + if (!ftdmchan->buffer_delay && ((ftdmchan->dtmf_buffer && ftdm_buffer_inuse(ftdmchan->dtmf_buffer)) || (ftdmchan->fsk_buffer && ftdm_buffer_inuse(ftdmchan->fsk_buffer)))) { /* read size writing DTMF ATM */ - return FTDM_SUCCESS; + goto done; } if (!ftdm_test_flag(ftdmchan, FTDM_CHANNEL_OPEN)) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "cannot write in channel not open\n"); snprintf(ftdmchan->last_error, sizeof(ftdmchan->last_error), "channel not open"); - return FTDM_FAIL; + status = FTDM_FAIL; + goto done; } if (!ftdmchan->fio->write) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "write method not implemented\n"); snprintf(ftdmchan->last_error, sizeof(ftdmchan->last_error), "method not implemented"); - return FTDM_FAIL; + status = FTDM_FAIL; + goto done; } if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_TRANSCODE) && ftdmchan->effective_codec != ftdmchan->native_codec) { @@ -3796,10 +3830,13 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *dat if (codec_func) { status = codec_func(data, max, datalen); } else { + ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "Do not know how to handle transcoding from %d to %d\n", + ftdmchan->effective_codec, ftdmchan->native_codec); snprintf(ftdmchan->last_error, sizeof(ftdmchan->last_error), "codec error!"); status = FTDM_FAIL; + goto done; } - } + } if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_USE_TX_GAIN) && (ftdmchan->native_codec == FTDM_CODEC_ALAW || ftdmchan->native_codec == FTDM_CODEC_ULAW)) { @@ -3809,8 +3846,20 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *dat } } + if (ftdmchan->span->sig_write) { + status = ftdmchan->span->sig_write(ftdmchan, data, datalen); + if (status == FTDM_BREAK) { + /* signaling module decided to drop user frame */ + status = FTDM_SUCCESS; + goto done; + } + } + status = ftdm_raw_write(ftdmchan, data, datalen); +done: + ftdm_channel_unlock(ftdmchan); + return status; } diff --git a/libs/freetdm/src/ftmod/ftmod_r2/ftmod_r2.c b/libs/freetdm/src/ftmod/ftmod_r2/ftmod_r2.c index 30221e149d..21f7a1202c 100644 --- a/libs/freetdm/src/ftmod/ftmod_r2/ftmod_r2.c +++ b/libs/freetdm/src/ftmod/ftmod_r2/ftmod_r2.c @@ -73,7 +73,6 @@ typedef struct ftdm_r2_call_t { ftdm_size_t ani_index; char logname[255]; char name[10]; - unsigned long txdrops; } ftdm_r2_call_t; /* this is just used as place holder in the stack when configuring the span to avoid using bunch of locals */ @@ -129,6 +128,12 @@ typedef struct ftdm_r2_data_s { uint64_t total_loops; /* number of loops per 10ms increment from 0-9ms, 10-19ms .. 100ms and above */ uint64_t loops[11]; + /* Total number of sleeps performed so far */ + uint64_t total_sleeps; + /* number of sleeps per 10ms increment from 0-9ms, 10-19ms .. 100ms and above */ + uint64_t sleeps[11]; + /* max time spent in ms sleeping in a single loop */ + int32_t sleepmax; /* LWP */ uint32_t monitor_thread_id; /* Logging directory */ @@ -338,7 +343,6 @@ static void ft_r2_clean_call(ftdm_r2_call_t *call) call->dnis_index = 0; call->ani_index = 0; call->name[0] = 0; - call->txdrops = 0; } static void ft_r2_accept_call(ftdm_channel_t *ftdmchan) @@ -404,6 +408,10 @@ static FIO_CHANNEL_OUTGOING_CALL_FUNCTION(r2_outgoing_call) return FTDM_BREAK; } + ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_IO_STATS); + ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_TX_BUFFERS, NULL); + ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_RX_BUFFERS, NULL); + return FTDM_SUCCESS; } @@ -477,6 +485,9 @@ static void ftdm_r2_on_call_init(openr2_chan_t *r2chan) R2CALL(ftdmchan)->chanstate = FTDM_CHANNEL_STATE_DOWN; ftdm_set_state(ftdmchan, FTDM_CHANNEL_STATE_COLLECT); + ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_IO_STATS); + ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_TX_BUFFERS, NULL); + ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_RX_BUFFERS, NULL); } static void dump_mf(openr2_chan_t *r2chan); @@ -556,6 +567,11 @@ static void ftdm_r2_on_call_accepted(openr2_chan_t *r2chan, openr2_call_mode_t m /* at this point the MF signaling has ended and there is no point on keep reading */ openr2_chan_disable_read(r2chan); + + /* at this point we are no longer responsible for reading and writing, + * we are not interested in the stats anymore */ + ftdm_channel_clear_feature(ftdmchan, FTDM_CHANNEL_FEATURE_IO_STATS); + R2CALL(ftdmchan)->accepted = 1; @@ -1335,7 +1351,6 @@ static FIO_CONFIGURE_SPAN_SIGNALING_FUNCTION(ftdm_r2_configure_span_signaling) /* value and key are the same so just free one of them */ snprintf(r2call->name, sizeof(r2call->name), "chancall%d", i); hashtable_insert(spanpvt->r2calls, (void *)r2call->name, r2call, HASHTABLE_FLAG_FREE_VALUE); - } r2data->mf_dump_size = r2conf.mf_dump_size; r2data->flags = 0; @@ -1347,6 +1362,7 @@ static FIO_CONFIGURE_SPAN_SIGNALING_FUNCTION(ftdm_r2_configure_span_signaling) span->start = ftdm_r2_start; span->stop = ftdm_r2_stop; span->sig_read = NULL; + span->sig_write = NULL; /* let the core set the states, we just read them */ span->get_channel_sig_status = ftdm_r2_get_channel_sig_status; @@ -1544,10 +1560,7 @@ static int ftdm_r2_state_advance(ftdm_channel_t *ftdmchan) /* finished call for good */ case FTDM_CHANNEL_STATE_DOWN: { - ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "Call is down\n"); - if (R2CALL(ftdmchan)->txdrops) { - ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "dropped %d tx packets\n", R2CALL(ftdmchan)->txdrops); - } + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "R2 Call is down\n"); openr2_chan_disable_read(r2chan); ret = 1; } @@ -1587,8 +1600,9 @@ static void ftdm_r2_state_advance_all(ftdm_channel_t *ftdmchan) static void *ftdm_r2_run(ftdm_thread_t *me, void *obj) { - openr2_chan_t *r2chan; + openr2_chan_t *r2chan = NULL; ftdm_channel_t *ftdmchan = NULL; + ftdm_r2_call_t *call = NULL; ftdm_status_t status; ftdm_span_t *span = (ftdm_span_t *) obj; ftdm_r2_data_t *r2data = span->signal_data; @@ -1662,6 +1676,19 @@ static void *ftdm_r2_run(ftdm_thread_t *me, void *obj) continue; } + ms = ((start.tv_sec - end.tv_sec) * 1000) + + ((( 1000000 + start.tv_usec - end.tv_usec) / 1000) - 1000); + if (ms < 0) { + ms = 0; + } + if (ms > r2data->sleepmax) { + r2data->sleepmax = ms; + } + index = (ms / 15); + index = (index > 10) ? 10 : index; + r2data->sleeps[index]++; + r2data->total_sleeps++; + /* this main loop takes care of MF and CAS signaling during call setup and tear down * for every single channel in the span, do not perform blocking operations here! */ chaniter = ftdm_span_get_chan_iterator(span, chaniter); @@ -1670,13 +1697,26 @@ static void *ftdm_r2_run(ftdm_thread_t *me, void *obj) ftdm_mutex_lock(ftdmchan->mutex); + call = R2CALL(ftdmchan); + + /* This let knows the core and io signaling hooks know that + * read/writes come from us and should be allowed */ + ftdm_clear_flag(ftdmchan, FTDM_CHANNEL_RX_DISABLED); + ftdm_clear_flag(ftdmchan, FTDM_CHANNEL_TX_DISABLED); + ftdm_r2_state_advance_all(ftdmchan); - r2chan = R2CALL(ftdmchan)->r2chan; + r2chan = call->r2chan; openr2_chan_process_signaling(r2chan); ftdm_r2_state_advance_all(ftdmchan); + if (!call->accepted) { + /* if the call is not accepted we do not want users reading */ + ftdm_set_flag(ftdmchan, FTDM_CHANNEL_RX_DISABLED); + ftdm_set_flag(ftdmchan, FTDM_CHANNEL_TX_DISABLED); + } + ftdm_mutex_unlock(ftdmchan->mutex); } /* deliver the actual events to the user now without any channel locking */ @@ -1886,6 +1926,8 @@ static FIO_API_FUNCTION(ftdm_r2_api) stream->write_function(stream, "-ERR invalid span. No R2 signal data in span.\n"); goto done; } + stream->write_function(stream, "-- Working --\n"); + stream->write_function(stream, "Total loops: %llu\n", r2data->total_loops); range = 0; for (i = 0; i < ftdm_array_len(r2data->loops); i++) { pct = 100*(float)r2data->loops[i]/r2data->total_loops; @@ -1897,6 +1939,21 @@ static FIO_API_FUNCTION(ftdm_r2_api) range += 10; } stream->write_function(stream, "\n"); + + stream->write_function(stream, "-- Sleeping --\n"); + stream->write_function(stream, "Total sleeps: %llu\n", r2data->total_sleeps); + range = 0; + for (i = 0; i < ftdm_array_len(r2data->sleeps); i++) { + pct = 100*(float)r2data->sleeps[i]/r2data->total_sleeps; + if ((i + 1) == ftdm_array_len(r2data->sleeps)) { + stream->write_function(stream, ">= %dms: %llu - %.03lf%%\n", range, r2data->sleeps[i], pct); + } else { + stream->write_function(stream, "%d-%dms: %llu - %.03lf%%\n", range, range + 14, r2data->sleeps[i], pct); + } + range += 15; + } + stream->write_function(stream, "\n"); + stream->write_function(stream, "+OK.\n"); goto done; } else { diff --git a/libs/freetdm/src/include/private/ftdm_core.h b/libs/freetdm/src/include/private/ftdm_core.h index f1058d8e48..a1e1c02439 100644 --- a/libs/freetdm/src/include/private/ftdm_core.h +++ b/libs/freetdm/src/include/private/ftdm_core.h @@ -477,6 +477,8 @@ struct ftdm_channel { ftdm_dtmf_debug_t dtmfdbg; ftdm_io_dump_t rxdump; ftdm_io_dump_t txdump; + int32_t txdrops; + int32_t rxdrops; }; struct ftdm_span { @@ -509,6 +511,7 @@ struct ftdm_span { ftdm_span_start_t start; ftdm_span_stop_t stop; ftdm_channel_sig_read_t sig_read; + ftdm_channel_sig_write_t sig_write; /* Private I/O data per span. Do not touch unless you are an I/O module */ void *io_data; char *type; diff --git a/libs/freetdm/src/include/private/ftdm_types.h b/libs/freetdm/src/include/private/ftdm_types.h index db1428c962..9c6260cdef 100644 --- a/libs/freetdm/src/include/private/ftdm_types.h +++ b/libs/freetdm/src/include/private/ftdm_types.h @@ -264,6 +264,8 @@ typedef enum { FTDM_CHANNEL_IN_ALARM = (1 << 27), FTDM_CHANNEL_SIG_UP = (1 << 28), FTDM_CHANNEL_USER_HANGUP = (1 << 29), + FTDM_CHANNEL_RX_DISABLED = (1 << 30), + FTDM_CHANNEL_TX_DISABLED = (1 << 31), } ftdm_channel_flag_t; #if defined(__cplusplus) && defined(WIN32) // fix C2676 @@ -380,6 +382,7 @@ typedef struct ftdm_fsk_modulator ftdm_fsk_modulator_t; typedef ftdm_status_t (*ftdm_span_start_t)(ftdm_span_t *span); typedef ftdm_status_t (*ftdm_span_stop_t)(ftdm_span_t *span); typedef ftdm_status_t (*ftdm_channel_sig_read_t)(ftdm_channel_t *ftdmchan, void *data, ftdm_size_t size); +typedef ftdm_status_t (*ftdm_channel_sig_write_t)(ftdm_channel_t *ftdmchan, void *data, ftdm_size_t size); typedef enum { FTDM_ITERATOR_VARS = 1,