mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 10:47:18 +00:00 
			
		
		
		
	Merge "Implement internal abstraction for iostreams"
This commit is contained in:
		| @@ -211,7 +211,7 @@ static int find_sequence(char * inbuf, int inlen, char * matchbuf, int matchlen) | ||||
| * This function has two modes.  The first to find a boundary marker.  The | ||||
| * second is to find the filename immediately after the boundary. | ||||
| */ | ||||
| static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen) | ||||
| static int readmimefile(struct ast_iostream *in, FILE *fout, char *boundary, int contentlen) | ||||
| { | ||||
| 	int find_filename = 0; | ||||
| 	char buf[4096]; | ||||
| @@ -222,7 +222,7 @@ static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen) | ||||
| 	int boundary_len; | ||||
| 	char * path_end, * path_start, * filespec; | ||||
|  | ||||
| 	if (NULL == fin || NULL == fout || NULL == boundary || 0 >= contentlen) { | ||||
| 	if (NULL == in || NULL == fout || NULL == boundary || 0 >= contentlen) { | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| @@ -236,8 +236,8 @@ static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen) | ||||
| 		} | ||||
|  | ||||
| 		if (0 < num_to_read) { | ||||
| 			if (fread(&(buf[char_in_buf]), 1, num_to_read, fin) < num_to_read) { | ||||
| 				ast_log(LOG_WARNING, "fread() failed: %s\n", strerror(errno)); | ||||
| 			if (ast_iostream_read(in, &(buf[char_in_buf]), num_to_read) < num_to_read) { | ||||
| 				ast_log(LOG_WARNING, "read failed: %s\n", strerror(errno)); | ||||
| 				num_to_read = 0; | ||||
| 			} | ||||
| 			contentlen -= num_to_read; | ||||
| @@ -378,7 +378,7 @@ static int http_post_callback(struct ast_tcptls_session_instance *ser, const str | ||||
| 	 */ | ||||
| 	ast_http_body_read_status(ser, 0); | ||||
|  | ||||
| 	if (0 > readmimefile(ser->f, f, boundary_marker, content_len)) { | ||||
| 	if (0 > readmimefile(ser->stream, f, boundary_marker, content_len)) { | ||||
| 		ast_debug(1, "Cannot find boundary marker in POST request.\n"); | ||||
| 		fclose(f); | ||||
| 		ast_http_error(ser, 400, "Bad Request", "Cannot find boundary marker in POST request."); | ||||
|   | ||||
| @@ -86,8 +86,7 @@ | ||||
|  | ||||
| /*! \brief Structure definition for session */ | ||||
| struct ast_websocket { | ||||
| 	FILE *f;                           /*!< Pointer to the file instance used for writing and reading */ | ||||
| 	int fd;                            /*!< File descriptor for the session, only used for polling */ | ||||
| 	struct ast_iostream *stream;       /*!< iostream of the connection */ | ||||
| 	struct ast_sockaddr address;       /*!< Address of the remote client */ | ||||
| 	enum ast_websocket_opcode opcode;  /*!< Cached opcode for multi-frame messages */ | ||||
| 	size_t payload_len;                /*!< Length of the payload */ | ||||
| @@ -178,10 +177,11 @@ static void session_destroy_fn(void *obj) | ||||
| { | ||||
| 	struct ast_websocket *session = obj; | ||||
|  | ||||
| 	if (session->f) { | ||||
| 	if (session->stream) { | ||||
| 		ast_websocket_close(session, 0); | ||||
| 		if (session->f) { | ||||
| 			fclose(session->f); | ||||
| 		if (session->stream) { | ||||
| 			ast_iostream_close(session->stream); | ||||
| 			session->stream = NULL; | ||||
| 			ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", | ||||
| 				ast_sockaddr_stringify(&session->address)); | ||||
| 		} | ||||
| @@ -307,20 +307,22 @@ int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, ui | ||||
| 	session->close_sent = 1; | ||||
|  | ||||
| 	ao2_lock(session); | ||||
| 	res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout); | ||||
| 	ast_iostream_set_timeout_inactivity(session->stream, session->timeout); | ||||
| 	res = ast_iostream_write(session->stream, frame, sizeof(frame)); | ||||
| 	ast_iostream_set_timeout_disable(session->stream); | ||||
|  | ||||
| 	/* If an error occurred when trying to close this connection explicitly terminate it now. | ||||
| 	 * Doing so will cause the thread polling on it to wake up and terminate. | ||||
| 	 */ | ||||
| 	if (res) { | ||||
| 		fclose(session->f); | ||||
| 		session->f = NULL; | ||||
| 	if (res != sizeof(frame)) { | ||||
| 		ast_iostream_close(session->stream); | ||||
| 		session->stream = NULL; | ||||
| 		ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n", | ||||
| 			session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); | ||||
| 	} | ||||
|  | ||||
| 	ao2_unlock(session); | ||||
| 	return res; | ||||
| 	return res == sizeof(frame); | ||||
| } | ||||
|  | ||||
| static const char *opcode_map[] = { | ||||
| @@ -388,7 +390,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) { | ||||
| 	ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout); | ||||
| 	if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) { | ||||
| 		ao2_unlock(session); | ||||
| 		/* 1011 - server terminating connection due to not being able to fulfill the request */ | ||||
| 		ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n"); | ||||
| @@ -396,7 +399,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	fflush(session->f); | ||||
| 	ast_iostream_set_timeout_disable(session->stream); | ||||
| 	ao2_unlock(session); | ||||
|  | ||||
| 	return 0; | ||||
| @@ -424,7 +427,7 @@ void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session) | ||||
|  | ||||
| int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session) | ||||
| { | ||||
| 	return session->closing ? -1 : session->fd; | ||||
| 	return session->closing ? -1 : ast_iostream_get_fd(session->stream); | ||||
| } | ||||
|  | ||||
| struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session) | ||||
| @@ -439,18 +442,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_is_secure)(struct ast_websocket *session | ||||
|  | ||||
| int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session) | ||||
| { | ||||
| 	int flags; | ||||
|  | ||||
| 	if ((flags = fcntl(session->fd, F_GETFL)) == -1) { | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	flags |= O_NONBLOCK; | ||||
|  | ||||
| 	if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) { | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	ast_iostream_nonblock(session->stream); | ||||
| 	ast_iostream_set_exclusive_input(session->stream, 0); | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
| @@ -503,17 +496,16 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len | ||||
| 	int sanity = 10; | ||||
|  | ||||
| 	ao2_lock(session); | ||||
| 	if (!session->f) { | ||||
| 	if (!session->stream) { | ||||
| 		ao2_unlock(session); | ||||
| 		errno = ECONNABORTED; | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	for (;;) { | ||||
| 		clearerr(session->f); | ||||
| 		rlen = fread(rbuf, 1, xlen, session->f); | ||||
| 		if (!rlen) { | ||||
| 			if (feof(session->f)) { | ||||
| 		rlen = ast_iostream_read(session->stream, rbuf, xlen); | ||||
| 		if (rlen != xlen) { | ||||
| 			if (rlen == 0) { | ||||
| 				ast_log(LOG_WARNING, "Web socket closed abruptly\n"); | ||||
| 				*opcode = AST_WEBSOCKET_OPCODE_CLOSE; | ||||
| 				session->closing = 1; | ||||
| @@ -521,7 +513,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len | ||||
| 				return -1; | ||||
| 			} | ||||
|  | ||||
| 			if (ferror(session->f) && errno != EAGAIN) { | ||||
| 			if (rlen < 0 && errno != EAGAIN) { | ||||
| 				ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno)); | ||||
| 				*opcode = AST_WEBSOCKET_OPCODE_CLOSE; | ||||
| 				session->closing = 1; | ||||
| @@ -542,7 +534,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len | ||||
| 		if (!xlen) { | ||||
| 			break; | ||||
| 		} | ||||
| 		if (ast_wait_for_input(session->fd, 1000) < 0) { | ||||
| 		if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) { | ||||
| 			ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno)); | ||||
| 			*opcode = AST_WEBSOCKET_OPCODE_CLOSE; | ||||
| 			session->closing = 1; | ||||
| @@ -837,7 +829,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan | ||||
| 			ao2_ref(protocol_handler, -1); | ||||
| 			return 0; | ||||
| 		} | ||||
| 		session->timeout =  AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; | ||||
| 		session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; | ||||
|  | ||||
| 		/* Generate the session id */ | ||||
| 		if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) { | ||||
| @@ -867,7 +859,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan | ||||
| 		 *    Connection_. | ||||
| 		 */ | ||||
| 		if (protocol) { | ||||
| 			fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" | ||||
| 			ast_iostream_printf(ser->stream, | ||||
| 				"HTTP/1.1 101 Switching Protocols\r\n" | ||||
| 				"Upgrade: %s\r\n" | ||||
| 				"Connection: Upgrade\r\n" | ||||
| 				"Sec-WebSocket-Accept: %s\r\n" | ||||
| @@ -876,15 +869,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan | ||||
| 				websocket_combine_key(key, base64, sizeof(base64)), | ||||
| 				protocol); | ||||
| 		} else { | ||||
| 			fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" | ||||
| 			ast_iostream_printf(ser->stream, | ||||
| 				"HTTP/1.1 101 Switching Protocols\r\n" | ||||
| 				"Upgrade: %s\r\n" | ||||
| 				"Connection: Upgrade\r\n" | ||||
| 				"Sec-WebSocket-Accept: %s\r\n\r\n", | ||||
| 				upgrade, | ||||
| 				websocket_combine_key(key, base64, sizeof(base64))); | ||||
| 		} | ||||
|  | ||||
| 		fflush(ser->f); | ||||
| 	} else { | ||||
|  | ||||
| 		/* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */ | ||||
| @@ -896,7 +888,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan | ||||
| 	} | ||||
|  | ||||
| 	/* Enable keepalive on all sessions so the underlying user does not have to */ | ||||
| 	if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { | ||||
| 	if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { | ||||
| 		ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n", | ||||
| 			ast_sockaddr_stringify(&ser->remote_address)); | ||||
| 		websocket_bad_request(ser); | ||||
| @@ -908,25 +900,23 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan | ||||
| 	ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); | ||||
|  | ||||
| 	/* Populate the session with all the needed details */ | ||||
| 	session->f = ser->f; | ||||
| 	session->fd = ser->fd; | ||||
| 	session->stream = ser->stream; | ||||
| 	ast_sockaddr_copy(&session->address, &ser->remote_address); | ||||
| 	session->opcode = -1; | ||||
| 	session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING; | ||||
| 	session->secure = ser->ssl ? 1 : 0; | ||||
| 	session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0; | ||||
|  | ||||
| 	/* Give up ownership of the socket and pass it to the protocol handler */ | ||||
| 	ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0); | ||||
| 	ast_iostream_set_exclusive_input(session->stream, 0); | ||||
| 	protocol_handler->session_established(session, get_vars, headers); | ||||
| 	ao2_ref(protocol_handler, -1); | ||||
|  | ||||
| 	/* | ||||
| 	 * By dropping the FILE* and fd from the session the connection | ||||
| 	 * By dropping the stream from the session the connection | ||||
| 	 * won't get closed when the HTTP server cleans up because we | ||||
| 	 * passed the connection to the protocol handler. | ||||
| 	 */ | ||||
| 	ser->f = NULL; | ||||
| 	ser->fd = -1; | ||||
| 	ser->stream = NULL; | ||||
|  | ||||
| 	return 0; | ||||
| } | ||||
| @@ -1260,7 +1250,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( | ||||
| 	int has_accept = 0; | ||||
| 	int has_protocol = 0; | ||||
|  | ||||
| 	if (!fgets(buf, sizeof(buf), client->ser->f)) { | ||||
| 	if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) { | ||||
| 		ast_log(LOG_ERROR, "Unable to retrieve HTTP status line."); | ||||
| 		return WS_BAD_STATUS; | ||||
| 	} | ||||
| @@ -1273,7 +1263,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( | ||||
|  | ||||
| 	/* Ignoring line folding - assuming header field values are contained | ||||
| 	   within a single line */ | ||||
| 	while (fgets(buf, sizeof(buf), client->ser->f)) { | ||||
| 	while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) { | ||||
| 		char *name, *value; | ||||
| 		int parsed = ast_http_header_parse(buf, &name, &value); | ||||
|  | ||||
| @@ -1326,19 +1316,19 @@ static enum ast_websocket_result websocket_client_handshake( | ||||
| 			client->protocols); | ||||
| 	} | ||||
|  | ||||
| 	if (fprintf(client->ser->f, | ||||
| 		    "GET /%s HTTP/1.1\r\n" | ||||
| 		    "Sec-WebSocket-Version: %d\r\n" | ||||
| 		    "Upgrade: websocket\r\n" | ||||
| 		    "Connection: Upgrade\r\n" | ||||
| 		    "Host: %s\r\n" | ||||
| 		    "Sec-WebSocket-Key: %s\r\n" | ||||
| 		    "%s\r\n", | ||||
| 		    client->resource_name ? ast_str_buffer(client->resource_name) : "", | ||||
| 		    client->version, | ||||
| 		    client->host, | ||||
| 		    client->key, | ||||
| 		    protocols) < 0) { | ||||
| 	if (ast_iostream_printf(client->ser->stream, | ||||
| 			"GET /%s HTTP/1.1\r\n" | ||||
| 			"Sec-WebSocket-Version: %d\r\n" | ||||
| 			"Upgrade: websocket\r\n" | ||||
| 			"Connection: Upgrade\r\n" | ||||
| 			"Host: %s\r\n" | ||||
| 			"Sec-WebSocket-Key: %s\r\n" | ||||
| 			"%s\r\n", | ||||
| 			client->resource_name ? ast_str_buffer(client->resource_name) : "", | ||||
| 			client->version, | ||||
| 			client->host, | ||||
| 			client->key, | ||||
| 			protocols) < 0) { | ||||
| 		ast_log(LOG_ERROR, "Failed to send handshake.\n"); | ||||
| 		return WS_WRITE_ERROR; | ||||
| 	} | ||||
| @@ -1362,9 +1352,9 @@ static enum ast_websocket_result websocket_client_connect(struct ast_websocket * | ||||
| 		return res; | ||||
| 	} | ||||
|  | ||||
| 	ws->f = ws->client->ser->f; | ||||
| 	ws->fd = ws->client->ser->fd; | ||||
| 	ws->secure = ws->client->ser->ssl ? 1 : 0; | ||||
| 	ws->stream = ws->client->ser->stream; | ||||
| 	ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0; | ||||
| 	ws->client->ser->stream = NULL; | ||||
| 	ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address); | ||||
| 	return WS_OK; | ||||
| } | ||||
|   | ||||
| @@ -949,7 +949,7 @@ static int phoneprov_callback(struct ast_tcptls_session_instance *ser, const str | ||||
| 			socklen_t namelen = sizeof(name.sa); | ||||
| 			int res; | ||||
|  | ||||
| 			if ((res = getsockname(ser->fd, &name.sa, &namelen))) { | ||||
| 			if ((res = getsockname(ast_iostream_get_fd(ser->stream), &name.sa, &namelen))) { | ||||
| 				ast_log(LOG_WARNING, "Could not get server IP, breakage likely.\n"); | ||||
| 			} else { | ||||
| 				struct extension *exten_iter; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user