Fix race condition between shout thread starting and shout_file_read

Fix race condition between thread terminating and context being cleaned up
Fix stutter when first 64k is drained from shoutcast streaming buffer
Reduce the amount of padding from around 6s to 1s if the stream goes away
  or runs slow


git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@13218 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Rupa Schomaker 2009-05-01 16:22:36 +00:00
parent b4a30ae134
commit e9c8418a89
1 changed files with 45 additions and 9 deletions

View File

@ -24,6 +24,7 @@
* Contributor(s):
*
* Anthony Minessale II <anthm@freeswitch.org>
* Rupa Schomaker <rupa@rupa.com>
*
* mod_shout.c -- Icecast Module
*
@ -122,6 +123,7 @@ struct shout_context {
switch_size_t rlen;
unsigned char *mp3buf;
switch_size_t mp3buflen;
switch_thread_rwlock_t *rwlock;
};
typedef struct shout_context shout_context_t;
@ -134,6 +136,7 @@ static inline void free_context(shout_context_t *context)
if (context) {
context->err++;
switch_thread_rwlock_wrlock(context->rwlock);
if (context->fd) {
switch_file_close(context->fd);
@ -201,6 +204,9 @@ static inline void free_context(shout_context_t *context)
if (context->mh) {
mpg123_delete(context->mh);
}
switch_thread_rwlock_unlock(context->rwlock);
switch_thread_rwlock_destroy(context->rwlock);
}
}
@ -405,7 +411,8 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
unsigned char *out;
int outlen;
int usedlen;
uint32_t used, buf_size = 1024 * 64;
uint32_t used, buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after
first 64k buffer is dry */
in = ptr;
inlen = realsize;
@ -433,7 +440,7 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
switch_mutex_unlock(context->audio_mutex);
if (used < buf_size) {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Buffered %u/%u!\n", used, buf_size);
/* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Buffered %u/%u!\n", used, buf_size); */
break;
}
@ -506,6 +513,8 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
{
CURL *curl_handle = NULL;
shout_context_t *context = (shout_context_t *) obj;
switch_thread_rwlock_rdlock(context->rwlock);
curl_handle = curl_easy_init();
curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url);
@ -516,11 +525,13 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "FreeSWITCH(mod_shout)/1.0");
curl_easy_perform(curl_handle);
curl_easy_cleanup(curl_handle);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Thread Done\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n");
switch_mutex_lock(context->audio_mutex);
context->err++;
switch_mutex_unlock(context->audio_mutex);
context->thread_running = 0;
switch_thread_rwlock_unlock(context->rwlock);
return NULL;
}
@ -544,9 +555,12 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi
{
shout_context_t *context = (shout_context_t *) obj;
switch_thread_rwlock_rdlock(context->rwlock);
if (context->thread_running) {
context->thread_running++;
} else {
switch_thread_rwlock_unlock(context->rwlock);
return NULL;
}
@ -617,7 +631,8 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi
}
error:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Thread Done\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n");
switch_thread_rwlock_unlock(context->rwlock);
context->thread_running = 0;
return NULL;
}
@ -652,6 +667,7 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
char *username, *password, *port;
char *err = NULL;
int portno = 0;
int sanity = 0;
if ((context = switch_core_alloc(handle->memory_pool, sizeof(*context))) == 0) {
return SWITCH_STATUS_MEMERR;
@ -664,6 +680,10 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
context->memory_pool = handle->memory_pool;
context->samplerate = handle->samplerate;
context->handle = handle;
switch_thread_rwlock_create(&(context->rwlock), context->memory_pool);
switch_thread_rwlock_rdlock(context->rwlock);
if (switch_test_flag(handle, SWITCH_FILE_FLAG_READ)) {
if (switch_buffer_create_dynamic(&context->audio_buffer, TC_BUFFER_SIZE, TC_BUFFER_SIZE * 2, 0) != SWITCH_STATUS_SUCCESS) {
@ -682,6 +702,10 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
context->stream_url = switch_core_sprintf(context->memory_pool, "http://%s", path);
context->prebuf = handle->prebuf;
launch_read_stream_thread(context);
while((switch_buffer_inuse(context->audio_buffer) < 1*2*context->samplerate) && ++sanity < 10) {
/* at least 1s of audio and up to 5s initialize */
switch_yield(500000);
}
} else {
mpg123_param(context->mh, MPG123_FLAGS, MPG123_MONO_MIX, 0);
if (switch_file_open(&context->fd, path, SWITCH_FOPEN_READ, SWITCH_FPROT_UREAD | SWITCH_FPROT_UWRITE, handle->memory_pool) !=
@ -820,7 +844,7 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
/* lame being lame and all has FILE * coded into it's API for some functions so we gotta use it */
if (!(context->fp = fopen(path, "wb+"))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening %s\n", path);
return SWITCH_STATUS_GENERR;
goto error;
}
}
}
@ -830,10 +854,12 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
handle->sections = 0;
handle->speed = 0;
handle->private_info = context;
switch_thread_rwlock_unlock(context->rwlock);
return SWITCH_STATUS_SUCCESS;
error:
switch_thread_rwlock_unlock(context->rwlock);
if (err) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: %s\n", err);
}
@ -884,7 +910,7 @@ static switch_status_t shout_file_seek(switch_file_handle_t *handle, unsigned in
static switch_status_t shout_file_read(switch_file_handle_t *handle, void *data, size_t *len)
{
shout_context_t *context = handle->private_info;
size_t rb = 0, bytes = *len * sizeof(int16_t);
size_t rb = 0, bytes = *len * sizeof(int16_t), newbytes = 0;
*len = 0;
@ -905,14 +931,24 @@ static switch_status_t shout_file_read(switch_file_handle_t *handle, void *data,
}
switch_mutex_unlock(context->audio_mutex);
}
if (context->err) {
return SWITCH_STATUS_FALSE;
}
/* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rb: %d, bytes: %d\n", (int) rb, (int) bytes); */
if (rb) {
*len = rb / sizeof(int16_t);
} else {
/* no data, so insert 1 second of silence */
newbytes = 2 * handle->samplerate;
if(newbytes < bytes) {
bytes = newbytes;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Padding mp3 stream with 1s of empty audio. (%s)\n", context->stream_url);
memset(data, 255, bytes);
*len = bytes / sizeof(int16_t);
}
@ -1055,7 +1091,7 @@ static switch_status_t shout_file_set_string(switch_file_handle_t *handle, switc
id3tag_set_genre(context->gfp, string);
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Value Ignored\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Value Ignored %d, %s\n", col, string);
break;
}
@ -1085,7 +1121,7 @@ static switch_status_t shout_file_set_string(switch_file_handle_t *handle, switc
}
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Value Ignored\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Value Ignored %d, %s\n", col, string);
break;
}