summaryrefslogtreecommitdiff
path: root/res/res_stasis_websocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_stasis_websocket.c')
-rw-r--r--res/res_stasis_websocket.c326
1 files changed, 326 insertions, 0 deletions
diff --git a/res/res_stasis_websocket.c b/res/res_stasis_websocket.c
new file mode 100644
index 000000000..b4819aec9
--- /dev/null
+++ b/res/res_stasis_websocket.c
@@ -0,0 +1,326 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2012 - 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief HTTP binding for the Stasis API
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <depend type="module">app_stasis</depend>
+ <depend type="module">res_http_websocket</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/app_stasis.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/http_websocket.h"
+#include "asterisk/json.h"
+#include "asterisk/module.h"
+#include "asterisk/strings.h"
+#include "asterisk/utils.h"
+
+/*! WebSocket protocol for Stasis */
+static const char * const ws_protocol = "stasis";
+
+/*! Message to send when out of memory */
+static struct ast_json *oom_json;
+
+/*! Number of buckets for the Stasis application hash table. Remember to keep it
+ * a prime number!
+ */
+#define APPS_NUM_BUCKETS 7
+
+struct websocket_app {
+ char *name;
+};
+
+/*!
+ * \internal
+ * \brief Helper to write a JSON object to a WebSocket.
+ * \param session WebSocket session.
+ * \param message JSON message.
+ * \return 0 on success.
+ * \return -1 on error.
+ */
+static int websocket_write_json(struct ast_websocket *session,
+ struct ast_json *message)
+{
+ RAII_VAR(char *, str, ast_json_dump_string(message), ast_free);
+
+ if (str == NULL) {
+ ast_log(LOG_ERROR, "Failed to encode JSON object\n");
+ return -1;
+ }
+
+ return ast_websocket_write(session, AST_WEBSOCKET_OPCODE_TEXT, str,
+ strlen(str));
+}
+
+/*! Hash function for websocket_app */
+static int hash_app(const void *obj, const int flags)
+{
+ const struct websocket_app *app = obj;
+ const char *name = flags & OBJ_KEY ? obj : app->name;
+
+ return ast_str_hash(name);
+}
+
+/*! Comparison function for websocket_app */
+static int compare_app(void *lhs, void *rhs, int flags)
+{
+ const struct websocket_app *lhs_app = lhs;
+ const struct websocket_app *rhs_app = rhs;
+ const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
+
+ if (strcmp(lhs_app->name, rhs_name) == 0) {
+ return CMP_MATCH;
+ } else {
+ return 0;
+ }
+}
+
+static void app_dtor(void *obj)
+{
+ struct websocket_app *app = obj;
+ ast_free(app->name);
+}
+
+struct stasis_ws_session_info {
+ struct ast_websocket *ws_session;
+ struct ao2_container *websocket_apps;
+};
+
+static void session_dtor(void *obj)
+{
+ struct stasis_ws_session_info *session = obj;
+
+ /* session_shutdown should have been called before */
+ ast_assert(session->ws_session == NULL);
+ ast_assert(session->websocket_apps == NULL);
+}
+
+static struct stasis_ws_session_info *session_create(
+ struct ast_websocket *ws_session)
+{
+ RAII_VAR(struct stasis_ws_session_info *, session, NULL, ao2_cleanup);
+
+ session = ao2_alloc(sizeof(*session), session_dtor);
+
+ session->ws_session = ws_session;
+ session->websocket_apps =
+ ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
+
+ if (!session->websocket_apps) {
+ return NULL;
+ }
+
+ ao2_ref(session, +1);
+ return session;
+}
+
+/*!
+ * \brief Explicitly shutdown a session.
+ *
+ * An explicit shutdown is necessary, since stasis-app has a reference to this
+ * session. We also need to be sure to null out the \c ws_session field, since
+ * the websocket is about to go away.
+ *
+ * \param session Session info struct.
+ */
+static void session_shutdown(struct stasis_ws_session_info *session)
+{
+ struct ao2_iterator i;
+ struct websocket_app *app;
+ SCOPED_AO2LOCK(lock, session);
+
+ i = ao2_iterator_init(session->websocket_apps, 0);
+ while ((app = ao2_iterator_next(&i))) {
+ stasis_app_unregister(app->name);
+ ao2_cleanup(app);
+ }
+ ao2_iterator_destroy(&i);
+ ao2_cleanup(session->websocket_apps);
+
+ session->websocket_apps = NULL;
+ session->ws_session = NULL;
+}
+
+/*!
+ * \brief Callback handler for Stasis application messages.
+ */
+static void app_handler(void *data, const char *app_name,
+ struct ast_json *message)
+{
+ struct stasis_ws_session_info *session = data;
+ int res;
+
+ res = ast_json_object_set(message, "application",
+ ast_json_string_create(app_name));
+ if(res != 0) {
+ return;
+ }
+
+ ao2_lock(session);
+ if (session->ws_session) {
+ websocket_write_json(session->ws_session, message);
+ }
+ ao2_unlock(session);
+}
+
+/*!
+ * \brief Register for all of the apps given.
+ * \param session Session info struct.
+ * \param app_list Comma seperated list of app names to register.
+ */
+static int session_register_apps(struct stasis_ws_session_info *session,
+ const char *app_list)
+{
+ RAII_VAR(char *, to_free, NULL, ast_free);
+ char *apps, *app_name;
+ SCOPED_AO2LOCK(lock, session);
+
+ ast_assert(session->ws_session != NULL);
+ ast_assert(session->websocket_apps != NULL);
+
+ to_free = apps = ast_strdup(app_list);
+ if (!apps) {
+ websocket_write_json(session->ws_session, oom_json);
+ return -1;
+ }
+ while ((app_name = strsep(&apps, ","))) {
+ RAII_VAR(struct websocket_app *, app, NULL, ao2_cleanup);
+
+ app = ao2_alloc(sizeof(*app), app_dtor);
+ if (!app) {
+ websocket_write_json(session->ws_session, oom_json);
+ return -1;
+ }
+ app->name = ast_strdup(app_name);
+ ao2_link(session->websocket_apps, app);
+
+ stasis_app_register(app_name, app_handler, session);
+ }
+ return 0;
+}
+
+static void websocket_callback(struct ast_websocket *ws_session,
+ struct ast_variable *parameters,
+ struct ast_variable *headers)
+{
+ RAII_VAR(struct stasis_ws_session_info *, stasis_session, NULL, ao2_cleanup);
+ struct ast_variable *param = NULL;
+ int res;
+
+ ast_debug(3, "Stasis web socket connection\n");
+
+ if (ast_websocket_set_nonblock(ws_session) != 0) {
+ ast_log(LOG_ERROR,
+ "Stasis web socket failed to set nonblock; closing\n");
+ goto end;
+ }
+
+ stasis_session = session_create(ws_session);
+
+ if (!stasis_session) {
+ websocket_write_json(ws_session, oom_json);
+ goto end;
+ }
+
+ for (param = parameters; param; param = param->next) {
+ if (strcmp(param->name, "app") == 0) {
+ int ret = session_register_apps(
+ stasis_session, param->value);
+ if (ret != 0) {
+ goto end;
+ }
+ }
+ }
+
+ if (ao2_container_count(stasis_session->websocket_apps) == 0) {
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+
+ msg = ast_json_pack("{s: s, s: [s]}",
+ "error", "MissingParams",
+ "params", "app");
+ if (msg) {
+ websocket_write_json(ws_session, msg);
+ }
+
+ goto end;
+ }
+
+ while ((res = ast_wait_for_input(ast_websocket_fd(ws_session), -1)) > 0) {
+ char *payload;
+ uint64_t payload_len;
+ enum ast_websocket_opcode opcode;
+ int fragmented;
+ int read = ast_websocket_read(ws_session, &payload, &payload_len,
+ &opcode, &fragmented);
+
+ if (read) {
+ ast_log(LOG_ERROR,
+ "Stasis WebSocket read error; closing\n");
+ break;
+ }
+
+ if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
+ break;
+ }
+ }
+
+end:
+ session_shutdown(stasis_session);
+ ast_websocket_unref(ws_session);
+}
+
+static int load_module(void)
+{
+ int r = 0;
+
+ oom_json = ast_json_pack("{s: s}",
+ "error", "OutOfMemory");
+ if (!oom_json) {
+ /* ironic */
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ r |= ast_websocket_add_protocol(ws_protocol, websocket_callback);
+ return r;
+}
+
+static int unload_module(void)
+{
+ int r = 0;
+
+ ast_json_unref(oom_json);
+ oom_json = NULL;
+ r |= ast_websocket_remove_protocol(ws_protocol, websocket_callback);
+ return r;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis HTTP bindings",
+ .load = load_module,
+ .unload = unload_module,
+ .nonoptreq = "app_stasis,res_http_websocket"
+ );