diff options
author | Joshua Colp <jcolp@digium.com> | 2016-11-17 11:07:06 -0600 |
---|---|---|
committer | Gerrit Code Review <gerrit2@gerrit.digium.api> | 2016-11-17 11:07:06 -0600 |
commit | d3dba74017c6f923f52ddab1c6a03557b18de5ad (patch) | |
tree | dea44a244190a015ebe01ce3de2a641540de49f1 /res/res_http_websocket.c | |
parent | ed143a3b7c7b02fb113f9ce817a02c541b540aa9 (diff) | |
parent | 070a51bf7c00f49bb82d26e889b88906a9b2fd0c (diff) |
Merge "Implement internal abstraction for iostreams"
Diffstat (limited to 'res/res_http_websocket.c')
-rw-r--r-- | res/res_http_websocket.c | 116 |
1 files changed, 53 insertions, 63 deletions
diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c index c1a28f65d..106ba488b 100644 --- a/res/res_http_websocket.c +++ b/res/res_http_websocket.c @@ -86,8 +86,7 @@ /*! \brief Structure definition for session */ struct ast_websocket { - FILE *f; /*!< Pointer to the file instance used for writing and reading */ - int fd; /*!< File descriptor for the session, only used for polling */ + struct ast_iostream *stream; /*!< iostream of the connection */ struct ast_sockaddr address; /*!< Address of the remote client */ enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */ size_t payload_len; /*!< Length of the payload */ @@ -178,10 +177,11 @@ static void session_destroy_fn(void *obj) { struct ast_websocket *session = obj; - if (session->f) { + if (session->stream) { ast_websocket_close(session, 0); - if (session->f) { - fclose(session->f); + if (session->stream) { + ast_iostream_close(session->stream); + session->stream = NULL; ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); } @@ -307,20 +307,22 @@ int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, ui session->close_sent = 1; ao2_lock(session); - res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout); + ast_iostream_set_timeout_inactivity(session->stream, session->timeout); + res = ast_iostream_write(session->stream, frame, sizeof(frame)); + ast_iostream_set_timeout_disable(session->stream); /* If an error occurred when trying to close this connection explicitly terminate it now. * Doing so will cause the thread polling on it to wake up and terminate. */ - if (res) { - fclose(session->f); - session->f = NULL; + if (res != sizeof(frame)) { + ast_iostream_close(session->stream); + session->stream = NULL; ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); } ao2_unlock(session); - return res; + return res == sizeof(frame); } static const char *opcode_map[] = { @@ -388,7 +390,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en return -1; } - if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) { + ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout); + if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) { ao2_unlock(session); /* 1011 - server terminating connection due to not being able to fulfill the request */ ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n"); @@ -396,7 +399,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en return -1; } - fflush(session->f); + ast_iostream_set_timeout_disable(session->stream); ao2_unlock(session); return 0; @@ -424,7 +427,7 @@ void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session) int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session) { - return session->closing ? -1 : session->fd; + return session->closing ? -1 : ast_iostream_get_fd(session->stream); } struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session) @@ -439,18 +442,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_is_secure)(struct ast_websocket *session int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session) { - int flags; - - if ((flags = fcntl(session->fd, F_GETFL)) == -1) { - return -1; - } - - flags |= O_NONBLOCK; - - if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) { - return -1; - } - + ast_iostream_nonblock(session->stream); + ast_iostream_set_exclusive_input(session->stream, 0); return 0; } @@ -503,17 +496,16 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len int sanity = 10; ao2_lock(session); - if (!session->f) { + if (!session->stream) { ao2_unlock(session); errno = ECONNABORTED; return -1; } for (;;) { - clearerr(session->f); - rlen = fread(rbuf, 1, xlen, session->f); - if (!rlen) { - if (feof(session->f)) { + rlen = ast_iostream_read(session->stream, rbuf, xlen); + if (rlen != xlen) { + if (rlen == 0) { ast_log(LOG_WARNING, "Web socket closed abruptly\n"); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -521,7 +513,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len return -1; } - if (ferror(session->f) && errno != EAGAIN) { + if (rlen < 0 && errno != EAGAIN) { ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno)); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -542,7 +534,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len if (!xlen) { break; } - if (ast_wait_for_input(session->fd, 1000) < 0) { + if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) { ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno)); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -837,7 +829,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan ao2_ref(protocol_handler, -1); return 0; } - session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; + session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; /* Generate the session id */ if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) { @@ -867,7 +859,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan * Connection_. */ if (protocol) { - fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" + ast_iostream_printf(ser->stream, + "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: %s\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n" @@ -876,15 +869,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan websocket_combine_key(key, base64, sizeof(base64)), protocol); } else { - fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" + ast_iostream_printf(ser->stream, + "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: %s\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n\r\n", upgrade, websocket_combine_key(key, base64, sizeof(base64))); } - - fflush(ser->f); } else { /* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */ @@ -896,7 +888,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan } /* Enable keepalive on all sessions so the underlying user does not have to */ - if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { + if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n", ast_sockaddr_stringify(&ser->remote_address)); websocket_bad_request(ser); @@ -908,25 +900,23 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); /* Populate the session with all the needed details */ - session->f = ser->f; - session->fd = ser->fd; + session->stream = ser->stream; ast_sockaddr_copy(&session->address, &ser->remote_address); session->opcode = -1; session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING; - session->secure = ser->ssl ? 1 : 0; + session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0; /* Give up ownership of the socket and pass it to the protocol handler */ - ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0); + ast_iostream_set_exclusive_input(session->stream, 0); protocol_handler->session_established(session, get_vars, headers); ao2_ref(protocol_handler, -1); /* - * By dropping the FILE* and fd from the session the connection + * By dropping the stream from the session the connection * won't get closed when the HTTP server cleans up because we * passed the connection to the protocol handler. */ - ser->f = NULL; - ser->fd = -1; + ser->stream = NULL; return 0; } @@ -1260,7 +1250,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( int has_accept = 0; int has_protocol = 0; - if (!fgets(buf, sizeof(buf), client->ser->f)) { + if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) { ast_log(LOG_ERROR, "Unable to retrieve HTTP status line."); return WS_BAD_STATUS; } @@ -1273,7 +1263,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( /* Ignoring line folding - assuming header field values are contained within a single line */ - while (fgets(buf, sizeof(buf), client->ser->f)) { + while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) { char *name, *value; int parsed = ast_http_header_parse(buf, &name, &value); @@ -1326,19 +1316,19 @@ static enum ast_websocket_result websocket_client_handshake( client->protocols); } - if (fprintf(client->ser->f, - "GET /%s HTTP/1.1\r\n" - "Sec-WebSocket-Version: %d\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Host: %s\r\n" - "Sec-WebSocket-Key: %s\r\n" - "%s\r\n", - client->resource_name ? ast_str_buffer(client->resource_name) : "", - client->version, - client->host, - client->key, - protocols) < 0) { + if (ast_iostream_printf(client->ser->stream, + "GET /%s HTTP/1.1\r\n" + "Sec-WebSocket-Version: %d\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Host: %s\r\n" + "Sec-WebSocket-Key: %s\r\n" + "%s\r\n", + client->resource_name ? ast_str_buffer(client->resource_name) : "", + client->version, + client->host, + client->key, + protocols) < 0) { ast_log(LOG_ERROR, "Failed to send handshake.\n"); return WS_WRITE_ERROR; } @@ -1362,9 +1352,9 @@ static enum ast_websocket_result websocket_client_connect(struct ast_websocket * return res; } - ws->f = ws->client->ser->f; - ws->fd = ws->client->ser->fd; - ws->secure = ws->client->ser->ssl ? 1 : 0; + ws->stream = ws->client->ser->stream; + ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0; + ws->client->ser->stream = NULL; ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address); return WS_OK; } |