diff --git a/src/mod/formats/mod_shout/mod_shout.c b/src/mod/formats/mod_shout/mod_shout.c index 495d8e813a..54a3034a7f 100644 --- a/src/mod/formats/mod_shout/mod_shout.c +++ b/src/mod/formats/mod_shout/mod_shout.c @@ -64,6 +64,7 @@ struct shout_context { FILE *fp; int samplerate; uint8_t thread_running; + uint8_t shout_init; }; typedef struct shout_context shout_context_t; @@ -421,7 +422,7 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) #define MY_BUF_LEN 1024 * 32 #define MY_BLOCK_SIZE MY_BUF_LEN -static void *SWITCH_THREAD_FUNC stream_thread(switch_thread_t *thread, void *obj) +static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void *obj) { CURL *curl_handle = NULL; shout_context_t *context = (shout_context_t *) obj; @@ -441,7 +442,7 @@ static void *SWITCH_THREAD_FUNC stream_thread(switch_thread_t *thread, void *obj return NULL; } -static void launch_stream_thread(shout_context_t *context) +static void launch_read_stream_thread(shout_context_t *context) { switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; @@ -454,7 +455,75 @@ static void launch_stream_thread(shout_context_t *context) switch_threadattr_create(&thd_attr, context->memory_pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, stream_thread, context, context->memory_pool); + switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool); +} + +static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, void *obj) +{ + shout_context_t *context = (shout_context_t *) obj; + + while(!context->err && context->thread_running) { + unsigned char mp3buf[8192] = ""; + unsigned char audio[8192] = ""; + switch_size_t audio_read = 0; + int rlen; + long ret = 0; + + switch_mutex_lock(context->audio_mutex); + if (context->audio_buffer) { + audio_read = switch_buffer_read(context->audio_buffer, audio, sizeof(audio)); + } else { + context->err++; + } + switch_mutex_unlock(context->audio_mutex); + + error_check(); + + if (!audio_read) { + audio_read = sizeof(audio); + memset(audio, 255, sizeof(audio)); + } + + if ((rlen = lame_encode_buffer(context->gfp, (void *) audio, NULL, audio_read / sizeof(int16_t), mp3buf, sizeof(mp3buf))) < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MP3 encode error %d!\n", rlen); + goto error; + } + + if (rlen) { + ret = shout_send(context->shout, mp3buf, rlen); + if (ret != SHOUTERR_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Send error: %s\n", shout_get_error(context->shout)); + goto error; + } + } else { + memset(mp3buf, 0, 128); + ret = shout_send(context->shout, mp3buf, 128); + } + + shout_sync(context->shout); + switch_yield(100000); + } + + error: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Thread Done\n"); + context->thread_running = 0; + return NULL; +} + +static void launch_write_stream_thread(shout_context_t *context) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + + if (context->err) { + return; + } + + context->thread_running = 1; + switch_threadattr_create(&thd_attr, context->memory_pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, write_stream_thread, context, context->memory_pool); } static switch_status_t shout_file_open(switch_file_handle_t *handle, char *path) @@ -485,7 +554,7 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, char *path) InitMP3(&context->mp, OUTSCALE, context->samplerate); if (handle->handler) { context->stream_url = switch_core_sprintf(context->memory_pool, "http://%s", path); - launch_stream_thread(context); + launch_read_stream_thread(context); } else { if (switch_file_open(&context->fd, path, SWITCH_FOPEN_READ, SWITCH_FPROT_UREAD|SWITCH_FPROT_UWRITE, handle->memory_pool) != SWITCH_STATUS_SUCCESS) { @@ -513,6 +582,11 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, char *path) lame_print_config(context->gfp); if (handle->handler) { + if (switch_buffer_create_dynamic(&context->audio_buffer, MY_BLOCK_SIZE, MY_BUF_LEN, 0) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n"); + goto error; + } + switch_mutex_init(&context->audio_mutex, SWITCH_MUTEX_NESTED, context->memory_pool); lame_set_bWriteVbrTag(context->gfp, 0); lame_mp3_tags_fid(context->gfp, NULL); @@ -571,7 +645,7 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, char *path) goto error; } - if (shout_set_url(context->shout, "mod_shout") != SHOUTERR_SUCCESS) { + if (shout_set_url(context->shout, "http://www.freeswitch.org") != SHOUTERR_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error setting name: %s\n", shout_get_error(context->shout)); goto error; } @@ -586,10 +660,6 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, char *path) goto error; } - if (shout_open(context->shout) != SHOUTERR_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening stream: %s\n", shout_get_error(context->shout)); - goto error; - } } else { handle->seekable = 1; /* lame being lame and all has FILE * coded into it's API for some functions so we gotta use it */ @@ -689,31 +759,45 @@ static switch_status_t shout_file_write(switch_file_handle_t *handle, void *data { shout_context_t *context = handle->private_info; unsigned char mp3buf[2048] = ""; - long ret = 0; int rlen; int16_t *audio = data; int nsamples = *len; - assert(context->gfp); - if ((rlen = lame_encode_buffer(context->gfp, audio, NULL, nsamples, mp3buf, sizeof(mp3buf))) < 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MP3 encode error %d!\n", rlen); - return SWITCH_STATUS_FALSE; + if (context->shout && !context->shout_init) { + context->shout_init++; + if (shout_open(context->shout) != SHOUTERR_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening stream: %s\n", shout_get_error(context->shout)); + context->err++; + return SWITCH_STATUS_FALSE; + } + launch_write_stream_thread(context); } if (handle->handler) { + switch_mutex_lock(context->audio_mutex); + if (context->audio_buffer) { + if (!switch_buffer_write(context->audio_buffer, data, nsamples * sizeof(int16_t))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Buffer error\n"); + context->err++; + } + } else { + context->err++; + } + switch_mutex_unlock(context->audio_mutex); + if (context->err) { + return SWITCH_STATUS_FALSE; + } + } else { + if ((rlen = lame_encode_buffer(context->gfp, audio, NULL, nsamples, mp3buf, sizeof(mp3buf))) < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MP3 encode error %d!\n", rlen); + return SWITCH_STATUS_FALSE; + } if (rlen) { - ret = shout_send(context->shout, mp3buf, rlen); - if (ret != SHOUTERR_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Send error: %s\n", shout_get_error(context->shout)); + if (fwrite(mp3buf, 1, rlen, context->fp) < 0) { return SWITCH_STATUS_FALSE; } } - shout_sync(context->shout); - } else { - if (fwrite(mp3buf, 1, rlen, context->fp) < 0) { - return SWITCH_STATUS_FALSE; - } } return SWITCH_STATUS_SUCCESS;