diff options
author | Timo Teräs <timo.teras@iki.fi> | 2016-06-02 22:10:06 +0300 |
---|---|---|
committer | Timo Teräs <timo.teras@iki.fi> | 2016-11-15 22:25:14 +0200 |
commit | 070a51bf7c00f49bb82d26e889b88906a9b2fd0c (patch) | |
tree | fddd2462220284d9dd7abba8ec2c1c0d68a68159 /res/res_http_websocket.c | |
parent | 0cc14597b29203259b5e6ae4496f9f6d4f4e76f2 (diff) |
Implement internal abstraction for iostreams
fopencookie/funclose is a non-standard API and should not be used
in portable software. Additionally, the way FILE's fd is used in
non-blocking mode is undefined behaviour and cannot be relied on.
This introduces internal abstraction for io streams, that allows
implementing the desired virtualization of read/write operations
with necessary timeout handling.
ASTERISK-24515 #close
ASTERISK-24517 #close
Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85
Diffstat (limited to 'res/res_http_websocket.c')
-rw-r--r-- | res/res_http_websocket.c | 116 |
1 files changed, 53 insertions, 63 deletions
diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c index 28bf45fc8..cb5e5ff91 100644 --- a/res/res_http_websocket.c +++ b/res/res_http_websocket.c @@ -73,8 +73,7 @@ /*! \brief Structure definition for session */ struct ast_websocket { - FILE *f; /*!< Pointer to the file instance used for writing and reading */ - int fd; /*!< File descriptor for the session, only used for polling */ + struct ast_iostream *stream; /*!< iostream of the connection */ struct ast_sockaddr address; /*!< Address of the remote client */ enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */ size_t payload_len; /*!< Length of the payload */ @@ -165,10 +164,11 @@ static void session_destroy_fn(void *obj) { struct ast_websocket *session = obj; - if (session->f) { + if (session->stream) { ast_websocket_close(session, 0); - if (session->f) { - fclose(session->f); + if (session->stream) { + ast_iostream_close(session->stream); + session->stream = NULL; ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); } @@ -294,20 +294,22 @@ int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, ui session->close_sent = 1; ao2_lock(session); - res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout); + ast_iostream_set_timeout_inactivity(session->stream, session->timeout); + res = ast_iostream_write(session->stream, frame, sizeof(frame)); + ast_iostream_set_timeout_disable(session->stream); /* If an error occurred when trying to close this connection explicitly terminate it now. * Doing so will cause the thread polling on it to wake up and terminate. */ - if (res) { - fclose(session->f); - session->f = NULL; + if (res != sizeof(frame)) { + ast_iostream_close(session->stream); + session->stream = NULL; ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); } ao2_unlock(session); - return res; + return res == sizeof(frame); } static const char *opcode_map[] = { @@ -375,7 +377,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en return -1; } - if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) { + ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout); + if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) { ao2_unlock(session); /* 1011 - server terminating connection due to not being able to fulfill the request */ ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n"); @@ -383,7 +386,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en return -1; } - fflush(session->f); + ast_iostream_set_timeout_disable(session->stream); ao2_unlock(session); return 0; @@ -411,7 +414,7 @@ void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session) int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session) { - return session->closing ? -1 : session->fd; + return session->closing ? -1 : ast_iostream_get_fd(session->stream); } struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session) @@ -426,18 +429,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_is_secure)(struct ast_websocket *session int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session) { - int flags; - - if ((flags = fcntl(session->fd, F_GETFL)) == -1) { - return -1; - } - - flags |= O_NONBLOCK; - - if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) { - return -1; - } - + ast_iostream_nonblock(session->stream); + ast_iostream_set_exclusive_input(session->stream, 0); return 0; } @@ -490,17 +483,16 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len int sanity = 10; ao2_lock(session); - if (!session->f) { + if (!session->stream) { ao2_unlock(session); errno = ECONNABORTED; return -1; } for (;;) { - clearerr(session->f); - rlen = fread(rbuf, 1, xlen, session->f); - if (!rlen) { - if (feof(session->f)) { + rlen = ast_iostream_read(session->stream, rbuf, xlen); + if (rlen != xlen) { + if (rlen == 0) { ast_log(LOG_WARNING, "Web socket closed abruptly\n"); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -508,7 +500,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len return -1; } - if (ferror(session->f) && errno != EAGAIN) { + if (rlen < 0 && errno != EAGAIN) { ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno)); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -529,7 +521,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len if (!xlen) { break; } - if (ast_wait_for_input(session->fd, 1000) < 0) { + if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) { ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno)); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -824,7 +816,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan ao2_ref(protocol_handler, -1); return 0; } - session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; + session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; /* Generate the session id */ if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) { @@ -854,7 +846,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan * Connection_. */ if (protocol) { - fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" + ast_iostream_printf(ser->stream, + "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: %s\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n" @@ -863,15 +856,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan websocket_combine_key(key, base64, sizeof(base64)), protocol); } else { - fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" + ast_iostream_printf(ser->stream, + "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: %s\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n\r\n", upgrade, websocket_combine_key(key, base64, sizeof(base64))); } - - fflush(ser->f); } else { /* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */ @@ -883,7 +875,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan } /* Enable keepalive on all sessions so the underlying user does not have to */ - if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { + if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n", ast_sockaddr_stringify(&ser->remote_address)); websocket_bad_request(ser); @@ -895,25 +887,23 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); /* Populate the session with all the needed details */ - session->f = ser->f; - session->fd = ser->fd; + session->stream = ser->stream; ast_sockaddr_copy(&session->address, &ser->remote_address); session->opcode = -1; session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING; - session->secure = ser->ssl ? 1 : 0; + session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0; /* Give up ownership of the socket and pass it to the protocol handler */ - ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0); + ast_iostream_set_exclusive_input(session->stream, 0); protocol_handler->session_established(session, get_vars, headers); ao2_ref(protocol_handler, -1); /* - * By dropping the FILE* and fd from the session the connection + * By dropping the stream from the session the connection * won't get closed when the HTTP server cleans up because we * passed the connection to the protocol handler. */ - ser->f = NULL; - ser->fd = -1; + ser->stream = NULL; return 0; } @@ -1247,7 +1237,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( int has_accept = 0; int has_protocol = 0; - if (!fgets(buf, sizeof(buf), client->ser->f)) { + if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) { ast_log(LOG_ERROR, "Unable to retrieve HTTP status line."); return WS_BAD_STATUS; } @@ -1260,7 +1250,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( /* Ignoring line folding - assuming header field values are contained within a single line */ - while (fgets(buf, sizeof(buf), client->ser->f)) { + while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) { char *name, *value; int parsed = ast_http_header_parse(buf, &name, &value); @@ -1313,19 +1303,19 @@ static enum ast_websocket_result websocket_client_handshake( client->protocols); } - if (fprintf(client->ser->f, - "GET /%s HTTP/1.1\r\n" - "Sec-WebSocket-Version: %d\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Host: %s\r\n" - "Sec-WebSocket-Key: %s\r\n" - "%s\r\n", - client->resource_name ? ast_str_buffer(client->resource_name) : "", - client->version, - client->host, - client->key, - protocols) < 0) { + if (ast_iostream_printf(client->ser->stream, + "GET /%s HTTP/1.1\r\n" + "Sec-WebSocket-Version: %d\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Host: %s\r\n" + "Sec-WebSocket-Key: %s\r\n" + "%s\r\n", + client->resource_name ? ast_str_buffer(client->resource_name) : "", + client->version, + client->host, + client->key, + protocols) < 0) { ast_log(LOG_ERROR, "Failed to send handshake.\n"); return WS_WRITE_ERROR; } @@ -1349,9 +1339,9 @@ static enum ast_websocket_result websocket_client_connect(struct ast_websocket * return res; } - ws->f = ws->client->ser->f; - ws->fd = ws->client->ser->fd; - ws->secure = ws->client->ser->ssl ? 1 : 0; + ws->stream = ws->client->ser->stream; + ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0; + ws->client->ser->stream = NULL; ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address); return WS_OK; } |