summaryrefslogtreecommitdiff
path: root/res/ari/resource_events.c
diff options
context:
space:
mode:
authorMatt Jordan <mjordan@digium.com>2015-09-04 12:25:07 -0500
committerMatt Jordan <mjordan@digium.com>2015-09-22 09:59:47 -0500
commitb99a7052621700a1aa641a1c24308f5873275fc8 (patch)
treee54d4410b334fb3059d8240d1c188434af56a8df /res/ari/resource_events.c
parent47813cc51c3eae674482490e9b5bd5fcc4780fa5 (diff)
ARI: Add the ability to subscribe to all events
This patch adds the ability to subscribe to all events. There are two possible ways to accomplish this: (1) On initial WebSocket connection. This patch adds a new query parameter, 'subscribeAll'. If present and True, Asterisk will subscribe the applications to all ARI events. (2) Via the applications resource. When subscribing in this manner, an ARI client should merely specify a blank resource name, i.e., 'channels:' instead of 'channels:12354'. This will subscribe the application to all resources of the 'channels' type. ASTERISK-24870 #close Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
Diffstat (limited to 'res/ari/resource_events.c')
-rw-r--r--res/ari/resource_events.c31
1 files changed, 26 insertions, 5 deletions
diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c
index deb7f9cc0..8fa15f5aa 100644
--- a/res/ari/resource_events.c
+++ b/res/ari/resource_events.c
@@ -280,7 +280,9 @@ static void event_session_cleanup(struct event_session *session)
}
event_session_shutdown(session);
- ao2_unlink(event_session_registry, session);
+ if (event_session_registry) {
+ ao2_unlink(event_session_registry, session);
+ }
}
/*!
@@ -367,6 +369,7 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
struct ast_ari_events_event_websocket_args *args, const char *session_id)
{
RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data);
size_t size, i;
/* The request must have at least one [app] parameter */
@@ -399,6 +402,12 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
}
/* Register the apps with Stasis */
+ if (args->subscribe_all) {
+ register_handler = &stasis_app_register_all;
+ } else {
+ register_handler = &stasis_app_register;
+ }
+
for (i = 0; i < args->app_count; ++i) {
const char *app = args->app[i];
@@ -411,10 +420,10 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
- if (stasis_app_register(app, stasis_app_message_handler, session)) {
+ if (register_handler(app, stasis_app_message_handler, session)) {
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
return event_session_allocation_error_handler(
- session, ERROR_TYPE_STASIS_REGISTRATION, ser);
+ session, ERROR_TYPE_STASIS_REGISTRATION, ser);
}
}
@@ -426,8 +435,17 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
return 0;
}
+static int event_session_shutdown_cb(void *session, void *arg, int flags)
+{
+ event_session_cleanup(session);
+
+ return 0;
+}
+
void ast_ari_websocket_events_event_websocket_dtor(void)
{
+ ao2_callback(event_session_registry, OBJ_MULTIPLE | OBJ_NODATA, event_session_shutdown_cb, NULL);
+
ao2_cleanup(event_session_registry);
event_session_registry = NULL;
}
@@ -462,7 +480,8 @@ void ast_ari_websocket_events_event_websocket_established(
struct ast_ari_websocket_session *ws_session, struct ast_variable *headers,
struct ast_ari_events_event_websocket_args *args)
{
- RAII_VAR(struct event_session *, session, NULL, event_session_cleanup);
+ struct event_session *session;
+
struct ast_json *msg;
const char *session_id;
@@ -474,7 +493,6 @@ void ast_ari_websocket_events_event_websocket_established(
/* Find the event_session and update its websocket */
session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY);
-
if (session) {
ao2_unlink(event_session_registry, session);
event_session_update_websocket(session, ws_session);
@@ -487,6 +505,9 @@ void ast_ari_websocket_events_event_websocket_established(
while ((msg = ast_ari_websocket_session_read(ws_session))) {
ast_json_unref(msg);
}
+
+ event_session_cleanup(session);
+ ao2_ref(session, -1);
}
void ast_ari_events_user_event(struct ast_variable *headers,