diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 67ff017887..968b2baccd 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -896,6 +896,7 @@ SFF_RAW_RTP = (1 << 1) - Frame has raw rtp accessible SFF_RTP_HEADER = (1 << 2) - Get the rtp header from the frame header SFF_PLC = (1 << 3) - Frame has generated PLC data SFF_RFC2833 = (1 << 4) - Frame has rfc2833 dtmf data +SFF_DYNAMIC = (1 << 5) - Frame is dynamic and should be freed */ typedef enum { @@ -905,7 +906,8 @@ typedef enum { SFF_RTP_HEADER = (1 << 2), SFF_PLC = (1 << 3), SFF_RFC2833 = (1 << 4), - SFF_PROXY_PACKET = (1 << 5) + SFF_PROXY_PACKET = (1 << 5), + SFF_DYNAMIC = (1 << 6) } switch_frame_flag_enum_t; typedef uint32_t switch_frame_flag_t; diff --git a/src/include/switch_utils.h b/src/include/switch_utils.h index 9ea7dcc693..8c1654ee2c 100644 --- a/src/include/switch_utils.h +++ b/src/include/switch_utils.h @@ -109,6 +109,10 @@ SWITCH_DECLARE(char *) switch_amp_encode(char *s, char *buf, switch_size_t len); SWITCH_DECLARE(switch_size_t) switch_fd_read_line(int fd, char *buf, switch_size_t len); +SWITCH_DECLARE(switch_status_t) switch_frame_alloc(switch_frame_t **frame, switch_size_t size); +SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_frame_t **clone); +SWITCH_DECLARE(switch_status_t) switch_frame_free(switch_frame_t **frame); + /*! \brief Evaluate the truthfullness of a string expression \param expr a string expression diff --git a/src/mod/endpoints/mod_loopback/mod_loopback.c b/src/mod/endpoints/mod_loopback/mod_loopback.c index a943a5460c..5985d2d875 100644 --- a/src/mod/endpoints/mod_loopback/mod_loopback.c +++ b/src/mod/endpoints/mod_loopback/mod_loopback.c @@ -69,7 +69,7 @@ struct private_object { unsigned char databuf[SWITCH_RECOMMENDED_BUFFER_SIZE]; switch_frame_t *x_write_frame; - switch_frame_t write_frame; + switch_frame_t *write_frame; unsigned char write_databuf[SWITCH_RECOMMENDED_BUFFER_SIZE]; switch_frame_t cng_frame; @@ -78,6 +78,7 @@ struct private_object { switch_caller_profile_t *caller_profile; int32_t bowout_frame_count; char *other_uuid; + switch_queue_t *frame_queue; }; typedef struct private_object private_t; @@ -157,10 +158,6 @@ static switch_status_t tech_init(private_t *tech_pvt, switch_core_session_t *ses tech_pvt->read_frame.data = tech_pvt->databuf; tech_pvt->read_frame.buflen = sizeof(tech_pvt->databuf); tech_pvt->read_frame.codec = &tech_pvt->read_codec; - - tech_pvt->write_frame.data = tech_pvt->write_databuf; - tech_pvt->write_frame.buflen = sizeof(tech_pvt->write_databuf); - tech_pvt->write_frame.codec = &tech_pvt->write_codec; tech_pvt->cng_frame.data = tech_pvt->cng_databuf; @@ -190,6 +187,7 @@ static switch_status_t tech_init(private_t *tech_pvt, switch_core_session_t *ses switch_mutex_init(&tech_pvt->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); switch_mutex_init(&tech_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); switch_core_session_set_private(session, tech_pvt); + switch_queue_create(&tech_pvt->frame_queue, 50000, switch_core_session_get_pool(session)); tech_pvt->session = session; tech_pvt->channel = switch_core_session_get_channel(session); } @@ -348,6 +346,7 @@ static switch_status_t channel_on_destroy(switch_core_session_t *session) { switch_channel_t *channel = NULL; private_t *tech_pvt = NULL; + void *pop; channel = switch_core_session_get_channel(session); switch_assert(channel != NULL); @@ -364,6 +363,15 @@ static switch_status_t channel_on_destroy(switch_core_session_t *session) if (switch_core_codec_ready(&tech_pvt->write_codec)) { switch_core_codec_destroy(&tech_pvt->write_codec); } + + if (tech_pvt->write_frame) { + switch_frame_free(&tech_pvt->write_frame); + } + + while (switch_queue_trypop(tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + switch_frame_t *frame = (switch_frame_t *) pop; + switch_frame_free(&frame); + } } @@ -518,6 +526,7 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch private_t *tech_pvt = NULL; switch_status_t status = SWITCH_STATUS_FALSE; switch_mutex_t *mutex = NULL; + void *pop = NULL; channel = switch_core_session_get_channel(session); switch_assert(channel != NULL); @@ -539,18 +548,19 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch goto end; } - if (!switch_test_flag(tech_pvt, TFLAG_CNG) && !switch_test_flag(tech_pvt, TFLAG_WRITE)) { - switch_core_timer_next(&tech_pvt->timer); - } - if (switch_test_flag(tech_pvt, TFLAG_WRITE)) { - *frame = &tech_pvt->write_frame; - switch_clear_flag_locked(tech_pvt, TFLAG_WRITE); - switch_clear_flag_locked(tech_pvt, TFLAG_CNG); + switch_core_timer_next(&tech_pvt->timer); + if (switch_queue_trypop(tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + if (tech_pvt->write_frame) { + switch_frame_free(&tech_pvt->write_frame); + } + tech_pvt->write_frame = (switch_frame_t *) pop; + tech_pvt->write_frame->codec = &tech_pvt->read_codec; + *frame = tech_pvt->write_frame; } else { switch_set_flag(tech_pvt, TFLAG_CNG); } - + if (switch_test_flag(tech_pvt, TFLAG_CNG)) { *frame = &tech_pvt->cng_frame; tech_pvt->cng_frame.codec = &tech_pvt->read_codec; @@ -622,17 +632,18 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc } if (switch_test_flag(tech_pvt, TFLAG_LINKED)) { + switch_frame_t *clone; if (frame->codec->implementation != tech_pvt->write_codec.implementation) { /* change codecs to match */ tech_init(tech_pvt, session, frame->codec); tech_init(tech_pvt->other_tech_pvt, tech_pvt->other_session, frame->codec); } + + if (switch_frame_dup(frame, &clone) != SWITCH_STATUS_SUCCESS) { + abort(); + } - memcpy(&tech_pvt->other_tech_pvt->write_frame, frame, sizeof(*frame)); - tech_pvt->other_tech_pvt->write_frame.data = tech_pvt->other_tech_pvt->write_databuf; - tech_pvt->other_tech_pvt->write_frame.buflen = sizeof(tech_pvt->other_tech_pvt->write_databuf); - //tech_pvt->other_tech_pvt->write_frame.codec = &tech_pvt->other_tech_pvt->write_codec; - memcpy(tech_pvt->other_tech_pvt->write_frame.data, frame->data, frame->datalen); + switch_queue_push(tech_pvt->other_tech_pvt->frame_queue, clone); switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE); status = SWITCH_STATUS_SUCCESS; } diff --git a/src/switch_utils.c b/src/switch_utils.c index 7f3762a814..42f2e6e3c3 100644 --- a/src/switch_utils.c +++ b/src/switch_utils.c @@ -61,6 +61,56 @@ int switch_inet_pton(int af, const char *src, void *dst) } #endif + +SWITCH_DECLARE(switch_status_t) switch_frame_alloc(switch_frame_t **frame, switch_size_t size) +{ + switch_frame_t *new_frame; + + switch_zmalloc(new_frame, sizeof(*new_frame)); + + switch_set_flag(new_frame, SFF_DYNAMIC); + new_frame->buflen = size; + new_frame->data = malloc(size); + switch_assert(new_frame->data); + + *frame = new_frame; + + return SWITCH_STATUS_SUCCESS; +} + + +SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_frame_t **clone) +{ + switch_frame_t *new_frame; + + new_frame = malloc(sizeof(*new_frame)); + + *new_frame = *orig; + switch_set_flag(new_frame, SFF_DYNAMIC); + new_frame->data = malloc(new_frame->buflen); + switch_assert(new_frame->data); + + memcpy(new_frame->data, orig->data, orig->datalen); + new_frame->codec = NULL; + + *clone = new_frame; + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_DECLARE(switch_status_t) switch_frame_free(switch_frame_t **frame) +{ + if (!frame || !*frame || !switch_test_flag((*frame), SFF_DYNAMIC)) { + return SWITCH_STATUS_FALSE; + } + + free((*frame)->data); + free(*frame); + *frame = NULL; + + return SWITCH_STATUS_SUCCESS; +} + SWITCH_DECLARE(switch_status_t) switch_network_list_create(switch_network_list_t **list, switch_bool_t default_type, switch_memory_pool_t *pool) { switch_network_list_t *new_list;