summaryrefslogtreecommitdiff
path: root/main/manager.c
diff options
context:
space:
mode:
authorTimo Teräs <timo.teras@iki.fi>2016-06-02 22:10:06 +0300
committerTimo Teräs <timo.teras@iki.fi>2016-11-15 22:25:14 +0200
commit070a51bf7c00f49bb82d26e889b88906a9b2fd0c (patch)
treefddd2462220284d9dd7abba8ec2c1c0d68a68159 /main/manager.c
parent0cc14597b29203259b5e6ae4496f9f6d4f4e76f2 (diff)
Implement internal abstraction for iostreams
fopencookie/funclose is a non-standard API and should not be used in portable software. Additionally, the way FILE's fd is used in non-blocking mode is undefined behaviour and cannot be relied on. This introduces internal abstraction for io streams, that allows implementing the desired virtualization of read/write operations with necessary timeout handling. ASTERISK-24515 #close ASTERISK-24517 #close Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85
Diffstat (limited to 'main/manager.c')
-rw-r--r--main/manager.c142
1 files changed, 58 insertions, 84 deletions
diff --git a/main/manager.c b/main/manager.c
index ef1afb03d..f059015c7 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1549,8 +1549,7 @@ static void acl_change_stasis_unsubscribe(void)
struct mansession_session {
/*! \todo XXX need to document which fields it is protecting */
struct ast_sockaddr addr; /*!< address we are connecting from */
- FILE *f; /*!< fdopen() on the underlying fd */
- int fd; /*!< descriptor used for output. Either the socket (AMI) or a temporary file (HTTP) */
+ struct ast_iostream *stream; /*!< AMI stream */
int inuse; /*!< number of HTTP sessions using this entry */
int needdestroy; /*!< Whether an HTTP session should be destroyed */
pthread_t waiting_thread; /*!< Sleeping thread using this descriptor */
@@ -1592,9 +1591,8 @@ enum mansession_message_parsing {
*/
struct mansession {
struct mansession_session *session;
+ struct ast_iostream *stream;
struct ast_tcptls_session_instance *tcptls_session;
- FILE *f;
- int fd;
enum mansession_message_parsing parsing;
int write_error:1;
struct manager_custom_hook *hook;
@@ -2166,10 +2164,6 @@ static void session_destructor(void *obj)
ast_datastore_free(datastore);
}
- if (session->f != NULL) {
- fflush(session->f);
- fclose(session->f);
- }
if (eqe) {
ast_atomic_fetchadd_int(&eqe->usecount, -1);
}
@@ -2204,7 +2198,6 @@ static struct mansession_session *build_mansession(const struct ast_sockaddr *ad
return NULL;
}
- newsession->fd = -1;
newsession->waiting_thread = AST_PTHREADT_NULL;
newsession->writetimeout = 100;
newsession->send_events = -1;
@@ -2617,7 +2610,7 @@ static char *handle_showmanconn(struct ast_cli_entry *e, int cmd, struct ast_cli
ast_sockaddr_stringify_addr(&session->addr),
(int) (session->sessionstart),
(int) (now - session->sessionstart),
- session->fd,
+ session->stream ? ast_iostream_get_fd(session->stream) : -1,
session->inuse,
session->readperm,
session->writeperm);
@@ -2889,7 +2882,6 @@ int ast_hook_send_action(struct manager_custom_hook *hook, const char *msg)
* This is necessary to meet the previous design of manager.c
*/
s.hook = hook;
- s.f = (void*)1; /* set this to something so our request will make it through all functions that test it*/
ao2_lock(act_found);
if (act_found->registered && act_found->func) {
@@ -2920,9 +2912,8 @@ int ast_hook_send_action(struct manager_custom_hook *hook, const char *msg)
*/
static int send_string(struct mansession *s, char *string)
{
- int res;
- FILE *f = s->f ? s->f : s->session->f;
- int fd = s->f ? s->fd : s->session->fd;
+ struct ast_iostream *stream = s->stream ? s->stream : s->session->stream;
+ int len, res;
/* It's a result from one of the hook's action invocation */
if (s->hook) {
@@ -2934,7 +2925,12 @@ static int send_string(struct mansession *s, char *string)
return 0;
}
- if ((res = ast_careful_fwrite(f, fd, string, strlen(string), s->session->writetimeout))) {
+ len = strlen(string);
+ ast_iostream_set_timeout_inactivity(stream, s->session->writetimeout);
+ res = ast_iostream_write(stream, string, len);
+ ast_iostream_set_timeout_disable(stream);
+
+ if (res < len) {
s->write_error = 1;
}
@@ -2975,10 +2971,10 @@ void astman_append(struct mansession *s, const char *fmt, ...)
return;
}
- if (s->f != NULL || s->session->f != NULL) {
+ if (s->tcptls_session != NULL && s->tcptls_session->stream != NULL) {
send_string(s, ast_str_buffer(buf));
} else {
- ast_verbose("fd == -1 in astman_append, should not happen\n");
+ ast_verbose("No connection stream in astman_append, should not happen\n");
}
}
@@ -4119,7 +4115,7 @@ static int action_waitevent(struct mansession *s, const struct message *m)
break;
}
if (s->session->managerid == 0) { /* AMI session */
- if (ast_wait_for_input(s->session->fd, 1000)) {
+ if (ast_wait_for_input(ast_iostream_get_fd(s->session->stream), 1000)) {
break;
}
} else { /* HTTP session */
@@ -5924,7 +5920,7 @@ static int process_events(struct mansession *s)
int ret = 0;
ao2_lock(s->session);
- if (s->session->f != NULL) {
+ if (s->session->stream != NULL) {
struct eventqent *eqe = s->session->last_ev;
while ((eqe = advance_event(eqe))) {
@@ -6466,7 +6462,7 @@ static int get_input(struct mansession *s, char *output)
s->session->waiting_thread = pthread_self();
ao2_unlock(s->session);
- res = ast_wait_for_input(s->session->fd, timeout);
+ res = ast_wait_for_input(ast_iostream_get_fd(s->session->stream), timeout);
ao2_lock(s->session);
s->session->waiting_thread = AST_PTHREADT_NULL;
@@ -6484,7 +6480,7 @@ static int get_input(struct mansession *s, char *output)
}
ao2_lock(s->session);
- res = fread(src + s->session->inlen, 1, maxlen - s->session->inlen, s->session->f);
+ res = ast_iostream_read(s->session->stream, src + s->session->inlen, maxlen - s->session->inlen);
if (res < 1) {
res = -1; /* error return */
} else {
@@ -6617,13 +6613,12 @@ static void *session_do(void *data)
struct mansession s = {
.tcptls_session = data,
};
- int flags;
int res;
+ int arg = 1;
struct ast_sockaddr ser_remote_address_tmp;
- struct protoent *p;
if (ast_atomic_fetchadd_int(&unauth_sessions, +1) >= authlimit) {
- fclose(ser->f);
+ ast_iostream_close(ser->stream);
ast_atomic_fetchadd_int(&unauth_sessions, -1);
goto done;
}
@@ -6632,7 +6627,7 @@ static void *session_do(void *data)
session = build_mansession(&ser_remote_address_tmp);
if (session == NULL) {
- fclose(ser->f);
+ ast_iostream_close(ser->stream);
ast_atomic_fetchadd_int(&unauth_sessions, -1);
goto done;
}
@@ -6640,20 +6635,10 @@ static void *session_do(void *data)
/* here we set TCP_NODELAY on the socket to disable Nagle's algorithm.
* This is necessary to prevent delays (caused by buffering) as we
* write to the socket in bits and pieces. */
- p = getprotobyname("tcp");
- if (p) {
- int arg = 1;
- if( setsockopt(ser->fd, p->p_proto, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0 ) {
- ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno));
- }
- } else {
- ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY, getprotobyname(\"tcp\") failed\nSome manager actions may be slow to respond.\n");
+ if (setsockopt(ast_iostream_get_fd(ser->stream), IPPROTO_TCP, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0) {
+ ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno));
}
-
- /* make sure socket is non-blocking */
- flags = fcntl(ser->fd, F_GETFL);
- flags |= O_NONBLOCK;
- fcntl(ser->fd, F_SETFL, flags);
+ ast_iostream_nonblock(ser->stream);
ao2_lock(session);
/* Hook to the tail of the event queue */
@@ -6662,8 +6647,7 @@ static void *session_do(void *data)
ast_mutex_init(&s.lock);
/* these fields duplicate those in the 'ser' structure */
- session->fd = s.fd = ser->fd;
- session->f = s.f = ser->f;
+ session->stream = s.stream = ser->stream;
ast_sockaddr_copy(&session->addr, &ser_remote_address_tmp);
s.session = session;
@@ -6682,9 +6666,9 @@ static void *session_do(void *data)
* We cannot let the stream exclusively wait for data to arrive.
* We have to wake up the task to send async events.
*/
- ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0);
+ ast_iostream_set_exclusive_input(ser->stream, 0);
- ast_tcptls_stream_set_timeout_sequence(ser->stream_cookie,
+ ast_iostream_set_timeout_sequence(ser->stream,
ast_tvnow(), authtimeout * 1000);
astman_append(&s, "Asterisk Call Manager/%s\r\n", AMI_VERSION); /* welcome prompt */
@@ -6693,7 +6677,7 @@ static void *session_do(void *data)
break;
}
if (session->authenticated) {
- ast_tcptls_stream_set_timeout_disable(ser->stream_cookie);
+ ast_iostream_set_timeout_disable(ser->stream);
}
}
/* session is over, explain why and terminate */
@@ -7552,23 +7536,9 @@ static void xml_translate(struct ast_str **out, char *in, struct ast_variable *g
static void close_mansession_file(struct mansession *s)
{
- if (s->f) {
- if (fclose(s->f)) {
- ast_log(LOG_ERROR, "fclose() failed: %s\n", strerror(errno));
- }
- s->f = NULL;
- s->fd = -1;
- } else if (s->fd != -1) {
- /*
- * Issuing shutdown() is necessary here to avoid a race
- * condition where the last data written may not appear
- * in the TCP stream. See ASTERISK-23548
- */
- shutdown(s->fd, SHUT_RDWR);
- if (close(s->fd)) {
- ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno));
- }
- s->fd = -1;
+ if (s->stream) {
+ ast_iostream_close(s->stream);
+ s->stream = NULL;
} else {
ast_log(LOG_ERROR, "Attempted to close file/file descriptor on mansession without a valid file or file descriptor.\n");
}
@@ -7577,17 +7547,20 @@ static void close_mansession_file(struct mansession *s)
static void process_output(struct mansession *s, struct ast_str **out, struct ast_variable *params, enum output_format format)
{
char *buf;
- size_t l;
+ off_t l;
+ int fd;
- if (!s->f)
+ if (!s->stream)
return;
/* Ensure buffer is NULL-terminated */
- fprintf(s->f, "%c", 0);
- fflush(s->f);
+ ast_iostream_write(s->stream, "", 1);
+
+ fd = ast_iostream_get_fd(s->stream);
- if ((l = ftell(s->f)) > 0) {
- if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, s->fd, 0))) {
+ l = lseek(fd, SEEK_CUR, 0);
+ if (l > 0) {
+ if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0))) {
ast_log(LOG_WARNING, "mmap failed. Manager output was not processed\n");
} else {
if (format == FORMAT_XML || format == FORMAT_HTML) {
@@ -7614,6 +7587,7 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser,
struct mansession s = { .session = NULL, .tcptls_session = ser };
struct mansession_session *session = NULL;
uint32_t ident;
+ int fd;
int blastaway = 0;
struct ast_variable *v;
struct ast_variable *params = get_params;
@@ -7669,17 +7643,17 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser,
}
s.session = session;
- s.fd = mkstemp(template); /* create a temporary file for command output */
+ fd = mkstemp(template); /* create a temporary file for command output */
unlink(template);
- if (s.fd <= -1) {
+ if (fd <= -1) {
ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)");
goto generic_callback_out;
}
- s.f = fdopen(s.fd, "w+");
- if (!s.f) {
+ s.stream = ast_iostream_from_fd(&fd);
+ if (!s.stream) {
ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno));
ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)");
- close(s.fd);
+ close(fd);
goto generic_callback_out;
}
@@ -7819,9 +7793,9 @@ generic_callback_out:
if (blastaway) {
session_destroy(session);
} else {
- if (session->f) {
- fclose(session->f);
- session->f = NULL;
+ if (session->stream) {
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
}
unref_mansession(session);
}
@@ -7846,6 +7820,7 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser,
struct message m = { 0 };
unsigned int idx;
size_t hdrlen;
+ int fd;
time_t time_now = time(NULL);
unsigned long nonce = 0, nc;
@@ -8024,17 +7999,17 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser,
ast_mutex_init(&s.lock);
s.session = session;
- s.fd = mkstemp(template); /* create a temporary file for command output */
+ fd = mkstemp(template); /* create a temporary file for command output */
unlink(template);
- if (s.fd <= -1) {
+ if (fd <= -1) {
ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)");
goto auth_callback_out;
}
- s.f = fdopen(s.fd, "w+");
- if (!s.f) {
+ s.stream = ast_iostream_from_fd(&fd);
+ if (!s.stream) {
ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno));
ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)");
- close(s.fd);
+ close(fd);
goto auth_callback_out;
}
@@ -8085,7 +8060,7 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser,
m.headers[idx] = NULL;
}
- result_size = ftell(s.f); /* Calculate approx. size of result */
+ result_size = lseek(ast_iostream_get_fd(s.stream), SEEK_CUR, 0); /* Calculate approx. size of result */
http_header = ast_str_create(80);
out = ast_str_create(result_size * 2 + 512);
@@ -8137,11 +8112,10 @@ auth_callback_out:
ast_free(out);
ao2_lock(session);
- if (session->f) {
- fclose(session->f);
+ if (session->stream) {
+ ast_iostream_close(session->stream);
+ session->stream = NULL;
}
- session->f = NULL;
- session->fd = -1;
ao2_unlock(session);
if (session->needdestroy) {