diff --git a/src/mod/applications/mod_conference/conference_video.c b/src/mod/applications/mod_conference/conference_video.c index 6c4e061aae..6185a2d724 100644 --- a/src/mod/applications/mod_conference/conference_video.c +++ b/src/mod/applications/mod_conference/conference_video.c @@ -1579,6 +1579,7 @@ void conference_video_launch_muxing_write_thread(conference_member_t *member) } switch_mutex_unlock(conference_globals.hash_mutex); } + void conference_video_launch_muxing_thread(conference_obj_t *conference, mcu_canvas_t *canvas, int super) { switch_threadattr_t *thd_attr = NULL; @@ -1595,12 +1596,96 @@ void conference_video_launch_muxing_thread(conference_obj_t *conference, mcu_can switch_mutex_unlock(conference_globals.hash_mutex); } +void *SWITCH_THREAD_FUNC conference_video_layer_thread_run(switch_thread_t *thread, void *obj) +{ + conference_member_t *member = (conference_member_t *) obj; + + if (switch_thread_rwlock_tryrdlock(member->rwlock) != SWITCH_STATUS_SUCCESS) { + return NULL; + } + + //switch_core_autobind_cpu(); + member->layer_thread_running = 1; + + switch_mutex_lock(member->layer_cond_mutex); + + while(conference_utils_member_test_flag(member, MFLAG_RUNNING) && member->layer_thread_running) { + mcu_layer_t *layer = NULL; + mcu_canvas_t *canvas = NULL; + + switch_thread_cond_wait(member->layer_cond, member->layer_cond_mutex); + + if (!conference_utils_member_test_flag(member, MFLAG_RUNNING)) { + break; + } + + switch_mutex_lock(member->conference->canvas_mutex); + if (member->video_layer_id > -1 && member->canvas_id > -1) { + canvas = member->conference->canvases[member->canvas_id]; + layer = &canvas->layers[member->video_layer_id]; + + if (layer->need_patch && switch_thread_rwlock_tryrdlock(canvas->video_rwlock) == SWITCH_STATUS_SUCCESS) { + if (layer->need_patch) { + conference_video_scale_and_patch(layer, NULL, SWITCH_FALSE); + layer->need_patch = 0; + } + switch_thread_rwlock_unlock(canvas->video_rwlock); + } + } + switch_mutex_unlock(member->conference->canvas_mutex); + } + + switch_mutex_unlock(member->layer_cond_mutex); + + member->layer_thread_running = 0; + + switch_thread_rwlock_unlock(member->rwlock); + + return NULL; +} + +void conference_video_wake_layer_thread(conference_member_t *member) +{ + if (member->layer_cond) { + if (switch_mutex_trylock(member->layer_cond_mutex) == SWITCH_STATUS_SUCCESS) { + switch_thread_cond_signal(member->layer_cond); + switch_mutex_unlock(member->layer_cond_mutex); + } + } +} + +void conference_video_launch_layer_thread(conference_member_t *member) +{ + switch_threadattr_t *thd_attr = NULL; + + if (switch_core_cpu_count() < 3) { + return; + } + + if (!member->layer_cond) { + switch_thread_cond_create(&member->layer_cond, member->pool); + switch_mutex_init(&member->layer_cond_mutex, SWITCH_MUTEX_NESTED, member->pool); + } + + switch_mutex_lock(conference_globals.hash_mutex); + if (!member->video_layer_thread) { + switch_threadattr_create(&thd_attr, member->pool); + //switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&member->video_layer_thread, thd_attr, conference_video_layer_thread_run, member, member->pool); + } + switch_mutex_unlock(conference_globals.hash_mutex); +} + + + + void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_t *thread, void *obj) { conference_member_t *member = (conference_member_t *) obj; void *pop; switch_frame_t *frame; - int loops = 0, patched = 0; + int loops = 0; switch_time_t last = 0; switch_status_t pop_status; @@ -1612,15 +1697,9 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_ while(conference_utils_member_test_flag(member, MFLAG_RUNNING)) { - if (patched) { - pop_status = switch_frame_buffer_trypop(member->fb, &pop); - } else { - pop_status = switch_frame_buffer_pop(member->fb, &pop); - } + pop_status = switch_frame_buffer_pop(member->fb, &pop); if (pop_status == SWITCH_STATUS_SUCCESS) { - mcu_layer_t *layer = NULL; - mcu_canvas_t *canvas = NULL; if (!pop) { break; @@ -1652,34 +1731,10 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_ } last = now; - - } - + switch_frame_buffer_free(member->fb, &frame); } - - canvas = NULL; - layer = NULL; - patched = 0; - - switch_mutex_lock(member->conference->canvas_mutex); - if (member->video_layer_id > -1 && member->canvas_id > -1) { - canvas = member->conference->canvases[member->canvas_id]; - layer = &canvas->layers[member->video_layer_id]; - - if (layer->need_patch && switch_thread_rwlock_tryrdlock(canvas->video_rwlock) == SWITCH_STATUS_SUCCESS) { - if (layer->need_patch) { - conference_video_scale_and_patch(layer, NULL, SWITCH_FALSE); - patched++; - layer->need_patch = 0; - } - switch_thread_rwlock_unlock(canvas->video_rwlock); - } - } - switch_mutex_unlock(member->conference->canvas_mutex); - } else { - patched = 0; } } @@ -2299,7 +2354,7 @@ static void wait_for_canvas(mcu_canvas_t *canvas) if (layer->need_patch) { if (layer->member_id && layer->member && conference_utils_member_test_flag(layer->member, MFLAG_RUNNING) && layer->member->fb) { - switch_frame_buffer_trypush(layer->member->fb, (void *) 1); + conference_video_wake_layer_thread(layer->member); x++; } else { layer->need_patch = 0; @@ -3098,6 +3153,7 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thr if (layer->cur_img) { if (layer->member && switch_core_cpu_count() > 2) { layer->need_patch = 1; + conference_video_wake_layer_thread(layer->member); } else { conference_video_scale_and_patch(layer, NULL, SWITCH_FALSE); } @@ -3119,8 +3175,9 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thr } if (layer->cur_img) { - if (layer->member) { + if (layer->member && switch_core_cpu_count() > 2) { layer->need_patch = 1; + conference_video_wake_layer_thread(layer->member); } else { conference_video_scale_and_patch(layer, NULL, SWITCH_FALSE); } diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 866fadef27..ae3777467b 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -2415,6 +2415,7 @@ SWITCH_STANDARD_APP(conference_function) if (conference->conference_video_mode == CONF_VIDEO_MODE_MUX) { conference_video_launch_muxing_write_thread(&member); + conference_video_launch_layer_thread(&member); } msg.from = __FILE__; @@ -2453,6 +2454,8 @@ SWITCH_STANDARD_APP(conference_function) msg.message_id = SWITCH_MESSAGE_INDICATE_UNBRIDGE; switch_core_session_receive_message(session, &msg); + conference_utils_member_clear_flag(&member, MFLAG_RUNNING); + if (member.video_muxing_write_thread) { switch_status_t st = SWITCH_STATUS_SUCCESS; switch_frame_buffer_push(member.fb, NULL); @@ -2460,6 +2463,18 @@ SWITCH_STANDARD_APP(conference_function) member.video_muxing_write_thread = NULL; } + if (member.video_layer_thread) { + switch_status_t st = SWITCH_STATUS_SUCCESS; + + while(member.layer_thread_running) { + conference_video_wake_layer_thread(&member); + switch_yield(10000); + } + + switch_thread_join(&st, member.video_layer_thread); + member.video_layer_thread = NULL; + } + /* Remove the caller from the conference */ conference_member_del(member.conference, &member); diff --git a/src/mod/applications/mod_conference/mod_conference.h b/src/mod/applications/mod_conference/mod_conference.h index cf5b323359..42c1c9f27b 100644 --- a/src/mod/applications/mod_conference/mod_conference.h +++ b/src/mod/applications/mod_conference/mod_conference.h @@ -754,7 +754,11 @@ struct conference_member { switch_queue_t *dtmf_queue; switch_queue_t *video_queue; switch_thread_t *video_muxing_write_thread; + switch_thread_t *video_layer_thread; + int layer_thread_running; switch_thread_t *input_thread; + switch_thread_cond_t *layer_cond; + switch_mutex_t *layer_cond_mutex; cJSON *json; cJSON *status_field; uint8_t loop_loop; @@ -984,6 +988,8 @@ switch_status_t conference_al_parse_position(al_handle_t *al, const char *data); switch_status_t conference_video_thread_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data); switch_status_t conference_text_thread_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data); void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_t *thread, void *obj); +void conference_video_launch_layer_thread(conference_member_t *member); +void conference_video_wake_layer_thread(conference_member_t *member); void conference_member_check_agc_levels(conference_member_t *member); void conference_member_clear_avg(conference_member_t *member); int conference_member_noise_gate_check(conference_member_t *member);