diff options
author | Mark Michelson <mmichelson@digium.com> | 2015-07-31 11:58:30 -0500 |
---|---|---|
committer | Gerrit Code Review <gerrit2@gerrit.digium.api> | 2015-07-31 11:58:30 -0500 |
commit | b002e09214008265f89abec95ca520783e71e319 (patch) | |
tree | f4f912fd49786355e22b1ef0286ad92ba467d281 /res/ari | |
parent | 92ddda68aa9d482e9a935fc5fdfe7daee35a43ac (diff) | |
parent | fe804b09b31a1fd43de8d110f025c447aa4a6b62 (diff) |
Merge "ARI: Channels added to Stasis application during WebSocket creation ..."
Diffstat (limited to 'res/ari')
-rw-r--r-- | res/ari/ari_websockets.c | 8 | ||||
-rw-r--r-- | res/ari/resource_events.c | 518 | ||||
-rw-r--r-- | res/ari/resource_events.h | 16 |
3 files changed, 394 insertions, 148 deletions
diff --git a/res/ari/ari_websockets.c b/res/ari/ari_websockets.c index ac5b5788d..9a4f2c9fe 100644 --- a/res/ari/ari_websockets.c +++ b/res/ari/ari_websockets.c @@ -153,7 +153,7 @@ struct ast_json *ast_ari_websocket_session_read( "{" \ " \"error\": \"InvalidMessage\"," \ " \"message\": \"Message validation failed\"" \ - "}" + "}" int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session, struct ast_json *message) @@ -196,3 +196,9 @@ void ari_handle_websocket(struct ast_websocket_server *ws_server, ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params, headers); } + +const char *ast_ari_websocket_session_id( + const struct ast_ari_websocket_session *session) +{ + return ast_websocket_session_id(session->ws_session); +} diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c index e666f2e05..f1342b7fa 100644 --- a/res/ari/resource_events.c +++ b/res/ari/resource_events.c @@ -27,226 +27,455 @@ ASTERISK_REGISTER_FILE() +#include "resource_events.h" #include "asterisk/astobj2.h" #include "asterisk/stasis_app.h" -#include "resource_events.h" +#include "asterisk/vector.h" -/*! Number of buckets for the Stasis application hash table. Remember to keep it - * a prime number! - */ +/*! Number of buckets for the event session registry. Remember to keep it a prime number! */ +#define EVENT_SESSION_NUM_BUCKETS 23 + +/*! Number of buckets for a websocket apps container. Remember to keep it a prime number! */ #define APPS_NUM_BUCKETS 7 -/*! \brief A connection to the event WebSocket */ +/*! Initial size of a message queue. */ +#define MESSAGES_INIT_SIZE 23 + + +/*! \brief A wrapper for the /ref ast_ari_websocket_session. */ struct event_session { - struct ast_ari_websocket_session *ws_session; - struct ao2_container *websocket_apps; + struct ast_ari_websocket_session *ws_session; /*!< Handle to the websocket session. */ + struct ao2_container *websocket_apps; /*!< List of Stasis apps registered to + the websocket session. */ + AST_VECTOR(, struct ast_json *) message_queue; /*!< Container for holding delayed messages. */ + char session_id[]; /*!< The id for the websocket session. */ }; +/*! \brief \ref event_session error types. */ +enum event_session_error_type { + ERROR_TYPE_STASIS_REGISTRATION = 1, /*!< Stasis failed to register the application. */ + ERROR_TYPE_OOM = 2, /*!< Insufficient memory to create the event + session. */ + ERROR_TYPE_MISSING_APP_PARAM = 3, /*!< HTTP request was missing an [app] parameter. */ + ERROR_TYPE_INVALID_APP_PARAM = 4, /*!< HTTP request contained an invalid [app] + parameter. */ +}; + +/*! \brief Local registry for created \ref event_session objects. */ +static struct ao2_container *event_session_registry; + /*! - * \brief Explicitly shutdown a session. + * \brief Callback handler for Stasis application messages. * - * An explicit shutdown is necessary, since stasis-app has a reference to this - * session. We also need to be sure to null out the \c ws_session field, since - * the websocket is about to go away. + * \internal * - * \param session Session info struct. + * \param data Void pointer to the event session (\ref event_session). + * \param app_name Name of the Stasis application that dispatched the message. + * \param message The dispatched message. */ -static void session_shutdown(struct event_session *session) +static void stasis_app_message_handler( + void *data, const char *app_name, struct ast_json *message) { - struct ao2_iterator i; - char *app; - SCOPED_AO2LOCK(lock, session); + struct event_session *session = data; + const char *msg_type, *msg_application; + + ast_assert(session != NULL); + + ao2_lock(session); + + msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), ""); + msg_application = S_OR( + ast_json_string_get(ast_json_object_get(message, "application")), ""); - i = ao2_iterator_init(session->websocket_apps, 0); - while ((app = ao2_iterator_next(&i))) { - stasis_app_unregister(app); - ao2_cleanup(app); + /* If we've been replaced, remove the application from our local + websocket_apps container */ + if (strcmp(msg_type, "ApplicationReplaced") == 0 && + strcmp(msg_application, app_name) == 0) { + ao2_find(session->websocket_apps, msg_application, + OBJ_UNLINK | OBJ_NODATA); } - ao2_iterator_destroy(&i); - ao2_cleanup(session->websocket_apps); - session->websocket_apps = NULL; - session->ws_session = NULL; + /* Now, we need to determine our state to see how we will handle the message */ + if (ast_json_object_set(message, "application", ast_json_string_create(app_name))) { + /* We failed to add an application element to our json message */ + ast_log(LOG_WARNING, + "Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n", + msg_type, + msg_application); + } else if (!session->ws_session) { + /* If the websocket is NULL, the message goes to the queue */ + AST_VECTOR_APPEND(&session->message_queue, message); + ast_log(LOG_WARNING, + "Queued '%s' message for Stasis app '%s'; websocket is not ready\n", + msg_type, + msg_application); + } else { + /* We are ready to publish the message */ + ast_ari_websocket_session_write(session->ws_session, message); + } + + ao2_unlock(session); } -static void session_dtor(void *obj) +/*! + * \brief AO2 comparison function for \ref event_session objects. + * + * \internal + * + * \param obj Void pointer to the \ref event_session container. + * \param arg Void pointer to the \ref event_session object. + * \param flags The \ref search_flags to use when creating the hash key. + * + * \retval 0 The objects are not equal. + * \retval CMP_MATCH The objects are equal. + */ +static int event_session_compare(void *obj, void *arg, int flags) { -#ifdef AST_DEVMODE /* Avoid unused variable warning */ - struct event_session *session = obj; -#endif + const struct event_session *object_left = obj; + const struct event_session *object_right = arg; + const char *right_key = arg; + int cmp = 0; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->session_id; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->session_id, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + cmp = strncmp(object_left->session_id, right_key, strlen(right_key)); + break; + default: + break; + } - /* session_shutdown should have been called before */ - ast_assert(session->ws_session == NULL); - ast_assert(session->websocket_apps == NULL); + return cmp ? 0 : CMP_MATCH; +} + +/*! + * \brief AO2 hash function for \ref event_session objects. + * + * \details Computes hash value for the given \ref event_session, with respect to the + * provided search flags. + * + * \internal + * + * \param obj Void pointer to the \ref event_session object. + * \param flags The \ref search_flags to use when creating the hash key. + * + * \retval > 0 on success + * \retval 0 on failure + */ +static int event_session_hash(const void *obj, const int flags) +{ + const struct event_session *session; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + session = obj; + key = session->session_id; + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_hash(key); } -static void session_cleanup(struct event_session *session) +/*! + * \brief Explicitly shutdown a session. + * + * \details An explicit shutdown is necessary, since the \ref stasis_app has a reference + * to this session. We also need to be sure to null out the \c ws_session field, + * since the websocket is about to go away. + * + * \internal + * + * \param session Event session object (\ref event_session). + */ +static void event_session_shutdown(struct event_session *session) { - session_shutdown(session); - ao2_cleanup(session); + struct ao2_iterator i; + char *app; + int j; + SCOPED_AO2LOCK(lock, session); + + /* Clean up the websocket_apps container */ + if (session->websocket_apps) { + i = ao2_iterator_init(session->websocket_apps, 0); + while ((app = ao2_iterator_next(&i))) { + stasis_app_unregister(app); + ao2_cleanup(app); + } + ao2_iterator_destroy(&i); + ao2_cleanup(session->websocket_apps); + session->websocket_apps = NULL; + } + + /* Clean up the message_queue container */ + for (j = 0; j < AST_VECTOR_SIZE(&session->message_queue); j++) { + struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, j); + ast_json_unref(msg); + } + AST_VECTOR_FREE(&session->message_queue); + + /* Remove the handle to the underlying websocket session */ + session->ws_session = NULL; } -static struct event_session *session_create( - struct ast_ari_websocket_session *ws_session) +/*! + * \brief Updates the websocket session for an \ref event_session. + * + * \details The websocket for the given \ref event_session will be updated to the value + * of the \c ws_session argument. + * + * If the value of the \c ws_session is not \c NULL and there are messages in the + * event session's \c message_queue, the messages are dispatched and removed from + * the queue. + * + * \internal + * + * \param session The event session object to update (\ref event_session). + * \param ws_session Handle to the underlying websocket session + * (\ref ast_ari_websocket_session). + */ +static void event_session_update_websocket( + struct event_session *session, struct ast_ari_websocket_session *ws_session) { - RAII_VAR(struct event_session *, session, NULL, ao2_cleanup); + int i; + + ast_assert(session != NULL); - session = ao2_alloc(sizeof(*session), session_dtor); + ao2_lock(session); session->ws_session = ws_session; - session->websocket_apps = - ast_str_container_alloc(APPS_NUM_BUCKETS); - if (!session->websocket_apps) { - return NULL; + for (i = 0; i < AST_VECTOR_SIZE(&session->message_queue); i++) { + struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, i); + ast_ari_websocket_session_write(session->ws_session, msg); + ast_json_unref(msg); } - ao2_ref(session, +1); - return session; + AST_VECTOR_RESET(&session->message_queue, AST_VECTOR_ELEM_CLEANUP_NOOP); + ao2_unlock(session); } /*! - * \brief Callback handler for Stasis application messages. + * \brief Processes cleanup actions for a \ref event_session object. + * + * \internal + * + * \param session The event session object to cleanup (\ref event_session). */ -static void app_handler(void *data, const char *app_name, - struct ast_json *message) +static void event_session_cleanup(struct event_session *session) { - struct event_session *session = data; - int res; - const char *msg_type = S_OR( - ast_json_string_get(ast_json_object_get(message, "type")), - ""); - const char *msg_application = S_OR( - ast_json_string_get(ast_json_object_get(message, "application")), - ""); - if (!session) { return; } - - /* Determine if we've been replaced */ - if (strcmp(msg_type, "ApplicationReplaced") == 0 && - strcmp(msg_application, app_name) == 0) { - ao2_find(session->websocket_apps, msg_application, - OBJ_UNLINK | OBJ_NODATA); - } - res = ast_json_object_set(message, "application", - ast_json_string_create(app_name)); - if(res != 0) { - return; - } + event_session_shutdown(session); + ao2_unlink(event_session_registry, session); +} - ao2_lock(session); - if (session->ws_session) { - ast_ari_websocket_session_write(session->ws_session, message); - } - ao2_unlock(session); +/*! + * \brief Event session object destructor (\ref event_session). + * + * \internal + * + * \param obj Void pointer to the \ref event_session object. + */ +static void event_session_dtor(void *obj) +{ +#ifdef AST_DEVMODE /* Avoid unused variable warning */ + struct event_session *session = obj; + +#endif + + /* event_session_shutdown should have been called before now */ + ast_assert(session->ws_session == NULL); + ast_assert(session->websocket_apps == NULL); + ast_assert(AST_VECTOR_SIZE(&session->message_queue) == 0); } /*! - * \brief Register for all of the apps given. - * \param session Session info struct. - * \param app_name Name of application to register. + * \brief Handles \ref event_session error processing. + * + * \internal + * + * \param session The \ref event_session object. + * \param error The \ref event_session_error_type to handle. + * \param ser HTTP TCP/TLS Server Session (\ref ast_tcptls_session_instance). + * + * \retval -1 Always returns -1. */ -static int session_register_app(struct event_session *session, - const char *app_name) +static int event_session_allocation_error_handler( + struct event_session *session, enum event_session_error_type error, + struct ast_tcptls_session_instance *ser) { - SCOPED_AO2LOCK(lock, session); + /* Notify the client */ + switch (error) { + case ERROR_TYPE_STASIS_REGISTRATION: + ast_http_error(ser, 500, "Internal Server Error", + "Stasis registration failed"); + break; - ast_assert(session->ws_session != NULL); - ast_assert(session->websocket_apps != NULL); + case ERROR_TYPE_OOM: + ast_http_error(ser, 500, "Internal Server Error", + "Allocation failed"); + break; - if (ast_strlen_zero(app_name)) { - return -1; - } + case ERROR_TYPE_MISSING_APP_PARAM: + ast_http_error(ser, 400, "Bad Request", + "HTTP request is missing param: [app]"); + break; - if (ast_str_container_add(session->websocket_apps, app_name)) { - ast_ari_websocket_session_write(session->ws_session, - ast_ari_oom_json()); - return -1; - } + case ERROR_TYPE_INVALID_APP_PARAM: + ast_http_error(ser, 400, "Bad Request", + "Invalid application provided in param [app]."); + break; - stasis_app_register(app_name, app_handler, session); + default: + break; + } - return 0; + /* Cleanup the session */ + event_session_cleanup(session); + return -1; } -int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser, - struct ast_variable *headers, - struct ast_ari_events_event_websocket_args *args) +/*! + * \brief Creates an \ref event_session object and registers its apps with Stasis. + * + * \internal + * + * \param ser HTTP TCP/TLS Server Session (\ref ast_tcptls_session_instance). + * \param args The Stasis [app] parameters as parsed from the HTTP request + * (\ref ast_ari_events_event_websocket_args). + * \param session_id The id for the websocket session that will be created for this + * event session. + * + * \retval 0 on success + * \retval -1 on failure + */ +static int event_session_alloc(struct ast_tcptls_session_instance *ser, + struct ast_ari_events_event_websocket_args *args, const char *session_id) { - int res = 0; - size_t i, j; - - ast_debug(3, "/events WebSocket attempted\n"); + RAII_VAR(struct event_session *, session, NULL, ao2_cleanup); + size_t size, i; + /* The request must have at least one [app] parameter */ if (args->app_count == 0) { - ast_http_error(ser, 400, "Bad Request", "Missing param 'app'"); - return -1; + return event_session_allocation_error_handler( + session, ERROR_TYPE_MISSING_APP_PARAM, ser); + } + + size = sizeof(*session) + strlen(session_id) + 1; + + /* Instantiate the event session */ + session = ao2_alloc(size, event_session_dtor); + if (!session) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); } + strncpy(session->session_id, session_id, size - sizeof(*session)); + + /* Instantiate the hash table for Stasis apps */ + session->websocket_apps = + ast_str_container_alloc(APPS_NUM_BUCKETS); + + if (!session->websocket_apps) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } + + /* Instantiate the message queue */ + if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } + + /* Register the apps with Stasis */ for (i = 0; i < args->app_count; ++i) { - if (ast_strlen_zero(args->app[i])) { - res = -1; - break; + const char *app = args->app[i]; + + if (ast_strlen_zero(app)) { + return event_session_allocation_error_handler( + session, ERROR_TYPE_INVALID_APP_PARAM, ser); } - res |= stasis_app_register(args->app[i], app_handler, NULL); - } + if (ast_str_container_add(session->websocket_apps, app)) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } - if (res) { - for (j = 0; j < i; ++j) { - stasis_app_unregister(args->app[j]); + if (stasis_app_register(app, stasis_app_message_handler, session)) { + ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app); + return event_session_allocation_error_handler( + session, ERROR_TYPE_STASIS_REGISTRATION, ser); } - ast_http_error(ser, 400, "Bad Request", "Invalid application provided in param 'app'."); } - return res; + /* Add the event session to the local registry */ + if (!ao2_link(event_session_registry, session)) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } + + return 0; } -void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *ws_session, - struct ast_variable *headers, - struct ast_ari_events_event_websocket_args *args) +int ast_ari_websocket_events_event_websocket_init(void) { - RAII_VAR(struct event_session *, session, NULL, session_cleanup); - struct ast_json *msg; - int res; - size_t i; + /* Try to instantiate the registry */ + event_session_registry = ao2_container_alloc(EVENT_SESSION_NUM_BUCKETS, + event_session_hash, + event_session_compare); + + if (!event_session_registry) { + /* This is bad, bad. */ + ast_log(LOG_WARNING, + "Failed to allocate the local registry for websocket applications\n"); + return -1; + } - ast_debug(3, "/events WebSocket connection\n"); + return 0; +} - session = session_create(ws_session); - if (!session) { - ast_ari_websocket_session_write(ws_session, ast_ari_oom_json()); - return; - } +int ast_ari_websocket_events_event_websocket_attempted( + struct ast_tcptls_session_instance *ser, struct ast_variable *headers, + struct ast_ari_events_event_websocket_args *args, const char *session_id) +{ + ast_debug(3, "/events WebSocket attempted\n"); - res = 0; - for (i = 0; i < args->app_count; ++i) { - if (ast_strlen_zero(args->app[i])) { - continue; - } - res |= session_register_app(session, args->app[i]); - } + /* Create the event session */ + return event_session_alloc(ser, args, session_id); +} - if (ao2_container_count(session->websocket_apps) == 0) { - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); +void ast_ari_websocket_events_event_websocket_established( + struct ast_ari_websocket_session *ws_session, struct ast_variable *headers, + struct ast_ari_events_event_websocket_args *args) +{ + RAII_VAR(struct event_session *, session, NULL, event_session_cleanup); + struct ast_json *msg; + const char *session_id; - msg = ast_json_pack("{s: s, s: [s]}", - "type", "MissingParams", - "params", "app"); - if (!msg) { - msg = ast_json_ref(ast_ari_oom_json()); - } + ast_debug(3, "/events WebSocket established\n"); - ast_ari_websocket_session_write(session->ws_session, msg); - return; - } + ast_assert(ws_session != NULL); - if (res != 0) { - ast_ari_websocket_session_write(ws_session, ast_ari_oom_json()); - return; + session_id = ast_ari_websocket_session_id(ws_session); + + /* Find the event_session and update its websocket */ + session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY); + + if (session) { + ao2_unlink(event_session_registry, session); + event_session_update_websocket(session, ws_session); + } else { + ast_log(LOG_WARNING, + "Failed to locate an event session for the provided websocket session\n"); } /* We don't process any input, but we'll consume it waiting for EOF */ @@ -309,4 +538,3 @@ void ast_ari_events_user_event(struct ast_variable *headers, "Error processing request"); } } - diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h index 2b631819b..bc763ebd3 100644 --- a/res/ari/resource_events.h +++ b/res/ari/resource_events.h @@ -52,14 +52,24 @@ struct ast_ari_events_event_websocket_args { /*! * \brief WebSocket connection for events. * + * \retval 0 success + * \retval -1 error + */ +int ast_ari_websocket_events_event_websocket_init(void); + +/*! + * \brief WebSocket connection for events. + * * \param ser HTTP TCP/TLS Server Session * \param headers HTTP headers * \param args Swagger parameters + * \param session_id The id of the current session. * * \retval 0 success * \retval non-zero error */ -int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); +int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser, + struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args, const char *session_id); /*! * \brief WebSocket connection for events. @@ -67,8 +77,10 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session * \param session ARI WebSocket. * \param headers HTTP headers. * \param args Swagger parameters. + * \param session_id The id of the current session. */ -void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); +void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *session, + struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); /*! Argument struct for ast_ari_events_user_event() */ struct ast_ari_events_user_event_args { /*! Event name */ |