summaryrefslogtreecommitdiff
path: root/res/res_http_websocket.c
diff options
context:
space:
mode:
authorTimo Teräs <timo.teras@iki.fi>2016-06-02 22:10:06 +0300
committerTimo Teräs <timo.teras@iki.fi>2016-11-15 22:25:14 +0200
commit070a51bf7c00f49bb82d26e889b88906a9b2fd0c (patch)
treefddd2462220284d9dd7abba8ec2c1c0d68a68159 /res/res_http_websocket.c
parent0cc14597b29203259b5e6ae4496f9f6d4f4e76f2 (diff)
Implement internal abstraction for iostreams
fopencookie/funclose is a non-standard API and should not be used in portable software. Additionally, the way FILE's fd is used in non-blocking mode is undefined behaviour and cannot be relied on. This introduces internal abstraction for io streams, that allows implementing the desired virtualization of read/write operations with necessary timeout handling. ASTERISK-24515 #close ASTERISK-24517 #close Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85
Diffstat (limited to 'res/res_http_websocket.c')
-rw-r--r--res/res_http_websocket.c116
1 files changed, 53 insertions, 63 deletions
diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c
index 28bf45fc8..cb5e5ff91 100644
--- a/res/res_http_websocket.c
+++ b/res/res_http_websocket.c
@@ -73,8 +73,7 @@
/*! \brief Structure definition for session */
struct ast_websocket {
- FILE *f; /*!< Pointer to the file instance used for writing and reading */
- int fd; /*!< File descriptor for the session, only used for polling */
+ struct ast_iostream *stream; /*!< iostream of the connection */
struct ast_sockaddr address; /*!< Address of the remote client */
enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */
size_t payload_len; /*!< Length of the payload */
@@ -165,10 +164,11 @@ static void session_destroy_fn(void *obj)
{
struct ast_websocket *session = obj;
- if (session->f) {
+ if (session->stream) {
ast_websocket_close(session, 0);
- if (session->f) {
- fclose(session->f);
+ if (session->stream) {
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from",
ast_sockaddr_stringify(&session->address));
}
@@ -294,20 +294,22 @@ int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, ui
session->close_sent = 1;
ao2_lock(session);
- res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout);
+ ast_iostream_set_timeout_inactivity(session->stream, session->timeout);
+ res = ast_iostream_write(session->stream, frame, sizeof(frame));
+ ast_iostream_set_timeout_disable(session->stream);
/* If an error occurred when trying to close this connection explicitly terminate it now.
* Doing so will cause the thread polling on it to wake up and terminate.
*/
- if (res) {
- fclose(session->f);
- session->f = NULL;
+ if (res != sizeof(frame)) {
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n",
session->client ? "to" : "from", ast_sockaddr_stringify(&session->address));
}
ao2_unlock(session);
- return res;
+ return res == sizeof(frame);
}
static const char *opcode_map[] = {
@@ -375,7 +377,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en
return -1;
}
- if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) {
+ ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout);
+ if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) {
ao2_unlock(session);
/* 1011 - server terminating connection due to not being able to fulfill the request */
ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n");
@@ -383,7 +386,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en
return -1;
}
- fflush(session->f);
+ ast_iostream_set_timeout_disable(session->stream);
ao2_unlock(session);
return 0;
@@ -411,7 +414,7 @@ void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session)
int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session)
{
- return session->closing ? -1 : session->fd;
+ return session->closing ? -1 : ast_iostream_get_fd(session->stream);
}
struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session)
@@ -426,18 +429,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_is_secure)(struct ast_websocket *session
int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session)
{
- int flags;
-
- if ((flags = fcntl(session->fd, F_GETFL)) == -1) {
- return -1;
- }
-
- flags |= O_NONBLOCK;
-
- if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) {
- return -1;
- }
-
+ ast_iostream_nonblock(session->stream);
+ ast_iostream_set_exclusive_input(session->stream, 0);
return 0;
}
@@ -490,17 +483,16 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len
int sanity = 10;
ao2_lock(session);
- if (!session->f) {
+ if (!session->stream) {
ao2_unlock(session);
errno = ECONNABORTED;
return -1;
}
for (;;) {
- clearerr(session->f);
- rlen = fread(rbuf, 1, xlen, session->f);
- if (!rlen) {
- if (feof(session->f)) {
+ rlen = ast_iostream_read(session->stream, rbuf, xlen);
+ if (rlen != xlen) {
+ if (rlen == 0) {
ast_log(LOG_WARNING, "Web socket closed abruptly\n");
*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
session->closing = 1;
@@ -508,7 +500,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len
return -1;
}
- if (ferror(session->f) && errno != EAGAIN) {
+ if (rlen < 0 && errno != EAGAIN) {
ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno));
*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
session->closing = 1;
@@ -529,7 +521,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len
if (!xlen) {
break;
}
- if (ast_wait_for_input(session->fd, 1000) < 0) {
+ if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) {
ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno));
*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
session->closing = 1;
@@ -824,7 +816,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
ao2_ref(protocol_handler, -1);
return 0;
}
- session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
+ session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
/* Generate the session id */
if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) {
@@ -854,7 +846,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
* Connection_.
*/
if (protocol) {
- fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n"
+ ast_iostream_printf(ser->stream,
+ "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: %s\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n"
@@ -863,15 +856,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
websocket_combine_key(key, base64, sizeof(base64)),
protocol);
} else {
- fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n"
+ ast_iostream_printf(ser->stream,
+ "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: %s\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n\r\n",
upgrade,
websocket_combine_key(key, base64, sizeof(base64)));
}
-
- fflush(ser->f);
} else {
/* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */
@@ -883,7 +875,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
}
/* Enable keepalive on all sessions so the underlying user does not have to */
- if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
+ if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n",
ast_sockaddr_stringify(&ser->remote_address));
websocket_bad_request(ser);
@@ -895,25 +887,23 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version);
/* Populate the session with all the needed details */
- session->f = ser->f;
- session->fd = ser->fd;
+ session->stream = ser->stream;
ast_sockaddr_copy(&session->address, &ser->remote_address);
session->opcode = -1;
session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING;
- session->secure = ser->ssl ? 1 : 0;
+ session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0;
/* Give up ownership of the socket and pass it to the protocol handler */
- ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0);
+ ast_iostream_set_exclusive_input(session->stream, 0);
protocol_handler->session_established(session, get_vars, headers);
ao2_ref(protocol_handler, -1);
/*
- * By dropping the FILE* and fd from the session the connection
+ * By dropping the stream from the session the connection
* won't get closed when the HTTP server cleans up because we
* passed the connection to the protocol handler.
*/
- ser->f = NULL;
- ser->fd = -1;
+ ser->stream = NULL;
return 0;
}
@@ -1247,7 +1237,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
int has_accept = 0;
int has_protocol = 0;
- if (!fgets(buf, sizeof(buf), client->ser->f)) {
+ if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) {
ast_log(LOG_ERROR, "Unable to retrieve HTTP status line.");
return WS_BAD_STATUS;
}
@@ -1260,7 +1250,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
/* Ignoring line folding - assuming header field values are contained
within a single line */
- while (fgets(buf, sizeof(buf), client->ser->f)) {
+ while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) {
char *name, *value;
int parsed = ast_http_header_parse(buf, &name, &value);
@@ -1313,19 +1303,19 @@ static enum ast_websocket_result websocket_client_handshake(
client->protocols);
}
- if (fprintf(client->ser->f,
- "GET /%s HTTP/1.1\r\n"
- "Sec-WebSocket-Version: %d\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- "Host: %s\r\n"
- "Sec-WebSocket-Key: %s\r\n"
- "%s\r\n",
- client->resource_name ? ast_str_buffer(client->resource_name) : "",
- client->version,
- client->host,
- client->key,
- protocols) < 0) {
+ if (ast_iostream_printf(client->ser->stream,
+ "GET /%s HTTP/1.1\r\n"
+ "Sec-WebSocket-Version: %d\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Host: %s\r\n"
+ "Sec-WebSocket-Key: %s\r\n"
+ "%s\r\n",
+ client->resource_name ? ast_str_buffer(client->resource_name) : "",
+ client->version,
+ client->host,
+ client->key,
+ protocols) < 0) {
ast_log(LOG_ERROR, "Failed to send handshake.\n");
return WS_WRITE_ERROR;
}
@@ -1349,9 +1339,9 @@ static enum ast_websocket_result websocket_client_connect(struct ast_websocket *
return res;
}
- ws->f = ws->client->ser->f;
- ws->fd = ws->client->ser->fd;
- ws->secure = ws->client->ser->ssl ? 1 : 0;
+ ws->stream = ws->client->ser->stream;
+ ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0;
+ ws->client->ser->stream = NULL;
ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address);
return WS_OK;
}