summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-04-08 13:27:45 +0000
committerDavid M. Lee <dlee@digium.com>2013-04-08 13:27:45 +0000
commita2a53cc306ea5fec65daf3630716a7c6ee13adad (patch)
tree4e59f10e2c6ab044ac307466bf921bbf1ceca7d3 /res
parent426095bc5503391eabb3e5ce0fbbfec8b4752f2d (diff)
Stasis application WebSocket support
This is the API that binds the Stasis dialplan application to external Stasis applications. It also adds the beginnings of WebSocket application support. This module registers a dialplan function named Stasis, which is used to put a channel into the named Stasis app. As a channel enters and leaves the Stasis diaplan application, the Stasis app receives a 'stasis-start' and 'stasis-end' events. Stasis apps register themselves using the stasis_app_register and stasis_app_unregister functions. Messages are sent to an application using stasis_app_send. Finally, Stasis apps control channels through the use of the stasis_app_control object, and the family of stasis_app_control_* functions. Other changes along for the ride are: * An ast_frame_dtor function that's RAII_VAR safe * Some common JSON encoders for name/number, timeval, and context/extension/priority Review: https://reviewboard.asterisk.org/r/2361/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384879 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res')
-rw-r--r--res/res_stasis_websocket.c326
1 files changed, 326 insertions, 0 deletions
diff --git a/res/res_stasis_websocket.c b/res/res_stasis_websocket.c
new file mode 100644
index 000000000..b4819aec9
--- /dev/null
+++ b/res/res_stasis_websocket.c
@@ -0,0 +1,326 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2012 - 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief HTTP binding for the Stasis API
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+ <depend type="module">app_stasis</depend>
+ <depend type="module">res_http_websocket</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/app_stasis.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/http_websocket.h"
+#include "asterisk/json.h"
+#include "asterisk/module.h"
+#include "asterisk/strings.h"
+#include "asterisk/utils.h"
+
+/*! WebSocket protocol for Stasis */
+static const char * const ws_protocol = "stasis";
+
+/*! Message to send when out of memory */
+static struct ast_json *oom_json;
+
+/*! Number of buckets for the Stasis application hash table. Remember to keep it
+ * a prime number!
+ */
+#define APPS_NUM_BUCKETS 7
+
+struct websocket_app {
+ char *name;
+};
+
+/*!
+ * \internal
+ * \brief Helper to write a JSON object to a WebSocket.
+ * \param session WebSocket session.
+ * \param message JSON message.
+ * \return 0 on success.
+ * \return -1 on error.
+ */
+static int websocket_write_json(struct ast_websocket *session,
+ struct ast_json *message)
+{
+ RAII_VAR(char *, str, ast_json_dump_string(message), ast_free);
+
+ if (str == NULL) {
+ ast_log(LOG_ERROR, "Failed to encode JSON object\n");
+ return -1;
+ }
+
+ return ast_websocket_write(session, AST_WEBSOCKET_OPCODE_TEXT, str,
+ strlen(str));
+}
+
+/*! Hash function for websocket_app */
+static int hash_app(const void *obj, const int flags)
+{
+ const struct websocket_app *app = obj;
+ const char *name = flags & OBJ_KEY ? obj : app->name;
+
+ return ast_str_hash(name);
+}
+
+/*! Comparison function for websocket_app */
+static int compare_app(void *lhs, void *rhs, int flags)
+{
+ const struct websocket_app *lhs_app = lhs;
+ const struct websocket_app *rhs_app = rhs;
+ const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
+
+ if (strcmp(lhs_app->name, rhs_name) == 0) {
+ return CMP_MATCH;
+ } else {
+ return 0;
+ }
+}
+
+static void app_dtor(void *obj)
+{
+ struct websocket_app *app = obj;
+ ast_free(app->name);
+}
+
+struct stasis_ws_session_info {
+ struct ast_websocket *ws_session;
+ struct ao2_container *websocket_apps;
+};
+
+static void session_dtor(void *obj)
+{
+ struct stasis_ws_session_info *session = obj;
+
+ /* session_shutdown should have been called before */
+ ast_assert(session->ws_session == NULL);
+ ast_assert(session->websocket_apps == NULL);
+}
+
+static struct stasis_ws_session_info *session_create(
+ struct ast_websocket *ws_session)
+{
+ RAII_VAR(struct stasis_ws_session_info *, session, NULL, ao2_cleanup);
+
+ session = ao2_alloc(sizeof(*session), session_dtor);
+
+ session->ws_session = ws_session;
+ session->websocket_apps =
+ ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
+
+ if (!session->websocket_apps) {
+ return NULL;
+ }
+
+ ao2_ref(session, +1);
+ return session;
+}
+
+/*!
+ * \brief Explicitly shutdown a session.
+ *
+ * An explicit shutdown is necessary, since stasis-app has a reference to this
+ * session. We also need to be sure to null out the \c ws_session field, since
+ * the websocket is about to go away.
+ *
+ * \param session Session info struct.
+ */
+static void session_shutdown(struct stasis_ws_session_info *session)
+{
+ struct ao2_iterator i;
+ struct websocket_app *app;
+ SCOPED_AO2LOCK(lock, session);
+
+ i = ao2_iterator_init(session->websocket_apps, 0);
+ while ((app = ao2_iterator_next(&i))) {
+ stasis_app_unregister(app->name);
+ ao2_cleanup(app);
+ }
+ ao2_iterator_destroy(&i);
+ ao2_cleanup(session->websocket_apps);
+
+ session->websocket_apps = NULL;
+ session->ws_session = NULL;
+}
+
+/*!
+ * \brief Callback handler for Stasis application messages.
+ */
+static void app_handler(void *data, const char *app_name,
+ struct ast_json *message)
+{
+ struct stasis_ws_session_info *session = data;
+ int res;
+
+ res = ast_json_object_set(message, "application",
+ ast_json_string_create(app_name));
+ if(res != 0) {
+ return;
+ }
+
+ ao2_lock(session);
+ if (session->ws_session) {
+ websocket_write_json(session->ws_session, message);
+ }
+ ao2_unlock(session);
+}
+
+/*!
+ * \brief Register for all of the apps given.
+ * \param session Session info struct.
+ * \param app_list Comma seperated list of app names to register.
+ */
+static int session_register_apps(struct stasis_ws_session_info *session,
+ const char *app_list)
+{
+ RAII_VAR(char *, to_free, NULL, ast_free);
+ char *apps, *app_name;
+ SCOPED_AO2LOCK(lock, session);
+
+ ast_assert(session->ws_session != NULL);
+ ast_assert(session->websocket_apps != NULL);
+
+ to_free = apps = ast_strdup(app_list);
+ if (!apps) {
+ websocket_write_json(session->ws_session, oom_json);
+ return -1;
+ }
+ while ((app_name = strsep(&apps, ","))) {
+ RAII_VAR(struct websocket_app *, app, NULL, ao2_cleanup);
+
+ app = ao2_alloc(sizeof(*app), app_dtor);
+ if (!app) {
+ websocket_write_json(session->ws_session, oom_json);
+ return -1;
+ }
+ app->name = ast_strdup(app_name);
+ ao2_link(session->websocket_apps, app);
+
+ stasis_app_register(app_name, app_handler, session);
+ }
+ return 0;
+}
+
+static void websocket_callback(struct ast_websocket *ws_session,
+ struct ast_variable *parameters,
+ struct ast_variable *headers)
+{
+ RAII_VAR(struct stasis_ws_session_info *, stasis_session, NULL, ao2_cleanup);
+ struct ast_variable *param = NULL;
+ int res;
+
+ ast_debug(3, "Stasis web socket connection\n");
+
+ if (ast_websocket_set_nonblock(ws_session) != 0) {
+ ast_log(LOG_ERROR,
+ "Stasis web socket failed to set nonblock; closing\n");
+ goto end;
+ }
+
+ stasis_session = session_create(ws_session);
+
+ if (!stasis_session) {
+ websocket_write_json(ws_session, oom_json);
+ goto end;
+ }
+
+ for (param = parameters; param; param = param->next) {
+ if (strcmp(param->name, "app") == 0) {
+ int ret = session_register_apps(
+ stasis_session, param->value);
+ if (ret != 0) {
+ goto end;
+ }
+ }
+ }
+
+ if (ao2_container_count(stasis_session->websocket_apps) == 0) {
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+
+ msg = ast_json_pack("{s: s, s: [s]}",
+ "error", "MissingParams",
+ "params", "app");
+ if (msg) {
+ websocket_write_json(ws_session, msg);
+ }
+
+ goto end;
+ }
+
+ while ((res = ast_wait_for_input(ast_websocket_fd(ws_session), -1)) > 0) {
+ char *payload;
+ uint64_t payload_len;
+ enum ast_websocket_opcode opcode;
+ int fragmented;
+ int read = ast_websocket_read(ws_session, &payload, &payload_len,
+ &opcode, &fragmented);
+
+ if (read) {
+ ast_log(LOG_ERROR,
+ "Stasis WebSocket read error; closing\n");
+ break;
+ }
+
+ if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
+ break;
+ }
+ }
+
+end:
+ session_shutdown(stasis_session);
+ ast_websocket_unref(ws_session);
+}
+
+static int load_module(void)
+{
+ int r = 0;
+
+ oom_json = ast_json_pack("{s: s}",
+ "error", "OutOfMemory");
+ if (!oom_json) {
+ /* ironic */
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ r |= ast_websocket_add_protocol(ws_protocol, websocket_callback);
+ return r;
+}
+
+static int unload_module(void)
+{
+ int r = 0;
+
+ ast_json_unref(oom_json);
+ oom_json = NULL;
+ r |= ast_websocket_remove_protocol(ws_protocol, websocket_callback);
+ return r;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis HTTP bindings",
+ .load = load_module,
+ .unload = unload_module,
+ .nonoptreq = "app_stasis,res_http_websocket"
+ );