diff --git a/src/mod/formats/mod_shout/mod_shout.c b/src/mod/formats/mod_shout/mod_shout.c index a81336a79c..763ca5ec1b 100644 --- a/src/mod/formats/mod_shout/mod_shout.c +++ b/src/mod/formats/mod_shout/mod_shout.c @@ -117,7 +117,6 @@ struct shout_context { int dlen; FILE *fp; size_t samplerate; - uint8_t thread_running; uint8_t shout_init; uint32_t prebuf; int lame_ready; @@ -128,6 +127,9 @@ struct shout_context { switch_size_t mp3buflen; switch_thread_rwlock_t *rwlock; int buffer_seconds; + switch_thread_t *read_stream_thread; + switch_thread_t *write_stream_thread; + curl_socket_t curlfd; }; typedef struct shout_context shout_context_t; @@ -137,6 +139,7 @@ static void decode_fd(shout_context_t *context, void *data, size_t bytes); static inline void free_context(shout_context_t *context) { size_t ret; + switch_status_t st; if (context) { switch_mutex_lock(context->audio_mutex); @@ -144,16 +147,17 @@ static inline void free_context(shout_context_t *context) switch_mutex_unlock(context->audio_mutex); if (context->stream_url) { - int sanity = 0; - - while (context->thread_running) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url); - switch_yield(500000); - if (++sanity > 10) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Giving up waiting for stream to terminate: %s\n", context->stream_url); - break; - } + if (context->curlfd > -1) { + shutdown(context->curlfd, 2); } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url); + if (context->read_stream_thread) { + switch_thread_join(&st, context->read_stream_thread); + } + } + + if (context->write_stream_thread) { + switch_thread_join(&st, context->write_stream_thread); } switch_thread_rwlock_wrlock(context->rwlock); @@ -368,6 +372,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) uint32_t buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after first 64k buffer is dry */ switch_size_t used; + if (context->err) { + goto error; + } + if (!context->stream_channels) { long rate = 0; int channels = 0; @@ -398,6 +406,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) switch_yield(500000); } + if (context->err) { + goto error; + } + if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) { goto error; } @@ -442,6 +454,22 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) return 0; } +static int progress_callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) +{ + shout_context_t *context = (shout_context_t *) clientp; + return context->err; +} + + +static int sockopt_callback(void *clientp, curl_socket_t curlfd, + curlsocktype purpose) +{ + shout_context_t *context = (shout_context_t *) clientp; + + context->curlfd = curlfd; + + return CURL_SOCKOPT_OK; +} #define MY_BUF_LEN 1024 * 32 #define MY_BLOCK_SIZE MY_BUF_LEN @@ -452,9 +480,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void shout_context_t *context = (shout_context_t *) obj; switch_thread_rwlock_rdlock(context->rwlock); - + context->curlfd = -1; curl_handle = switch_curl_easy_init(); switch_curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url); + curl_easy_setopt(curl_handle, CURLOPT_PROGRESSFUNCTION, progress_callback); + curl_easy_setopt(curl_handle, CURLOPT_PROGRESSDATA, (void *)context); switch_curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1); switch_curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 10); switch_curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, stream_callback); @@ -465,7 +495,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, 100); /* handle trickle connections */ switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, 30); switch_curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, context->curl_error_buff); + curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTFUNCTION, sockopt_callback); + curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTDATA, (void *)context); + cc = switch_curl_easy_perform(curl_handle); + if (cc && cc != CURLE_WRITE_ERROR) { /* write error is ok, we just exited from callback early */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "CURL returned error:[%d] %s : %s [%s]\n", cc, switch_curl_easy_strerror(cc), context->curl_error_buff, context->stream_url); @@ -474,21 +508,17 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n"); context->eof++; - context->thread_running = 0; switch_thread_rwlock_unlock(context->rwlock); return NULL; } static void launch_read_stream_thread(shout_context_t *context) { - switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; - context->thread_running = 1; switch_threadattr_create(&thd_attr, context->memory_pool); - switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool); + switch_thread_create(&context->read_stream_thread, thd_attr, read_stream_thread, context, context->memory_pool); } #define error_check() if (context->err) goto error; @@ -499,20 +529,13 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi switch_thread_rwlock_rdlock(context->rwlock); - if (context->thread_running) { - context->thread_running++; - } else { - switch_thread_rwlock_unlock(context->rwlock); - return NULL; - } - if (!context->lame_ready) { lame_init_params(context->gfp); lame_print_config(context->gfp); context->lame_ready = 1; } - while (!context->err && context->thread_running) { + while (!context->err) { unsigned char mp3buf[20480] = ""; int16_t audio[9600] = { 0 }; switch_size_t audio_read = 0; @@ -575,31 +598,21 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi error: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n"); switch_thread_rwlock_unlock(context->rwlock); - context->thread_running = 0; + return NULL; } static void launch_write_stream_thread(shout_context_t *context) { - switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; - int sanity = 10; if (context->err) { return; } - context->thread_running = 1; switch_threadattr_create(&thd_attr, context->memory_pool); - switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, write_stream_thread, context, context->memory_pool); - - while (context->thread_running && context->thread_running != 2) { - switch_yield(100000); - if (!--sanity) - break; - } + switch_thread_create(&context->write_stream_thread, thd_attr, write_stream_thread, context, context->memory_pool); } #define TC_BUFFER_SIZE 1024 * 32