diff --git a/src/include/switch_types.h b/src/include/switch_types.h index f54ec37661..adabb8464c 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -1525,7 +1525,8 @@ typedef enum { SFF_RAW_RTP_PARSE_FRAME = (1 << 13), SFF_PICTURE_RESET = (1 << 14), SFF_SAME_IMAGE = (1 << 15), - SFF_USE_VIDEO_TIMESTAMP = (1 << 16) + SFF_USE_VIDEO_TIMESTAMP = (1 << 16), + SFF_ENCODED = (1 << 17) } switch_frame_flag_enum_t; typedef uint32_t switch_frame_flag_t; @@ -2525,6 +2526,9 @@ typedef struct switch_vb_s switch_vb_t; struct switch_img_txt_handle_s; typedef struct switch_img_txt_handle_s switch_img_txt_handle_t; +struct switch_frame_buffer_s; +typedef struct switch_frame_buffer_s switch_frame_buffer_t; + SWITCH_END_EXTERN_C #endif /* For Emacs: diff --git a/src/include/switch_utils.h b/src/include/switch_utils.h index 0618230056..3178d37f4d 100644 --- a/src/include/switch_utils.h +++ b/src/include/switch_utils.h @@ -1222,6 +1222,11 @@ SWITCH_DECLARE(void) switch_http_dump_request(switch_http_request_t *request); SWITCH_DECLARE(void) switch_http_parse_qs(switch_http_request_t *request, char *qs); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP); + SWITCH_END_EXTERN_C #endif /* For Emacs: diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 95394d318c..22a791e0a6 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -412,6 +412,7 @@ typedef struct mcu_canvas_s { switch_rgb_color_t bgcolor; switch_mutex_t *mutex; switch_timer_t timer; + switch_frame_buffer_t *fb; switch_memory_pool_t *pool; } mcu_canvas_t; @@ -622,6 +623,8 @@ struct conference_member { char *kicked_sound; switch_queue_t *dtmf_queue; switch_queue_t *video_queue; + switch_queue_t *mux_out_queue; + switch_thread_t *video_muxing_write_thread; switch_thread_t *input_thread; cJSON *json; cJSON *status_field; @@ -713,6 +716,7 @@ static switch_status_t conference_outcall_bg(conference_obj_t *conference, SWITCH_STANDARD_APP(conference_function); static void launch_conference_video_muxing_thread(conference_obj_t *conference); static void launch_conference_thread(conference_obj_t *conference); +static void launch_conference_video_muxing_write_thread(conference_member_t *member); static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, void *obj); static switch_status_t conference_local_play_file(conference_obj_t *conference, switch_core_session_t *session, char *path, uint32_t leadin, void *buf, uint32_t buflen); @@ -1475,6 +1479,7 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code frame = &write_frame; frame->img = codec_set->frame.img; frame->packet = codec_set->frame.packet; + frame->packetlen = codec_set->frame.packetlen; switch_clear_flag(frame, SFF_SAME_IMAGE); frame->m = 0; @@ -1491,7 +1496,7 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code } do { - + frame->data = ((unsigned char *)frame->packet) + 12; frame->datalen = SWITCH_DEFAULT_VIDEO_SIZE; @@ -1507,6 +1512,8 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code switch_mutex_lock(conference->member_mutex); for (imember = conference->members; imember; imember = imember->next) { + switch_frame_t *dupframe; + if (switch_test_flag(imember, MFLAG_NO_MINIMIZE_ENCODING)) { continue; } @@ -1523,8 +1530,16 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code if (need_refresh) { switch_core_session_request_video_refresh(imember->session); } + + //switch_core_session_write_encoded_video_frame(imember->session, frame, 0, 0); + switch_set_flag(frame, SFF_ENCODED); + + if (switch_frame_buffer_dup(conference->canvas->fb, frame, &dupframe) == SWITCH_STATUS_SUCCESS) { + switch_queue_push(imember->mux_out_queue, dupframe); + dupframe = NULL; + } - switch_core_session_write_encoded_video_frame(imember->session, frame, 0, 0); + switch_clear_flag(frame, SFF_ENCODED); switch_core_session_rwunlock(imember->session); } @@ -1568,6 +1583,39 @@ static void vmute_snap(conference_member_t *member, switch_bool_t clear) } } +static void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_t *thread, void *obj) +{ + conference_member_t *member = (conference_member_t *) obj; + void *pop; + + while(switch_test_flag(member, MFLAG_RUNNING) || switch_queue_size(member->mux_out_queue)) { + switch_frame_t *frame; + + if (switch_test_flag(member, MFLAG_RUNNING)) { + if (switch_queue_pop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) { + if (!pop) continue; + + frame = (switch_frame_t *) pop; + if (switch_test_flag(frame, SFF_ENCODED)) { + switch_core_session_write_encoded_video_frame(member->session, frame, 0, 0); + } else { + switch_core_session_write_video_frame(member->session, frame, SWITCH_IO_FLAG_NONE, 0); + } + switch_frame_buffer_free(member->conference->canvas->fb, &frame); + } + } else { + if (switch_queue_trypop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) { + if (pop) { + frame = (switch_frame_t *) pop; + switch_frame_buffer_free(member->conference->canvas->fb, &frame); + } + } + } + } + + return NULL; +} + static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thread, void *obj) { conference_obj_t *conference = (conference_obj_t *) obj; @@ -1575,7 +1623,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread video_layout_t *vlayout = NULL; switch_codec_t *check_codec = NULL; codec_set_t *write_codecs[MAX_MUX_CODECS] = { 0 }; - int buflen = SWITCH_RECOMMENDED_BUFFER_SIZE * 2; + int buflen = SWITCH_RTP_MAX_BUF_LEN; int i = 0; int used = 0; uint32_t video_key_freq = 10000000; @@ -1587,6 +1635,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread switch_image_t *write_img = NULL, *file_img = NULL; uint32_t timestamp = 0; + if (conference->video_layout_group) { lg = switch_core_hash_find(conference->layout_group_hash, conference->video_layout_group); vlayout = find_best_layout(conference, lg); @@ -1602,10 +1651,11 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread } init_canvas(conference, vlayout); + switch_frame_buffer_create(&conference->canvas->fb); conference->video_timer_reset = 1; - packet = switch_core_alloc(conference->pool, SWITCH_RECOMMENDED_BUFFER_SIZE); + packet = switch_core_alloc(conference->pool, SWITCH_RTP_MAX_BUF_LEN); while (globals.running && !switch_test_flag(conference, CFLAG_DESTRUCT) && switch_test_flag(conference, CFLAG_VIDEO_MUXING)) { switch_bool_t need_refresh = SWITCH_FALSE, need_keyframe = SWITCH_FALSE, need_reset = SWITCH_FALSE; @@ -1685,7 +1735,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread write_codecs[i]->frame.packet = switch_core_alloc(conference->pool, buflen); write_codecs[i]->frame.data = ((uint8_t *)write_codecs[i]->frame.packet) + 12; - write_codecs[i]->frame.packetlen = 0; + write_codecs[i]->frame.packetlen = buflen; write_codecs[i]->frame.buflen = buflen - 12; switch_set_flag((&write_codecs[i]->frame), SFF_RAW_RTP); @@ -1877,7 +1927,8 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread switch_mutex_lock(conference->member_mutex); for (imember = conference->members; imember; imember = imember->next) { - + switch_frame_t *dupframe; + if (switch_test_flag(conference, CFLAG_MINIMIZE_VIDEO_ENCODING) && !switch_test_flag(imember, MFLAG_NO_MINIMIZE_ENCODING)) { continue; } @@ -1898,12 +1949,17 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread switch_set_flag(&write_frame, SFF_RAW_RTP); write_frame.img = write_img; write_frame.packet = packet; - write_frame.data = packet + 12; - write_frame.datalen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12; - write_frame.buflen = write_frame.datalen; - write_frame.packetlen = SWITCH_RECOMMENDED_BUFFER_SIZE; + write_frame.data = ((uint8_t *)packet) + 12; + write_frame.datalen = 0; + write_frame.buflen = SWITCH_RTP_MAX_BUF_LEN - 12; + write_frame.packetlen = 0; + + //switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0); - switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0); + if (switch_frame_buffer_dup(conference->canvas->fb, &write_frame, &dupframe) == SWITCH_STATUS_SUCCESS) { + switch_queue_push(imember->mux_out_queue, dupframe); + dupframe = NULL; + } if (imember->session) { switch_core_session_rwunlock(imember->session); @@ -1939,7 +1995,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread } switch_core_timer_destroy(&conference->canvas->timer); - + switch_frame_buffer_destroy(&conference->canvas->fb); destroy_canvas(&conference->canvas); return NULL; @@ -3663,9 +3719,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe member->video_codec_index = -1; switch_queue_create(&member->dtmf_queue, 100, member->pool); - if (conference->video_layout_name) { - switch_queue_create(&member->video_queue, 2000, member->pool); - } + conference->members = member; switch_set_flag_locked(member, MFLAG_INTREE); switch_mutex_unlock(conference->member_mutex); @@ -6498,6 +6552,10 @@ static void conference_loop_output(conference_member_t *member) if (member->input_thread) { switch_thread_join(&st, member->input_thread); } + if (member->video_muxing_write_thread) { + switch_queue_push(member->mux_out_queue, NULL); + switch_thread_join(&st, member->video_muxing_write_thread); + } } switch_core_timer_destroy(&timer); @@ -11584,6 +11642,12 @@ SWITCH_STANDARD_APP(conference_function) conference->min = 2; } + if (conference->video_layout_name) { + switch_queue_create(&member.video_queue, 2000, member.pool); + switch_queue_create(&member.mux_out_queue, 2000, member.pool); + launch_conference_video_muxing_write_thread(&member); + } + /* Add the caller to the conference */ if (conference_add_member(conference, &member) != SWITCH_STATUS_SUCCESS) { switch_core_codec_destroy(&member.read_codec); @@ -11703,7 +11767,18 @@ SWITCH_STANDARD_APP(conference_function) } -/* Create a thread for the conference and launch it */ + +static void launch_conference_video_muxing_write_thread(conference_member_t *member) +{ + switch_threadattr_t *thd_attr = NULL; + switch_mutex_lock(globals.hash_mutex); + if (!member->video_muxing_write_thread) { + switch_threadattr_create(&thd_attr, member->pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&member->video_muxing_write_thread, thd_attr, conference_video_muxing_write_thread_run, member, member->pool); + } + switch_mutex_unlock(globals.hash_mutex); +} static void launch_conference_video_muxing_thread(conference_obj_t *conference) { switch_threadattr_t *thd_attr = NULL; diff --git a/src/mod/applications/mod_fsv/mod_fsv.c b/src/mod/applications/mod_fsv/mod_fsv.c index 9a1e794b65..6791f2a95e 100644 --- a/src/mod/applications/mod_fsv/mod_fsv.c +++ b/src/mod/applications/mod_fsv/mod_fsv.c @@ -323,7 +323,7 @@ SWITCH_STANDARD_APP(play_fsv_function) vid_frame.codec = &vid_codec; vid_frame.packet = vid_buffer; - vid_frame.data = vid_buffer + 12; + vid_frame.data = ((uint8_t *)vid_buffer) + 12; vid_frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12; switch_set_flag((&vid_frame), SFF_RAW_RTP); // switch_set_flag((&vid_frame), SFF_PROXY_PACKET); @@ -397,7 +397,7 @@ SWITCH_STANDARD_APP(play_fsv_function) vid_frame.m = hdr->m; vid_frame.timestamp = ts; - vid_frame.data = data + 12; + vid_frame.data = ((uint8_t *)data) + 12; vid_frame.datalen = vid_frame.packetlen - 12; switch_core_session_write_video_frame(session, &vid_frame, SWITCH_IO_FLAG_NONE, 0); } @@ -577,7 +577,7 @@ SWITCH_STANDARD_APP(play_yuv_function) vid_frame.codec = codec; vid_frame.packet = vid_buffer; - vid_frame.data = vid_buffer + 12; + vid_frame.data = ((uint8_t *)vid_buffer) + 12; vid_frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12; switch_set_flag((&vid_frame), SFF_RAW_RTP); // switch_set_flag((&vid_frame), SFF_PROXY_PACKET); diff --git a/src/mod/formats/mod_vlc/mod_vlc.c b/src/mod/formats/mod_vlc/mod_vlc.c index cc95c39121..ffcc5f69a4 100644 --- a/src/mod/formats/mod_vlc/mod_vlc.c +++ b/src/mod/formats/mod_vlc/mod_vlc.c @@ -450,7 +450,7 @@ static void vlc_video_display_callback(void *data, void *id) } else { context->vid_frame->img = context->img; context->vid_frame->packet = context->video_packet; - context->vid_frame->data = context->video_packet + 12; + context->vid_frame->data = ((uint8_t *)context->video_packet) + 12; switch_core_session_write_video_frame(context->session, context->vid_frame, SWITCH_IO_FLAG_NONE, 0); } @@ -1304,7 +1304,7 @@ SWITCH_STANDARD_APP(play_video_function) audio_frame.codec = &codec; video_frame.codec = read_vid_codec; video_frame.packet = context->video_packet; - video_frame.data = context->video_packet + 12; + video_frame.data = ((uint8_t *)context->video_packet) + 12; switch_channel_set_variable(channel, SWITCH_PLAYBACK_TERMINATOR_USED, ""); @@ -1994,7 +1994,7 @@ static switch_status_t setup_tech_pvt(switch_core_session_t *osession, switch_co context->aud_frame = &tech_pvt->read_frame; context->vid_frame = &tech_pvt->read_video_frame; context->vid_frame->packet = context->video_packet; - context->vid_frame->data = context->video_packet + 12; + context->vid_frame->data = ((uint8_t *)context->video_packet) + 12; context->playing = 0; // context->err = 0; diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 90e1c3c348..a4b6b144b9 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -7145,7 +7145,8 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra uint32_t len, ts = 0; switch_payload_t payload = 0; rtp_msg_t *send_msg = NULL; - rtp_msg_t local_send_msg = { {0} }; + srtp_hdr_t local_header; + int r = 0; if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr) { return -1; @@ -7300,8 +7301,7 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra if (fwd) { send_msg = frame->packet; - local_send_msg = *send_msg; - send_msg = &local_send_msg; + local_header = send_msg->header; len = frame->packetlen; ts = 0; @@ -7330,7 +7330,14 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra } */ - return rtp_common_write(rtp_session, send_msg, data, len, payload, ts, &frame->flags); + r = rtp_common_write(rtp_session, send_msg, data, len, payload, ts, &frame->flags); + + if (send_msg) { + send_msg->header = local_header; + } + + return r; + } SWITCH_DECLARE(switch_rtp_stats_t *) switch_rtp_get_stats(switch_rtp_t *rtp_session, switch_memory_pool_t *pool) diff --git a/src/switch_utils.c b/src/switch_utils.c index bff05aeeb0..d270081b20 100644 --- a/src/switch_utils.c +++ b/src/switch_utils.c @@ -93,6 +93,143 @@ SWITCH_DECLARE(switch_status_t) switch_frame_alloc(switch_frame_t **frame, switc } +typedef struct switch_frame_node_s { + switch_frame_t *frame; + int inuse; + struct switch_frame_node_s *next; +} switch_frame_node_t; + +struct switch_frame_buffer_s { + switch_frame_node_t *head; + switch_memory_pool_t *pool; + switch_mutex_t *mutex; +}; + +static switch_frame_t *find_free_frame(switch_frame_buffer_t *fb, switch_frame_t *orig) +{ + switch_frame_node_t *np; + + switch_mutex_lock(fb->mutex); + for (np = fb->head; np; np = np->next) { + if (!np->inuse && ((orig->packet && np->frame->packet) || (!orig->packet && !np->frame->packet))) { + break; + } + } + + if (!np) { + np = switch_core_alloc(fb->pool, sizeof(*np)); + np->frame = switch_core_alloc(fb->pool, sizeof(*np->frame)); + + if (orig->packet) { + np->frame->packet = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN); + } else { + np->frame->data = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN); + np->frame->buflen = SWITCH_RTP_MAX_BUF_LEN; + } + np->next = fb->head; + fb->head = np; + } + + + np->frame->samples = orig->samples; + np->frame->rate = orig->rate; + np->frame->channels = orig->channels; + np->frame->payload = orig->payload; + np->frame->timestamp = orig->timestamp; + np->frame->seq = orig->seq; + np->frame->ssrc = orig->ssrc; + np->frame->m = orig->m; + np->frame->flags = orig->flags; + np->frame->codec = NULL; + np->frame->pmap = NULL; + np->frame->img = NULL; + np->frame->extra_data = np; + np->inuse = 1; + + switch_set_flag(np->frame, SFF_DYNAMIC); + + if (orig->packet) { + memcpy(np->frame->packet, orig->packet, orig->packetlen); + np->frame->packetlen = orig->packetlen; + np->frame->data = ((unsigned char *)np->frame->packet) + 12; + np->frame->datalen = orig->datalen; + } else { + np->frame->packetlen = 0; + memcpy(np->frame->data, orig->data, orig->datalen); + np->frame->datalen = orig->datalen; + } + + if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) { + switch_img_copy(orig->img, &np->frame->img); + } + + switch_mutex_unlock(fb->mutex); + + return np->frame; +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP) +{ + switch_frame_t *old_frame; + switch_frame_node_t *node; + + switch_mutex_lock(fb->mutex); + + old_frame = *frameP; + *frameP = NULL; + + node = (switch_frame_node_t *) old_frame->extra_data; + node->inuse = 0; + switch_img_free(&node->frame->img); + + switch_mutex_unlock(fb->mutex); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone) +{ + switch_frame_t *new_frame; + + if (!orig) { + return SWITCH_STATUS_FALSE; + } + + switch_assert(orig->buflen); + + new_frame = find_free_frame(fb, orig); + + *clone = new_frame; + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP) +{ + switch_frame_buffer_t *fb = *fbP; + switch_memory_pool_t *pool; + *fbP = NULL; + pool = fb->pool; + switch_core_destroy_memory_pool(&pool); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP) +{ + switch_frame_buffer_t *fb; + switch_memory_pool_t *pool; + + switch_core_new_memory_pool(&pool); + fb = switch_core_alloc(pool, sizeof(*fb)); + fb->pool = pool; + switch_mutex_init(&fb->mutex, SWITCH_MUTEX_NESTED, pool); + *fbP = fb; + + return SWITCH_STATUS_SUCCESS; +} + + SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_frame_t **clone) { switch_frame_t *new_frame; @@ -104,18 +241,28 @@ SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_fr switch_assert(orig->buflen); new_frame = malloc(sizeof(*new_frame)); - switch_assert(new_frame); *new_frame = *orig; switch_set_flag(new_frame, SFF_DYNAMIC); - new_frame->data = malloc(new_frame->buflen); - switch_assert(new_frame->data); + if (orig->packet) { + new_frame->packet = malloc(SWITCH_RTP_MAX_BUF_LEN); + memcpy(new_frame->packet, orig->packet, orig->packetlen); + new_frame->data = ((unsigned char *)new_frame->packet) + 12; + } else { + new_frame->data = malloc(new_frame->buflen); + switch_assert(new_frame->data); + memcpy(new_frame->data, orig->data, orig->datalen); + } + - memcpy(new_frame->data, orig->data, orig->datalen); new_frame->codec = NULL; new_frame->pmap = NULL; + new_frame->img = NULL; + if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) { + switch_img_copy(orig->img, &new_frame->img); + } *clone = new_frame; return SWITCH_STATUS_SUCCESS; @@ -127,7 +274,17 @@ SWITCH_DECLARE(switch_status_t) switch_frame_free(switch_frame_t **frame) return SWITCH_STATUS_FALSE; } - switch_safe_free((*frame)->data); + if ((*frame)->img) { + switch_img_free(&(*frame)->img); + } + + if ((*frame)->packet) { + free((*frame)->packet); + (*frame)->packet = NULL; + } else { + switch_safe_free((*frame)->data); + } + free(*frame); *frame = NULL;