From 2b5f40b38e619b40f923868cae9e9840b7759312 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 17 Feb 2016 15:15:03 -0600 Subject: [PATCH] FS-8811 #resolve [FS 1.7 crashes intermittently] --- .../mod_local_stream/mod_local_stream.c | 124 +++++++++--------- src/switch_core_file.c | 4 +- src/switch_ivr_bridge.c | 8 +- 3 files changed, 66 insertions(+), 70 deletions(-) diff --git a/src/mod/formats/mod_local_stream/mod_local_stream.c b/src/mod/formats/mod_local_stream/mod_local_stream.c index 68fc8eb8ed..eb465cdaf6 100644 --- a/src/mod/formats/mod_local_stream/mod_local_stream.c +++ b/src/mod/formats/mod_local_stream/mod_local_stream.c @@ -114,10 +114,28 @@ struct local_stream_source { switch_image_t *cover_art; char *banner_txt; int serno; + switch_size_t abuflen; + switch_byte_t *abuf; }; typedef struct local_stream_source local_stream_source_t; +local_stream_source_t *get_source(const char *path) +{ + local_stream_source_t *source = NULL; + + switch_mutex_lock(globals.mutex); + if ((source = switch_core_hash_find(globals.source_hash, path))) { + if (!RUNNING || source->stopped || switch_thread_rwlock_tryrdlock(source->rwlock) != SWITCH_STATUS_SUCCESS) { + source = NULL; + } + } + switch_mutex_unlock(globals.mutex); + + return source; +} + + switch_status_t list_streams_full(const char *line, const char *cursor, switch_console_callback_match_t **matches, switch_bool_t show_aliases) { local_stream_source_t *source; @@ -183,9 +201,9 @@ static void flush_video_queue(switch_queue_t *q) static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void *obj) { - local_stream_source_t *source = obj; + volatile local_stream_source_t *s = (local_stream_source_t *) obj; + local_stream_source_t *source = (local_stream_source_t *) s; switch_file_handle_t fh = { 0 }; - local_stream_context_t *cp; char file_buf[128] = "", path_buf[512] = "", last_path[512], png_buf[512] = "", tmp_buf[512] = ""; switch_timer_t timer = { 0 }; int fd = -1; @@ -216,13 +234,13 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_thread_rwlock_create(&source->rwlock, source->pool); if (RUNNING) { + source->ready = 1; switch_mutex_lock(globals.mutex); switch_core_hash_insert(globals.source_hash, source->name, source); switch_mutex_unlock(globals.mutex); - source->ready = 1; } - while (RUNNING && !source->stopped) { + while (RUNNING && !source->stopped && source->ready) { const char *fname; if (temp_pool) { @@ -258,7 +276,6 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void while (RUNNING && !source->stopped) { switch_size_t olen; - uint8_t abuf[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 }; const char *artist = NULL, *title = NULL; if (fd > -1) { @@ -452,11 +469,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void if (use_fh == &source->chime_fh) { olen = source->samples; - switch_core_file_read(&fh, abuf, &olen); + switch_core_file_read(&fh, source->abuf, &olen); olen = source->samples; } - if (switch_core_file_read(use_fh, abuf, &olen) != SWITCH_STATUS_SUCCESS || !olen) { + if (switch_core_file_read(use_fh, source->abuf, &olen) != SWITCH_STATUS_SUCCESS || !olen) { switch_core_file_close(use_fh); flush_video_queue(source->video_q); @@ -480,7 +497,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void } if (source->total) { - switch_buffer_write(audio_buffer, abuf, olen * 2 * source->channels); + switch_buffer_write(audio_buffer, source->abuf, olen * 2 * source->channels); } else { switch_buffer_zero(audio_buffer); } @@ -502,6 +519,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void //if (!is_open || used >= source->prebuf || (source->total && used > source->samples * 2 * source->channels)) { void *pop; uint32_t bused; + local_stream_context_t *cp = NULL; used = switch_buffer_read(audio_buffer, dist_buf, source->samples * 2 * source->channels); @@ -509,7 +527,6 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_mutex_lock(source->mutex); for (cp = source->context_list; cp && RUNNING; cp = cp->next) { - if (source->has_video) { switch_set_flag(cp->handle, SWITCH_FILE_FLAG_VIDEO); } else { @@ -621,6 +638,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "local_stream://%s fully reloaded.\n",source->name); + switch_thread_rwlock_unlock(source->rwlock); launch_streams(source->name); goto done; } @@ -683,22 +701,18 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons return SWITCH_STATUS_FALSE; } - switch_mutex_lock(globals.mutex); - top: alt_path = switch_mprintf("%s/%d", path, handle->samplerate); - if ((source = switch_core_hash_find(globals.source_hash, alt_path))) { + if ((source = get_source(alt_path))) { path = alt_path; } else { - source = switch_core_hash_find(globals.source_hash, path); + source = get_source(path); } - if (source) { - if (switch_thread_rwlock_tryrdlock(source->rwlock) != SWITCH_STATUS_SUCCESS) { - source = NULL; - } - } else { + + + if (!source) { if (!switch_stristr("default", alt_path) && !switch_stristr("default", path)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown source %s, trying 'default'\n", path); free(alt_path); @@ -706,7 +720,6 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons goto top; } } - switch_mutex_unlock(globals.mutex); if (!source) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown source %s\n", path); @@ -715,8 +728,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons } if ((context = switch_core_alloc(handle->memory_pool, sizeof(*context))) == 0) { - status = SWITCH_STATUS_MEMERR; - goto end; + abort(); } switch_queue_create(&context->video_q, 500, handle->memory_pool); @@ -760,6 +772,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons switch_mutex_unlock(source->mutex); end: + switch_safe_free(alt_path); return status; } @@ -783,7 +796,7 @@ static switch_status_t local_stream_file_close(switch_file_handle_t *handle) last = cp; } - if (context->video_q) { + if (context->source->has_video) { flush_video_queue(context->video_q); switch_queue_trypush(context->video_q, NULL); switch_queue_interrupt_all(context->video_q); @@ -1046,6 +1059,8 @@ static void launch_thread(const char *name, const char *path, switch_xml_t direc } source->samples = switch_samples_per_packet(source->rate, source->interval); + source->abuflen = (source->samples * 2 * source->channels) + 1024; + source->abuf = switch_core_alloc(source->pool, source->abuflen); switch_mutex_init(&source->mutex, SWITCH_MUTEX_NESTED, source->pool); switch_threadattr_create(&thd_attr, source->pool); switch_threadattr_detach_set(thd_attr, 1); @@ -1108,55 +1123,37 @@ SWITCH_STANDARD_API(local_stream_function) local_stream_name = argv[1]; + if (!strcasecmp(argv[0], "hup") && local_stream_name) { - switch_mutex_lock(globals.mutex); - source = switch_core_hash_find(globals.source_hash, local_stream_name); - switch_mutex_unlock(globals.mutex); - - if (source) { + if ((source = get_source(local_stream_name))) { source->hup = 1; stream->write_function(stream, "+OK hup stream: %s", source->name); - goto done; + switch_thread_rwlock_unlock(source->rwlock); } } else if (!strcasecmp(argv[0], "stop") && local_stream_name) { - switch_mutex_lock(globals.mutex); - source = switch_core_hash_find(globals.source_hash, local_stream_name); - switch_mutex_unlock(globals.mutex); - - if (!source) { + if ((source = get_source(local_stream_name))) { + source->stopped = 1; + stream->write_function(stream, "+OK"); + switch_thread_rwlock_unlock(source->rwlock); + } else { stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name); - goto done; } - - source->stopped = 1; - stream->write_function(stream, "+OK"); } else if (!strcasecmp(argv[0], "reload") && local_stream_name) { - switch_mutex_lock(globals.mutex); - source = switch_core_hash_find(globals.source_hash, local_stream_name); - switch_mutex_unlock(globals.mutex); - - if (!source) { + if ((source = get_source(local_stream_name))) { + source->full_reload = 1; + source->part_reload = 1; + stream->write_function(stream, "+OK"); + } else { stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name); - goto done; } - - source->full_reload = 1; - source->part_reload = 1; - stream->write_function(stream, "+OK"); } else if (!strcasecmp(argv[0], "start") && local_stream_name) { - switch_mutex_lock(globals.mutex); - source = switch_core_hash_find(globals.source_hash, local_stream_name); - switch_mutex_unlock(globals.mutex); - - if (source) { + if ((source = get_source(local_stream_name))) { source->stopped = 0; stream->write_function(stream, "+OK stream: %s", source->name); - goto done; - } - - if ((ok = launch_streams(local_stream_name))) { - stream->write_function(stream, "+OK stream: %s", local_stream_name); - goto done; + } else { + if ((ok = launch_streams(local_stream_name))) { + stream->write_function(stream, "+OK stream: %s", local_stream_name); + } } } else if (!strcasecmp(argv[0], "show")) { @@ -1165,22 +1162,21 @@ SWITCH_STANDARD_API(local_stream_function) void *val; switch_bool_t xml = SWITCH_FALSE; - switch_mutex_lock(globals.mutex); if (argc == 1) { + switch_mutex_lock(globals.mutex); for (hi = switch_core_hash_first(globals.source_hash); hi; hi = switch_core_hash_next(&hi)) { switch_core_hash_this(hi, &var, NULL, &val); if ((source = (local_stream_source_t *) val)) { stream->write_function(stream, "%s,%s\n", source->name, source->location); } } + switch_mutex_unlock(globals.mutex); } else { if (argc == 4 && !strcasecmp("xml", argv[3])) { xml = SWITCH_TRUE; } - source = switch_core_hash_find(globals.source_hash, local_stream_name); - - if (source) { + if ((source = get_source(local_stream_name))) { if (xml) { stream->write_function(stream, "\n\n", source->name); stream->write_function(stream, " %s\n", source->location); @@ -1210,13 +1206,11 @@ SWITCH_STANDARD_API(local_stream_function) stream->write_function(stream, " stopped: %s\n", (source->stopped) ? "true" : "false"); stream->write_function(stream, " reloading: %s\n", (source->full_reload) ? "true" : "false"); } + switch_thread_rwlock_unlock(source->rwlock); } else { stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name); } } - switch_mutex_unlock(globals.mutex); - - goto done; } goto done; diff --git a/src/switch_core_file.c b/src/switch_core_file.c index c6d92ded06..b241fce0cb 100644 --- a/src/switch_core_file.c +++ b/src/switch_core_file.c @@ -292,7 +292,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_perform_file_open(const char *file, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Spool dir is set. Make sure [%s] is also a valid path\n", fh->spool_path); } UNPROTECT_INTERFACE(fh->file_interface); - switch_goto_status(status, fail); + goto fail; } fh->real_channels = fh->channels; @@ -305,7 +305,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_perform_file_open(const char *file, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "File [%s] not created!\n", file_path); fh->file_interface->file_close(fh); UNPROTECT_INTERFACE(fh->file_interface); - switch_goto_status(status, fail); + goto fail; } if (to) { diff --git a/src/switch_ivr_bridge.c b/src/switch_ivr_bridge.c index cdebd44558..8ad4813066 100644 --- a/src/switch_ivr_bridge.c +++ b/src/switch_ivr_bridge.c @@ -333,6 +333,7 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj) time_t answer_limit = 0; const char *exec_app = NULL; const char *exec_data = NULL; + switch_codec_implementation_t read_impl = { 0 }; #ifdef SWITCH_VIDEO_IN_THREADS struct vid_helper vh = { 0 }; @@ -345,6 +346,9 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj) return NULL; } + switch_core_session_get_read_impl(session_a, &read_impl); + + input_callback = data->input_callback; user_data = data->session_data; stream_id = data->stream_id; @@ -405,8 +409,6 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj) } if ((silence_var = switch_channel_get_variable(chan_a, "bridge_generate_comfort_noise"))) { - switch_codec_implementation_t read_impl = { 0 }; - switch_core_session_get_read_impl(session_a, &read_impl); if (!switch_channel_media_up(chan_a)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session_a), SWITCH_LOG_ERROR, "Channel has no media!\n"); @@ -683,7 +685,7 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj) if (switch_test_flag(read_frame, SFF_CNG)) { if (silence_val) { switch_generate_sln_silence((int16_t *) silence_frame.data, silence_frame.samples, - read_frame->codec->implementation->number_of_channels, silence_val); + read_impl.number_of_channels, silence_val); read_frame = &silence_frame; } else if (!switch_channel_test_flag(chan_b, CF_ACCEPT_CNG)) { continue;