summaryrefslogtreecommitdiff
path: root/res/res_http_websocket.c
diff options
context:
space:
mode:
authorJoshua Colp <jcolp@digium.com>2016-11-17 11:07:06 -0600
committerGerrit Code Review <gerrit2@gerrit.digium.api>2016-11-17 11:07:06 -0600
commitd3dba74017c6f923f52ddab1c6a03557b18de5ad (patch)
treedea44a244190a015ebe01ce3de2a641540de49f1 /res/res_http_websocket.c
parented143a3b7c7b02fb113f9ce817a02c541b540aa9 (diff)
parent070a51bf7c00f49bb82d26e889b88906a9b2fd0c (diff)
Merge "Implement internal abstraction for iostreams"
Diffstat (limited to 'res/res_http_websocket.c')
-rw-r--r--res/res_http_websocket.c116
1 files changed, 53 insertions, 63 deletions
diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c
index c1a28f65d..106ba488b 100644
--- a/res/res_http_websocket.c
+++ b/res/res_http_websocket.c
@@ -86,8 +86,7 @@
/*! \brief Structure definition for session */
struct ast_websocket {
- FILE *f; /*!< Pointer to the file instance used for writing and reading */
- int fd; /*!< File descriptor for the session, only used for polling */
+ struct ast_iostream *stream; /*!< iostream of the connection */
struct ast_sockaddr address; /*!< Address of the remote client */
enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */
size_t payload_len; /*!< Length of the payload */
@@ -178,10 +177,11 @@ static void session_destroy_fn(void *obj)
{
struct ast_websocket *session = obj;
- if (session->f) {
+ if (session->stream) {
ast_websocket_close(session, 0);
- if (session->f) {
- fclose(session->f);
+ if (session->stream) {
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from",
ast_sockaddr_stringify(&session->address));
}
@@ -307,20 +307,22 @@ int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, ui
session->close_sent = 1;
ao2_lock(session);
- res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout);
+ ast_iostream_set_timeout_inactivity(session->stream, session->timeout);
+ res = ast_iostream_write(session->stream, frame, sizeof(frame));
+ ast_iostream_set_timeout_disable(session->stream);
/* If an error occurred when trying to close this connection explicitly terminate it now.
* Doing so will cause the thread polling on it to wake up and terminate.
*/
- if (res) {
- fclose(session->f);
- session->f = NULL;
+ if (res != sizeof(frame)) {
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n",
session->client ? "to" : "from", ast_sockaddr_stringify(&session->address));
}
ao2_unlock(session);
- return res;
+ return res == sizeof(frame);
}
static const char *opcode_map[] = {
@@ -388,7 +390,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en
return -1;
}
- if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) {
+ ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout);
+ if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) {
ao2_unlock(session);
/* 1011 - server terminating connection due to not being able to fulfill the request */
ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n");
@@ -396,7 +399,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en
return -1;
}
- fflush(session->f);
+ ast_iostream_set_timeout_disable(session->stream);
ao2_unlock(session);
return 0;
@@ -424,7 +427,7 @@ void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session)
int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session)
{
- return session->closing ? -1 : session->fd;
+ return session->closing ? -1 : ast_iostream_get_fd(session->stream);
}
struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session)
@@ -439,18 +442,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_is_secure)(struct ast_websocket *session
int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session)
{
- int flags;
-
- if ((flags = fcntl(session->fd, F_GETFL)) == -1) {
- return -1;
- }
-
- flags |= O_NONBLOCK;
-
- if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) {
- return -1;
- }
-
+ ast_iostream_nonblock(session->stream);
+ ast_iostream_set_exclusive_input(session->stream, 0);
return 0;
}
@@ -503,17 +496,16 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len
int sanity = 10;
ao2_lock(session);
- if (!session->f) {
+ if (!session->stream) {
ao2_unlock(session);
errno = ECONNABORTED;
return -1;
}
for (;;) {
- clearerr(session->f);
- rlen = fread(rbuf, 1, xlen, session->f);
- if (!rlen) {
- if (feof(session->f)) {
+ rlen = ast_iostream_read(session->stream, rbuf, xlen);
+ if (rlen != xlen) {
+ if (rlen == 0) {
ast_log(LOG_WARNING, "Web socket closed abruptly\n");
*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
session->closing = 1;
@@ -521,7 +513,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len
return -1;
}
- if (ferror(session->f) && errno != EAGAIN) {
+ if (rlen < 0 && errno != EAGAIN) {
ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno));
*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
session->closing = 1;
@@ -542,7 +534,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len
if (!xlen) {
break;
}
- if (ast_wait_for_input(session->fd, 1000) < 0) {
+ if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) {
ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno));
*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
session->closing = 1;
@@ -837,7 +829,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
ao2_ref(protocol_handler, -1);
return 0;
}
- session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
+ session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
/* Generate the session id */
if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) {
@@ -867,7 +859,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
* Connection_.
*/
if (protocol) {
- fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n"
+ ast_iostream_printf(ser->stream,
+ "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: %s\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n"
@@ -876,15 +869,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
websocket_combine_key(key, base64, sizeof(base64)),
protocol);
} else {
- fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n"
+ ast_iostream_printf(ser->stream,
+ "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: %s\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n\r\n",
upgrade,
websocket_combine_key(key, base64, sizeof(base64)));
}
-
- fflush(ser->f);
} else {
/* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */
@@ -896,7 +888,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
}
/* Enable keepalive on all sessions so the underlying user does not have to */
- if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
+ if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n",
ast_sockaddr_stringify(&ser->remote_address));
websocket_bad_request(ser);
@@ -908,25 +900,23 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version);
/* Populate the session with all the needed details */
- session->f = ser->f;
- session->fd = ser->fd;
+ session->stream = ser->stream;
ast_sockaddr_copy(&session->address, &ser->remote_address);
session->opcode = -1;
session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING;
- session->secure = ser->ssl ? 1 : 0;
+ session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0;
/* Give up ownership of the socket and pass it to the protocol handler */
- ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0);
+ ast_iostream_set_exclusive_input(session->stream, 0);
protocol_handler->session_established(session, get_vars, headers);
ao2_ref(protocol_handler, -1);
/*
- * By dropping the FILE* and fd from the session the connection
+ * By dropping the stream from the session the connection
* won't get closed when the HTTP server cleans up because we
* passed the connection to the protocol handler.
*/
- ser->f = NULL;
- ser->fd = -1;
+ ser->stream = NULL;
return 0;
}
@@ -1260,7 +1250,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
int has_accept = 0;
int has_protocol = 0;
- if (!fgets(buf, sizeof(buf), client->ser->f)) {
+ if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) {
ast_log(LOG_ERROR, "Unable to retrieve HTTP status line.");
return WS_BAD_STATUS;
}
@@ -1273,7 +1263,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
/* Ignoring line folding - assuming header field values are contained
within a single line */
- while (fgets(buf, sizeof(buf), client->ser->f)) {
+ while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) {
char *name, *value;
int parsed = ast_http_header_parse(buf, &name, &value);
@@ -1326,19 +1316,19 @@ static enum ast_websocket_result websocket_client_handshake(
client->protocols);
}
- if (fprintf(client->ser->f,
- "GET /%s HTTP/1.1\r\n"
- "Sec-WebSocket-Version: %d\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- "Host: %s\r\n"
- "Sec-WebSocket-Key: %s\r\n"
- "%s\r\n",
- client->resource_name ? ast_str_buffer(client->resource_name) : "",
- client->version,
- client->host,
- client->key,
- protocols) < 0) {
+ if (ast_iostream_printf(client->ser->stream,
+ "GET /%s HTTP/1.1\r\n"
+ "Sec-WebSocket-Version: %d\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Host: %s\r\n"
+ "Sec-WebSocket-Key: %s\r\n"
+ "%s\r\n",
+ client->resource_name ? ast_str_buffer(client->resource_name) : "",
+ client->version,
+ client->host,
+ client->key,
+ protocols) < 0) {
ast_log(LOG_ERROR, "Failed to send handshake.\n");
return WS_WRITE_ERROR;
}
@@ -1362,9 +1352,9 @@ static enum ast_websocket_result websocket_client_connect(struct ast_websocket *
return res;
}
- ws->f = ws->client->ser->f;
- ws->fd = ws->client->ser->fd;
- ws->secure = ws->client->ser->ssl ? 1 : 0;
+ ws->stream = ws->client->ser->stream;
+ ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0;
+ ws->client->ser->stream = NULL;
ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address);
return WS_OK;
}