diff --git a/channels/chan_websocket.c b/channels/chan_websocket.c index be2c1b1e6f..eb680d570c 100644 --- a/channels/chan_websocket.c +++ b/channels/chan_websocket.c @@ -59,6 +59,18 @@ /channels/answer ARI endpoint. + + In passthrough mode, the channel driver won't attempt + to re-frame or re-time media coming in over the websocket from + the remote app. This can be used for any codec but MUST be used + for codecs that use packet headers or whose data stream can't be + broken up on arbitrary byte boundaries. In this case, the remote + app is fully responsible for correctly framing and timing media + sent to Asterisk and the MEDIA text commands that could be sent + over the websocket are disabled. Currently, passthrough mode is + automatically set for the opus, speex and g729 codecs. + + This option allows you to add additional parameters to the outbound URI. The format is: @@ -78,6 +90,9 @@ same => n,Dial(WebSocket/connection1/c(sln16)) + + same => n,Dial(WebSocket/connection1/c(opus)) + same => n,Dial(WebSocket/INCOMING/n) @@ -127,6 +142,7 @@ struct websocket_pvt { char *uri_params; char *leftover_data; int no_auto_answer; + int passthrough; int optimal_frame_size; int bulk_media_in_progress; int report_queue_drained; @@ -188,6 +204,7 @@ static void set_channel_format(struct websocket_pvt * instance, if (ast_format_cmp(ast_channel_rawreadformat(instance->channel), fmt) == AST_FORMAT_CMP_NOT_EQUAL) { ast_channel_set_rawreadformat(instance->channel, fmt); + ast_set_read_format(instance->channel, ast_channel_readformat(instance->channel)); ast_debug(4, "Switching readformat to %s\n", ast_format_get_name(fmt)); } } @@ -332,10 +349,10 @@ static struct ast_frame *webchan_read(struct ast_channel *ast) } /* - * If the frame length is already optimal_frame_size, we can just - * return it. + * If we're in passthrough mode or the frame length is already optimal_frame_size, + * we can just return it. */ - if (native_frame->datalen == instance->optimal_frame_size) { + if (instance->passthrough || native_frame->datalen == instance->optimal_frame_size) { set_channel_format(instance, instance->native_format); return native_frame; } @@ -499,6 +516,11 @@ static int process_text_message(struct websocket_pvt *instance, ast_queue_control(instance->channel, AST_CONTROL_HANGUP); } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) { + if (instance->passthrough) { + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } AST_LIST_LOCK(&instance->frame_queue); instance->bulk_media_in_progress = 1; AST_LIST_UNLOCK(&instance->frame_queue); @@ -511,6 +533,12 @@ static int process_text_message(struct websocket_pvt *instance, id = ast_strip(command + strlen(STOP_MEDIA_BUFFERING)); + if (instance->passthrough) { + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } + ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n", ast_channel_name(instance->channel), STOP_MEDIA_BUFFERING, id, (int)instance->leftover_len); @@ -533,6 +561,13 @@ static int process_text_message(struct websocket_pvt *instance, } else if (ast_strings_equal(command, FLUSH_MEDIA)) { struct ast_frame *frame = NULL; + + if (instance->passthrough) { + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } + AST_LIST_LOCK(&instance->frame_queue); while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) { ast_frfree(frame); @@ -543,6 +578,12 @@ static int process_text_message(struct websocket_pvt *instance, AST_LIST_UNLOCK(&instance->frame_queue); } else if (ast_strings_equal(payload, REPORT_QUEUE_DRAINED)) { + if (instance->passthrough) { + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } + AST_LIST_LOCK(&instance->frame_queue); instance->report_queue_drained = 1; AST_LIST_UNLOCK(&instance->frame_queue); @@ -569,11 +610,21 @@ static int process_text_message(struct websocket_pvt *instance, } } else if (ast_strings_equal(payload, PAUSE_MEDIA)) { + if (instance->passthrough) { + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } AST_LIST_LOCK(&instance->frame_queue); instance->queue_paused = 1; AST_LIST_UNLOCK(&instance->frame_queue); } else if (ast_strings_equal(payload, CONTINUE_MEDIA)) { + if (instance->passthrough) { + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } AST_LIST_LOCK(&instance->frame_queue); instance->queue_paused = 0; AST_LIST_UNLOCK(&instance->frame_queue); @@ -607,6 +658,11 @@ static int process_binary_message(struct websocket_pvt *instance, next_frame_ptr = payload; instance->bytes_read += payload_len; + if (instance->passthrough) { + res = queue_frame_from_buffer(instance, payload, payload_len); + return res; + } + if (instance->bulk_media_in_progress && instance->leftover_len > 0) { /* * We have leftover data from a previous websocket message. @@ -791,10 +847,10 @@ static void *read_thread_handler(void *obj) * This is especially important for outbound connections otherwise * the app won't know who the media is for. */ - res = ast_asprintf(&command, "%s connection_id:%s channel:%s format:%s optimal_frame_size:%d", MEDIA_START, + res = ast_asprintf(&command, "%s connection_id:%s channel:%s format:%s optimal_frame_size:%d ptime:%d", MEDIA_START, instance->connection_id, ast_channel_name(instance->channel), ast_format_get_name(instance->native_format), - instance->optimal_frame_size); + instance->optimal_frame_size, instance->native_codec->default_ms); if (res <= 0 || !command) { ast_queue_control(instance->channel, AST_CONTROL_HANGUP); ast_log(LOG_ERROR, "%s: Failed to create MEDIA_START\n", ast_channel_name(instance->channel)); @@ -843,9 +899,11 @@ static int webchan_write(struct ast_channel *ast, struct ast_frame *f) ast_channel_name(ast)); return -1; } - if (f->subclass.format != instance->native_format) { - ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format\n", - ast_channel_name(ast), ast_format_get_name(instance->native_format)); + + if (ast_format_cmp(f->subclass.format, instance->native_format) == AST_FORMAT_CMP_NOT_EQUAL) { + ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n", + ast_channel_name(ast), ast_format_get_name(instance->native_format), + ast_format_get_name(f->subclass.format)); return -1; } @@ -1044,15 +1102,36 @@ static struct websocket_pvt* websocket_new(const char *chan_name, * References for native_format and native_codec are now held by the * instance and will be released when the instance is destroyed. */ - instance->optimal_frame_size = - (instance->native_codec->default_ms * instance->native_codec->minimum_bytes) - / instance->native_codec->minimum_ms; - instance->leftover_data = ast_calloc(1, instance->optimal_frame_size); - if (!instance->leftover_data) { - return NULL; + /* + * It's not possible for us to re-time or re-frame media if the data + * stream can't be broken up on arbitrary byte boundaries. This is usually + * indicated by the codec's minimum_bytes being small (10 bytes or less). + * We need to force passthrough mode in this case. + */ + if (instance->native_codec->minimum_bytes <= 10) { + instance->passthrough = 1; + instance->optimal_frame_size = 0; + } else { + instance->optimal_frame_size = + (instance->native_codec->default_ms * instance->native_codec->minimum_bytes) + / instance->native_codec->minimum_ms; + instance->leftover_data = ast_calloc(1, instance->optimal_frame_size); + if (!instance->leftover_data) { + return NULL; + } } + ast_debug(3, + "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n", + chan_name, ast_format_get_name(instance->native_format), + ast_format_get_sample_rate(instance->native_format), + ast_format_get_default_ms(instance->native_format), + ast_format_get_minimum_ms(instance->native_format), + ast_format_get_minimum_bytes(instance->native_format), + instance->passthrough, + instance->optimal_frame_size); + /* We have exclusive access to proxy and sorcery, no need for locking here. */ if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) { return NULL; @@ -1195,12 +1274,14 @@ enum { OPT_WS_CODEC = (1 << 0), OPT_WS_NO_AUTO_ANSWER = (1 << 1), OPT_WS_URI_PARAM = (1 << 2), + OPT_WS_PASSTHROUGH = (1 << 3), }; enum { OPT_ARG_WS_CODEC, OPT_ARG_WS_NO_AUTO_ANSWER, OPT_ARG_WS_URI_PARAM, + OPT_ARG_WS_PASSTHROUGH, OPT_ARG_ARRAY_SIZE }; @@ -1208,6 +1289,7 @@ AST_APP_OPTIONS(websocket_options, BEGIN_OPTIONS AST_APP_OPTION_ARG('c', OPT_WS_CODEC, OPT_ARG_WS_CODEC), AST_APP_OPTION('n', OPT_WS_NO_AUTO_ANSWER), AST_APP_OPTION_ARG('v', OPT_WS_URI_PARAM, OPT_ARG_WS_URI_PARAM), + AST_APP_OPTION('p', OPT_WS_PASSTHROUGH), END_OPTIONS ); static struct ast_channel *webchan_request(const char *type, @@ -1281,6 +1363,9 @@ static struct ast_channel *webchan_request(const char *type, } instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER); + if (!instance->passthrough) { + instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH); + } if (ast_test_flag(&opts, OPT_WS_URI_PARAM) && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {