cleanup and refactor ws, this should work on Linux now

This commit is contained in:
Seven Du 2013-08-11 22:47:42 +08:00
parent e1cc972702
commit 017691598c
3 changed files with 31 additions and 177 deletions

View File

@ -566,7 +566,7 @@ void event_handler(switch_event_t *event) {
abyss_bool websocket_hook(TSession *r) abyss_bool websocket_hook(TSession *r)
{ {
wsh_t wsh; wsh_t *wsh;
int ret; int ret;
int i; int i;
ws_opcode_t opcode; ws_opcode_t opcode;
@ -585,47 +585,45 @@ abyss_bool websocket_hook(TSession *r)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "headers %s: %s\n", item->name, item->value); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "headers %s: %s\n", item->name, item->value);
} }
key = TableFind(&r->requestHeaderFields, "sec-websocket-key"); key = RequestHeaderValue(r, "sec-websocket-key");
version = TableFind(&r->requestHeaderFields, "sec-websocket-version"); version = RequestHeaderValue(r, "sec-websocket-version");
proto = TableFind(&r->requestHeaderFields, "sec-websocket-protocol"); proto = RequestHeaderValue(r, "sec-websocket-protocol");
upgrade = TableFind(&r->requestHeaderFields, "connection"); upgrade = RequestHeaderValue(r, "upgrade");
if (!key || !version || !proto || !upgrade) return FALSE; if (!key || !version || !proto || !upgrade) return FALSE;
if (!strstr(upgrade, "Upgrade") || strncasecmp(proto, "websocket", 9)) return FALSE; if (strncasecmp(upgrade, "websocket", 9) || strncasecmp(proto, "websocket", 9)) return FALSE;
ret = ws_init(&wsh, r, NULL, 0); wsh = ws_init(r);
if (ret != 0) { if (!wsh) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "websocket error %d\n", ret); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "websocket error %d\n", ret);
return FALSE; return FALSE;
} }
while(!wsh.down && !wsh.handshake) { ret = ws_handshake_kvp(wsh, key, version, proto);
ret = ws_handshake_kvp(&wsh, key, version, proto); if (ret < 0) wsh->down = 1;
if (ret < 0) wsh.down = 1;
}
if (ret != 0) { if (ret != 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "handshake error %d\n", ret); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "handshake error %d\n", ret);
return FALSE; return FALSE;
} }
if (switch_event_bind_removable("websocket", SWITCH_EVENT_CUSTOM, "websocket::stophook", stop_hook_event_handler, &wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) { if (switch_event_bind_removable("websocket", SWITCH_EVENT_CUSTOM, "websocket::stophook", stop_hook_event_handler, wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n");
node_count--; node_count--;
} }
while (!wsh.down) { while (!wsh->down) {
int bytes = ws_read_frame(&wsh, &opcode, &data); int bytes = ws_read_frame(wsh, &opcode, &data);
if (bytes < 0) { if (bytes < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%d %s\n", opcode, (char *)data); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%d %s\n", opcode, (char *)data);
switch_yield(1000); switch_yield(100000);
continue; continue;
} }
switch (opcode) { switch (opcode) {
case WSOC_CLOSE: case WSOC_CLOSE:
ws_close(&wsh, 1000); ws_close(wsh, 1000);
break; break;
case WSOC_CONTINUATION: case WSOC_CONTINUATION:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "continue\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "continue\n");
@ -670,7 +668,7 @@ abyss_bool websocket_hook(TSession *r)
continue; continue;
} }
if (switch_event_bind_removable("websocket", type, subclass, event_handler, &wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) { if (switch_event_bind_removable("websocket", type, subclass, event_handler, wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n");
node_count--; node_count--;
continue; continue;
@ -685,11 +683,13 @@ abyss_bool websocket_hook(TSession *r)
} }
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "wsh.down = %d, node_count = %d\n", wsh.down, node_count); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "wsh->down = %d, node_count = %d\n", wsh->down, node_count);
switch_yield(2000); switch_yield(2000);
while (--node_count >= 0) switch_event_unbind(&nodes[node_count]); while (--node_count >= 0) switch_event_unbind(&nodes[node_count]);
switch_safe_free(wsh);
return FALSE; return FALSE;
} }

View File

@ -245,10 +245,10 @@ int ws_handshake_kvp(wsh_t *wsh, char *key, char *version, char *proto)
b64, b64,
proto); proto);
ws_raw_write(wsh, respond, strlen(respond)); if (ws_raw_write(wsh, respond, strlen(respond))) {
wsh->handshake = 1; wsh->handshake = 1;
return 0;
return 0; }
err: err:
@ -269,21 +269,6 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes)
int x = 0; int x = 0;
TConn *conn = wsh->tsession->connP; TConn *conn = wsh->tsession->connP;
#if 0
if (wsh->ssl) {
do {
r = SSL_read(wsh->ssl, data, bytes);
#ifndef _MSC_VER
if (x++) usleep(10000);
#else
if (x++) Sleep(10);
#endif
} while (r == -1 && SSL_get_error(wsh->ssl, r) == SSL_ERROR_WANT_READ && x < 100);
return r;
}
#endif
if (!wsh->handshake) { if (!wsh->handshake) {
r = wsh->tsession->connP->buffersize; r = wsh->tsession->connP->buffersize;
memcpy(data, conn->buffer.b, r); memcpy(data, conn->buffer.b, r);
@ -298,13 +283,13 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes)
r = conn->buffersize - conn->bufferpos; r = conn->buffersize - conn->bufferpos;
if (r < 0) { if (r < 0) {
printf("348 Read Error %d!\n", r); printf("286 Read Error %d!\n", r);
return 0; return 0;
} else if (r == 0) { } else if (r == 0) {
ConnRead(conn, 2, NULL, NULL, &readError); ConnRead(conn, 2, NULL, NULL, &readError);
if (readError) { if (readError) {
// printf("354 Read Error %s\n", readError); // printf("292 Read Error %s\n", readError);
xmlrpc_strfree(readError); xmlrpc_strfree(readError);
return 0; return 0;
} }
@ -347,132 +332,20 @@ issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes)
return 0; return 0;
} }
//if (r<0) {
//printf("wRITE FAIL: %s\n", strerror(errno));
//}
return r; return r;
} }
#ifdef _MSC_VER wsh_t * ws_init(ws_tsession_t *tsession)
static int setup_socket(ws_socket_t sock)
{ {
unsigned long v = 1; wsh_t *wsh = malloc(sizeof(*wsh));
if (ioctlsocket(sock, FIONBIO, &v) == SOCKET_ERROR) { if (!wsh) return NULL;
return -1;
}
return 0;
}
static int restore_socket(ws_socket_t sock)
{
unsigned long v = 0;
if (ioctlsocket(sock, FIONBIO, &v) == SOCKET_ERROR) {
return -1;
}
return 0;
}
#else
static int setup_socket(ws_socket_t sock)
{
int flags = fcntl(sock, F_GETFL, 0);
return fcntl(sock, F_SETFL, flags | O_NONBLOCK);
}
static int restore_socket(ws_socket_t sock)
{
int flags = fcntl(sock, F_GETFL, 0);
flags &= ~O_NONBLOCK;
return fcntl(sock, F_SETFL, flags);
}
#endif
int ws_init(wsh_t *wsh, ws_tsession_t *tsession, SSL_CTX *ssl_ctx, int close_sock)
{
memset(wsh, 0, sizeof(*wsh)); memset(wsh, 0, sizeof(*wsh));
wsh->tsession = tsession; wsh->tsession = tsession;
if (!ssl_ctx) {
ssl_ctx = globals.ssl_ctx;
}
if (close_sock) {
wsh->close_sock = 1;
}
wsh->buflen = sizeof(wsh->buffer); wsh->buflen = sizeof(wsh->buffer);
wsh->secure = ssl_ctx ? 1 : 0;
// setup_socket(sock); return wsh;
if (wsh->secure) {
int code;
int sanity = 500;
wsh->ssl = SSL_new(ssl_ctx);
assert(wsh->ssl);
SSL_set_fd(wsh->ssl, wsh->sock);
do {
code = SSL_accept(wsh->ssl);
if (code == 1) {
break;
}
if (code == 0) {
return -1;
}
if (code < 0) {
if (code == -1 && SSL_get_error(wsh->ssl, code) != SSL_ERROR_WANT_READ) {
return -1;
}
}
#ifndef _MSC_VER
usleep(10000);
#else
Sleep(10);
#endif
} while (--sanity > 0);
if (!sanity) {
return -1;
}
}
/*
while (!wsh->down && !wsh->handshake) {
int r = ws_handshake(wsh);
if (r < 0) {
wsh->down = 1;
return -1;
}
}
*/
if (wsh->down) {
return -1;
}
return 0;
} }
void ws_destroy(wsh_t *wsh) void ws_destroy(wsh_t *wsh)
@ -511,26 +384,8 @@ issize_t ws_close(wsh_t *wsh, int16_t reason)
} }
wsh->down = 1; wsh->down = 1;
if (reason && wsh->sock != ws_sock_invalid) {
uint16_t *u16;
uint8_t fr[4] = {WSOC_CLOSE | 0x80, 2, 0};
u16 = (uint16_t *) &fr[2];
*u16 = htons((int16_t)reason);
ws_raw_write(wsh, fr, 4);
}
restore_socket(wsh->sock);
if (wsh->close_sock) {
close(wsh->sock);
}
wsh->sock = ws_sock_invalid;
return reason * -1; return reason * -1;
} }
issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data) issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)

View File

@ -59,7 +59,7 @@ typedef enum {
} ws_opcode_t; } ws_opcode_t;
typedef struct wsh_s { typedef struct wsh_s {
ws_socket_t sock; ws_tsession_t *tsession;
char buffer[65536]; char buffer[65536];
char wbuffer[65536]; char wbuffer[65536];
size_t buflen; size_t buflen;
@ -73,7 +73,6 @@ typedef struct wsh_s {
uint8_t down; uint8_t down;
int secure; int secure;
uint8_t close_sock; uint8_t close_sock;
ws_tsession_t *tsession;
} wsh_t; } wsh_t;
issize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc); issize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc);
@ -84,7 +83,7 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes);
issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes); issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes);
issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data); issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data);
issize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes); issize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes);
int ws_init(wsh_t *wsh, ws_tsession_t *tsession, SSL_CTX *ssl_ctx, int close_sock); wsh_t * ws_init(ws_tsession_t *tsession);
issize_t ws_close(wsh_t *wsh, int16_t reason); issize_t ws_close(wsh_t *wsh, int16_t reason);
void ws_destroy(wsh_t *wsh); void ws_destroy(wsh_t *wsh);
void init_ssl(void); void init_ssl(void);