diff options
author | David M. Lee <dlee@digium.com> | 2013-04-08 13:27:45 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-04-08 13:27:45 +0000 |
commit | a2a53cc306ea5fec65daf3630716a7c6ee13adad (patch) | |
tree | 4e59f10e2c6ab044ac307466bf921bbf1ceca7d3 /res | |
parent | 426095bc5503391eabb3e5ce0fbbfec8b4752f2d (diff) |
Stasis application WebSocket support
This is the API that binds the Stasis dialplan application to external
Stasis applications. It also adds the beginnings of WebSocket
application support.
This module registers a dialplan function named Stasis, which is used
to put a channel into the named Stasis app. As a channel enters and
leaves the Stasis diaplan application, the Stasis app receives a
'stasis-start' and 'stasis-end' events.
Stasis apps register themselves using the stasis_app_register and
stasis_app_unregister functions. Messages are sent to an application
using stasis_app_send.
Finally, Stasis apps control channels through the use of the
stasis_app_control object, and the family of stasis_app_control_*
functions.
Other changes along for the ride are:
* An ast_frame_dtor function that's RAII_VAR safe
* Some common JSON encoders for name/number, timeval, and
context/extension/priority
Review: https://reviewboard.asterisk.org/r/2361/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384879 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res')
-rw-r--r-- | res/res_stasis_websocket.c | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/res/res_stasis_websocket.c b/res/res_stasis_websocket.c new file mode 100644 index 000000000..b4819aec9 --- /dev/null +++ b/res/res_stasis_websocket.c @@ -0,0 +1,326 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012 - 2013, Digium, Inc. + * + * David M. Lee, II <dlee@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief HTTP binding for the Stasis API + * + * \author David M. Lee, II <dlee@digium.com> + */ + +/*** MODULEINFO + <depend type="module">app_stasis</depend> + <depend type="module">res_http_websocket</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/app_stasis.h" +#include "asterisk/astobj2.h" +#include "asterisk/http_websocket.h" +#include "asterisk/json.h" +#include "asterisk/module.h" +#include "asterisk/strings.h" +#include "asterisk/utils.h" + +/*! WebSocket protocol for Stasis */ +static const char * const ws_protocol = "stasis"; + +/*! Message to send when out of memory */ +static struct ast_json *oom_json; + +/*! Number of buckets for the Stasis application hash table. Remember to keep it + * a prime number! + */ +#define APPS_NUM_BUCKETS 7 + +struct websocket_app { + char *name; +}; + +/*! + * \internal + * \brief Helper to write a JSON object to a WebSocket. + * \param session WebSocket session. + * \param message JSON message. + * \return 0 on success. + * \return -1 on error. + */ +static int websocket_write_json(struct ast_websocket *session, + struct ast_json *message) +{ + RAII_VAR(char *, str, ast_json_dump_string(message), ast_free); + + if (str == NULL) { + ast_log(LOG_ERROR, "Failed to encode JSON object\n"); + return -1; + } + + return ast_websocket_write(session, AST_WEBSOCKET_OPCODE_TEXT, str, + strlen(str)); +} + +/*! Hash function for websocket_app */ +static int hash_app(const void *obj, const int flags) +{ + const struct websocket_app *app = obj; + const char *name = flags & OBJ_KEY ? obj : app->name; + + return ast_str_hash(name); +} + +/*! Comparison function for websocket_app */ +static int compare_app(void *lhs, void *rhs, int flags) +{ + const struct websocket_app *lhs_app = lhs; + const struct websocket_app *rhs_app = rhs; + const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name; + + if (strcmp(lhs_app->name, rhs_name) == 0) { + return CMP_MATCH; + } else { + return 0; + } +} + +static void app_dtor(void *obj) +{ + struct websocket_app *app = obj; + ast_free(app->name); +} + +struct stasis_ws_session_info { + struct ast_websocket *ws_session; + struct ao2_container *websocket_apps; +}; + +static void session_dtor(void *obj) +{ + struct stasis_ws_session_info *session = obj; + + /* session_shutdown should have been called before */ + ast_assert(session->ws_session == NULL); + ast_assert(session->websocket_apps == NULL); +} + +static struct stasis_ws_session_info *session_create( + struct ast_websocket *ws_session) +{ + RAII_VAR(struct stasis_ws_session_info *, session, NULL, ao2_cleanup); + + session = ao2_alloc(sizeof(*session), session_dtor); + + session->ws_session = ws_session; + session->websocket_apps = + ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app); + + if (!session->websocket_apps) { + return NULL; + } + + ao2_ref(session, +1); + return session; +} + +/*! + * \brief Explicitly shutdown a session. + * + * An explicit shutdown is necessary, since stasis-app has a reference to this + * session. We also need to be sure to null out the \c ws_session field, since + * the websocket is about to go away. + * + * \param session Session info struct. + */ +static void session_shutdown(struct stasis_ws_session_info *session) +{ + struct ao2_iterator i; + struct websocket_app *app; + SCOPED_AO2LOCK(lock, session); + + i = ao2_iterator_init(session->websocket_apps, 0); + while ((app = ao2_iterator_next(&i))) { + stasis_app_unregister(app->name); + ao2_cleanup(app); + } + ao2_iterator_destroy(&i); + ao2_cleanup(session->websocket_apps); + + session->websocket_apps = NULL; + session->ws_session = NULL; +} + +/*! + * \brief Callback handler for Stasis application messages. + */ +static void app_handler(void *data, const char *app_name, + struct ast_json *message) +{ + struct stasis_ws_session_info *session = data; + int res; + + res = ast_json_object_set(message, "application", + ast_json_string_create(app_name)); + if(res != 0) { + return; + } + + ao2_lock(session); + if (session->ws_session) { + websocket_write_json(session->ws_session, message); + } + ao2_unlock(session); +} + +/*! + * \brief Register for all of the apps given. + * \param session Session info struct. + * \param app_list Comma seperated list of app names to register. + */ +static int session_register_apps(struct stasis_ws_session_info *session, + const char *app_list) +{ + RAII_VAR(char *, to_free, NULL, ast_free); + char *apps, *app_name; + SCOPED_AO2LOCK(lock, session); + + ast_assert(session->ws_session != NULL); + ast_assert(session->websocket_apps != NULL); + + to_free = apps = ast_strdup(app_list); + if (!apps) { + websocket_write_json(session->ws_session, oom_json); + return -1; + } + while ((app_name = strsep(&apps, ","))) { + RAII_VAR(struct websocket_app *, app, NULL, ao2_cleanup); + + app = ao2_alloc(sizeof(*app), app_dtor); + if (!app) { + websocket_write_json(session->ws_session, oom_json); + return -1; + } + app->name = ast_strdup(app_name); + ao2_link(session->websocket_apps, app); + + stasis_app_register(app_name, app_handler, session); + } + return 0; +} + +static void websocket_callback(struct ast_websocket *ws_session, + struct ast_variable *parameters, + struct ast_variable *headers) +{ + RAII_VAR(struct stasis_ws_session_info *, stasis_session, NULL, ao2_cleanup); + struct ast_variable *param = NULL; + int res; + + ast_debug(3, "Stasis web socket connection\n"); + + if (ast_websocket_set_nonblock(ws_session) != 0) { + ast_log(LOG_ERROR, + "Stasis web socket failed to set nonblock; closing\n"); + goto end; + } + + stasis_session = session_create(ws_session); + + if (!stasis_session) { + websocket_write_json(ws_session, oom_json); + goto end; + } + + for (param = parameters; param; param = param->next) { + if (strcmp(param->name, "app") == 0) { + int ret = session_register_apps( + stasis_session, param->value); + if (ret != 0) { + goto end; + } + } + } + + if (ao2_container_count(stasis_session->websocket_apps) == 0) { + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + + msg = ast_json_pack("{s: s, s: [s]}", + "error", "MissingParams", + "params", "app"); + if (msg) { + websocket_write_json(ws_session, msg); + } + + goto end; + } + + while ((res = ast_wait_for_input(ast_websocket_fd(ws_session), -1)) > 0) { + char *payload; + uint64_t payload_len; + enum ast_websocket_opcode opcode; + int fragmented; + int read = ast_websocket_read(ws_session, &payload, &payload_len, + &opcode, &fragmented); + + if (read) { + ast_log(LOG_ERROR, + "Stasis WebSocket read error; closing\n"); + break; + } + + if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) { + break; + } + } + +end: + session_shutdown(stasis_session); + ast_websocket_unref(ws_session); +} + +static int load_module(void) +{ + int r = 0; + + oom_json = ast_json_pack("{s: s}", + "error", "OutOfMemory"); + if (!oom_json) { + /* ironic */ + return AST_MODULE_LOAD_FAILURE; + } + r |= ast_websocket_add_protocol(ws_protocol, websocket_callback); + return r; +} + +static int unload_module(void) +{ + int r = 0; + + ast_json_unref(oom_json); + oom_json = NULL; + r |= ast_websocket_remove_protocol(ws_protocol, websocket_callback); + return r; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis HTTP bindings", + .load = load_module, + .unload = unload_module, + .nonoptreq = "app_stasis,res_http_websocket" + ); |