summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/stasis_bridges.c54
-rw-r--r--res/res_ari_asterisk.c12
-rw-r--r--res/res_ari_bridges.c24
-rw-r--r--res/res_ari_events.c22
-rw-r--r--res/res_stasis.c607
-rw-r--r--res/stasis/app.c639
-rw-r--r--res/stasis/app.h89
-rw-r--r--rest-api-templates/param_parsing.mustache12
-rw-r--r--rest-api-templates/res_ari_resource.c.mustache10
9 files changed, 766 insertions, 703 deletions
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c
index 2a79056d1..be1294ad0 100644
--- a/main/stasis_bridges.c
+++ b/main/stasis_bridges.c
@@ -132,6 +132,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg);
static struct stasis_cp_all *bridge_cache_all;
@@ -139,9 +142,12 @@ static struct stasis_cp_all *bridge_cache_all;
* @{ \brief Define bridge message types.
*/
STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type,
+ .to_json = ast_bridge_merge_message_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+ .to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+ .to_json = ast_channel_left_bridge_to_json);
STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami);
STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
/*! @} */
@@ -307,6 +313,19 @@ static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_b
return msg;
}
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_merge_message *merge;
+
+ merge = stasis_message_data(msg);
+
+ return ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", "BridgeMerged",
+ "timestamp", ast_json_timeval(*stasis_message_timestamp(msg), NULL),
+ "bridge", ast_bridge_snapshot_to_json(merge->to),
+ "bridge_from", ast_bridge_snapshot_to_json(merge->from));
+}
+
void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
{
RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup);
@@ -417,6 +436,35 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha
stasis_publish(ast_bridge_topic(bridge), msg);
}
+static struct ast_json *simple_bridge_channel_event(
+ const char *type,
+ struct ast_bridge_snapshot *bridge_snapshot,
+ struct ast_channel_snapshot *channel_snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+ "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+ return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+ return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(msg));
+}
+
typedef struct ast_json *(*json_item_serializer_cb)(void *obj);
static struct ast_json *container_to_json_array(struct ao2_container *items, json_item_serializer_cb item_cb)
diff --git a/res/res_ari_asterisk.c b/res/res_ari_asterisk.c
index 3f34c7ab6..3f0c285ad 100644
--- a/res/res_ari_asterisk.c
+++ b/res/res_ari_asterisk.c
@@ -81,8 +81,16 @@ static void ast_ari_get_asterisk_info_cb(
goto fin;
}
- args.only_count = ast_app_separate_args(
- args.only_parse, ',', vals, ARRAY_LEN(vals));
+ if (strlen(args.only_parse) == 0) {
+ /* ast_app_separate_args can't handle "" */
+ args.only_count = 1;
+ vals[0] = args.only_parse;
+ } else {
+ args.only_count = ast_app_separate_args(
+ args.only_parse, ',', vals,
+ ARRAY_LEN(vals));
+ }
+
if (args.only_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
diff --git a/res/res_ari_bridges.c b/res/res_ari_bridges.c
index bc8e20041..d3b3a649d 100644
--- a/res/res_ari_bridges.c
+++ b/res/res_ari_bridges.c
@@ -300,8 +300,16 @@ static void ast_ari_add_channel_to_bridge_cb(
goto fin;
}
- args.channel_count = ast_app_separate_args(
- args.channel_parse, ',', vals, ARRAY_LEN(vals));
+ if (strlen(args.channel_parse) == 0) {
+ /* ast_app_separate_args can't handle "" */
+ args.channel_count = 1;
+ vals[0] = args.channel_parse;
+ } else {
+ args.channel_count = ast_app_separate_args(
+ args.channel_parse, ',', vals,
+ ARRAY_LEN(vals));
+ }
+
if (args.channel_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
@@ -403,8 +411,16 @@ static void ast_ari_remove_channel_from_bridge_cb(
goto fin;
}
- args.channel_count = ast_app_separate_args(
- args.channel_parse, ',', vals, ARRAY_LEN(vals));
+ if (strlen(args.channel_parse) == 0) {
+ /* ast_app_separate_args can't handle "" */
+ args.channel_count = 1;
+ vals[0] = args.channel_parse;
+ } else {
+ args.channel_count = ast_app_separate_args(
+ args.channel_parse, ',', vals,
+ ARRAY_LEN(vals));
+ }
+
if (args.channel_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
diff --git a/res/res_ari_events.c b/res/res_ari_events.c
index 5cea06f0e..567167f14 100644
--- a/res/res_ari_events.c
+++ b/res/res_ari_events.c
@@ -89,8 +89,16 @@ static void ast_ari_event_websocket_ws_cb(struct ast_websocket *ws_session,
goto fin;
}
- args.app_count = ast_app_separate_args(
- args.app_parse, ',', vals, ARRAY_LEN(vals));
+ if (strlen(args.app_parse) == 0) {
+ /* ast_app_separate_args can't handle "" */
+ args.app_count = 1;
+ vals[0] = args.app_parse;
+ } else {
+ args.app_count = ast_app_separate_args(
+ args.app_parse, ',', vals,
+ ARRAY_LEN(vals));
+ }
+
if (args.app_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
@@ -126,14 +134,16 @@ fin: __attribute__((unused))
* negotiation. Param parsing should happen earlier, but we
* need a way to pass it through the WebSocket code to the
* callback */
- RAII_VAR(char *, msg, NULL, ast_free);
+ RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
- msg = ast_strdup("?");
+ ast_log(LOG_ERROR, "Missing response message\n");
+ }
+ if (msg) {
+ ast_websocket_write(ws_session,
+ AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
}
- ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg,
- strlen(msg));
}
ast_free(args.app_parse);
ast_free(args.app);
diff --git a/res/res_stasis.c b/res/res_stasis.c
index 35c1847bd..ab2bf5c86 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -87,6 +87,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#define CONTROLS_NUM_BUCKETS 127
/*!
+ * \brief Number of buckets for the Stasis bridges hash table. Remember to
+ * keep it a prime number!
+ */
+#define BRIDGES_NUM_BUCKETS 127
+
+/*!
* \brief Stasis application container.
*/
struct ao2_container *apps_registry;
@@ -97,12 +103,6 @@ struct ao2_container *app_bridges;
struct ao2_container *app_bridges_moh;
-/*! \brief Message router for the channel caching topic */
-struct stasis_message_router *channel_router;
-
-/*! \brief Message router for the bridge caching topic */
-struct stasis_message_router *bridge_router;
-
/*! AO2 hash function for \ref app */
static int app_hash(const void *obj, const int flags)
{
@@ -153,6 +153,30 @@ static int control_compare(void *lhs, void *rhs, int flags)
}
}
+static int cleanup_cb(void *obj, void *arg, int flags)
+{
+ struct app *app = obj;
+
+ if (!app_is_finished(app)) {
+ return 0;
+ }
+
+ ast_verb(1, "Shutting down application '%s'\n", app_name(app));
+ app_shutdown(app);
+
+ return CMP_MATCH;
+
+}
+
+/*!
+ * \brief Clean up any old apps that we don't need any more.
+ */
+static void cleanup(void)
+{
+ ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
+ cleanup_cb, NULL);
+}
+
struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
{
return control_create(chan);
@@ -435,229 +459,6 @@ struct ast_bridge *stasis_app_bridge_find_by_id(
return ao2_find(app_bridges, bridge_id, OBJ_KEY);
}
-/*! \brief Typedef for blob handler callbacks */
-typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
-
-/*! \brief Callback to check whether an app is watching a given channel */
-static int app_watching_channel_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
- char *uniqueid = arg;
-
- return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified channel */
-static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
-{
- struct ao2_container *watching_apps;
- char *uniqueid_dup;
- RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
- ast_assert(uniqueid != NULL);
-
- uniqueid_dup = ast_strdupa(uniqueid);
-
- watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
- watching_apps = watching_apps_iter->c;
-
- if (!ao2_container_count(watching_apps)) {
- return NULL;
- }
-
- ao2_ref(watching_apps, +1);
- return watching_apps_iter->c;
-}
-
-/*! \brief Typedef for callbacks that get called on channel snapshot updates */
-typedef struct ast_json *(*channel_snapshot_monitor)(
- struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot,
- const struct timeval *tv);
-
-static struct ast_json *simple_channel_event(
- const char *type,
- struct ast_channel_snapshot *snapshot,
- const struct timeval *tv)
-{
- return ast_json_pack("{s: s, s: o, s: o}",
- "type", type,
- "timestamp", ast_json_timeval(*tv, NULL),
- "channel", ast_channel_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *channel_created_event(
- struct ast_channel_snapshot *snapshot,
- const struct timeval *tv)
-{
- return simple_channel_event("ChannelCreated", snapshot, tv);
-}
-
-static struct ast_json *channel_destroyed_event(
- struct ast_channel_snapshot *snapshot,
- const struct timeval *tv)
-{
- return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
- "type", "ChannelDestroyed",
- "timestamp", ast_json_timeval(*tv, NULL),
- "cause", snapshot->hangupcause,
- "cause_txt", ast_cause2str(snapshot->hangupcause),
- "channel", ast_channel_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *channel_state_change_event(
- struct ast_channel_snapshot *snapshot,
- const struct timeval *tv)
-{
- return simple_channel_event("ChannelStateChange", snapshot, tv);
-}
-
-/*! \brief Handle channel state changes */
-static struct ast_json *channel_state(
- struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot,
- const struct timeval *tv)
-{
- struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
-
- if (!old_snapshot) {
- return channel_created_event(snapshot, tv);
- } else if (!new_snapshot) {
- return channel_destroyed_event(snapshot, tv);
- } else if (old_snapshot->state != new_snapshot->state) {
- return channel_state_change_event(snapshot, tv);
- }
-
- return NULL;
-}
-
-static struct ast_json *channel_dialplan(
- struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot,
- const struct timeval *tv)
-{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
-
- /* No Newexten event on cache clear */
- if (!new_snapshot) {
- return NULL;
- }
-
- /* Empty application is not valid for a Newexten event */
- if (ast_strlen_zero(new_snapshot->appl)) {
- return NULL;
- }
-
- if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
- return NULL;
- }
-
- return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
- "type", "ChannelDialplan",
- "timestamp", ast_json_timeval(*tv, NULL),
- "dialplan_app", new_snapshot->appl,
- "dialplan_app_data", new_snapshot->data,
- "channel", ast_channel_snapshot_to_json(new_snapshot));
-}
-
-static struct ast_json *channel_callerid(
- struct ast_channel_snapshot *old_snapshot,
- struct ast_channel_snapshot *new_snapshot,
- const struct timeval *tv)
-{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
-
- /* No NewCallerid event on cache clear or first event */
- if (!old_snapshot || !new_snapshot) {
- return NULL;
- }
-
- if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
- return NULL;
- }
-
- return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
- "type", "ChannelCallerId",
- "timestamp", ast_json_timeval(*tv, NULL),
- "caller_presentation", new_snapshot->caller_pres,
- "caller_presentation_txt", ast_describe_caller_presentation(
- new_snapshot->caller_pres),
- "channel", ast_channel_snapshot_to_json(new_snapshot));
-}
-
-channel_snapshot_monitor channel_monitors[] = {
- channel_state,
- channel_dialplan,
- channel_callerid
-};
-
-static int app_send_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
- struct ast_json *msg = arg;
-
- app_send(app, msg);
- return 0;
-}
-
-static void sub_channel_snapshot_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
- struct stasis_cache_update *update = stasis_message_data(message);
- struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
- struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
- /* Pull timestamp from the new snapshot, or from the update message
- * when there isn't one. */
- const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
- int i;
-
- watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
- if (!watching_apps) {
- return;
- }
-
- for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
- if (msg) {
- ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
- }
- }
-}
-
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
- ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static void sub_channel_blob_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
- struct ast_channel_blob *obj = stasis_message_data(message);
-
- if (!obj->snapshot) {
- return;
- }
-
- msg = stasis_message_to_json(message);
- if (!msg) {
- return;
- }
-
- watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
- if (!watching_apps) {
- return;
- }
-
- distribute_message(watching_apps, msg);
-}
/*!
* \brief In addition to running ao2_cleanup(), this function also removes the
@@ -709,7 +510,7 @@ void stasis_app_bridge_destroy(const char *bridge_id)
ast_bridge_destroy(bridge, 0);
}
-int app_send_start_msg(struct app *app, struct ast_channel *chan,
+static int send_start_msg(struct app *app, struct ast_channel *chan,
int argc, char *argv[])
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -726,8 +527,9 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
return -1;
}
- msg = ast_json_pack("{s: s, s: [], s: o}",
+ msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
"type", "StasisStart",
+ "timestamp", ast_json_timeval(ast_tvnow(), NULL),
"args",
"channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
@@ -750,7 +552,7 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
return 0;
}
-int app_send_end_msg(struct app *app, struct ast_channel *chan)
+static int send_end_msg(struct app *app, struct ast_channel *chan)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -763,8 +565,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan)
return -1;
}
- msg = ast_json_pack("{s: s, s: o}",
+ msg = ast_json_pack("{s: s, s: o, s: o}",
"type", "StasisEnd",
+ "timestamp", ast_json_timeval(ast_tvnow(), NULL),
"channel", ast_channel_snapshot_to_json(snapshot));
if (!msg) {
return -1;
@@ -815,15 +618,17 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
ao2_link(app_controls, control);
- res = app_send_start_msg(app, chan, argc, argv);
+ res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending start message to '%s'\n", app_name);
- return res;
+ return -1;
}
- if (app_add_channel(app, chan)) {
- ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
+ res = app_subscribe_channel(app, chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+ app_name, ast_channel_name(chan));
return -1;
}
@@ -831,13 +636,23 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
int r;
int command_count;
+ struct ast_bridge *last_bridge = NULL;
+ struct ast_bridge *bridge = NULL;
/* Check to see if a bridge absorbed our hangup frame */
if (ast_check_hangup_locked(chan)) {
break;
}
- if (stasis_app_get_bridge(control)) {
+ last_bridge = bridge;
+ bridge = stasis_app_get_bridge(control);
+
+ if (bridge != last_bridge) {
+ app_unsubscribe_bridge(app, last_bridge);
+ app_subscribe_bridge(app, bridge);
+ }
+
+ if (bridge) {
/* Bridge is handling channel frames */
control_wait(control);
control_dispatch_all(control, chan);
@@ -882,14 +697,21 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
}
}
- app_remove_channel(app, chan);
- res = app_send_end_msg(app, chan);
+ app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
+ app_unsubscribe_channel(app, chan);
+
+ res = send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending end message to %s\n", app_name);
return res;
}
+ /* There's an off chance that app is ready for cleanup. Go ahead
+ * and clean up, just in case
+ */
+ cleanup();
+
return res;
}
@@ -912,29 +734,6 @@ int stasis_app_send(const char *app_name, struct ast_json *message)
return 0;
}
-static int cleanup_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
-
- if (!app_is_finished(app)) {
- return 0;
- }
-
- ast_verb(1, "Cleaning up application '%s'\n", app_name(app));
-
- return CMP_MATCH;
-
-}
-
-/*!
- * \brief Clean up any old apps that we don't need any more.
- */
-static void cleanup(void)
-{
- ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
- cleanup_cb, NULL);
-}
-
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -994,249 +793,22 @@ void stasis_app_unref(void)
ast_module_unref(ast_module_info->self);
}
-/*! \brief Callback to check whether an app is watching a given bridge */
-static int app_watching_bridge_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
- char *uniqueid = arg;
-
- return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified bridge */
-static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
-{
- struct ao2_container *watching_apps;
- char *uniqueid_dup;
- RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
- ast_assert(uniqueid != NULL);
-
- uniqueid_dup = ast_strdupa(uniqueid);
-
- watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
- watching_apps = watching_apps_iter->c;
-
- if (!ao2_container_count(watching_apps)) {
- return NULL;
- }
-
- ao2_ref(watching_apps, +1);
- return watching_apps_iter->c;
-}
-
-/*! Callback used to remove an app's interest in a bridge */
-static int remove_bridge_cb(void *obj, void *arg, int flags)
-{
- app_remove_bridge(obj, arg);
- return 0;
-}
-
-static struct ast_json *simple_bridge_event(
- const char *type,
- struct ast_bridge_snapshot *snapshot,
- const struct timeval *tv)
-{
- return ast_json_pack("{s: s, s: o, s: o}",
- "type", type,
- "timestamp", ast_json_timeval(*tv, NULL),
- "bridge", ast_bridge_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *simple_bridge_channel_event(
- const char *type,
- struct ast_bridge_snapshot *bridge_snapshot,
- struct ast_channel_snapshot *channel_snapshot,
- const struct timeval *tv)
-{
- return ast_json_pack("{s: s, s: o, s: o, s: o}",
- "type", type,
- "timestamp", ast_json_timeval(*tv, NULL),
- "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
- "channel", ast_channel_snapshot_to_json(channel_snapshot));
-}
-
-static void sub_bridge_snapshot_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
- struct stasis_cache_update *update = stasis_message_data(message);
- struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
- struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
- const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
- if (!watching_apps || !ao2_container_count(watching_apps)) {
- return;
- }
-
- if (!new_snapshot) {
- RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
-
- /* The bridge has gone away. Create the message, make sure no apps are
- * watching this bridge anymore, and destroy the bridge's control
- * structure */
- msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
- ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
- stasis_app_bridge_destroy(old_snapshot->uniqueid);
- } else if (!old_snapshot) {
- msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
- }
-
- if (!msg) {
- return;
- }
-
- distribute_message(watching_apps, msg);
-}
-
-/*! \brief Callback used to merge two containers of applications */
-static int list_merge_cb(void *obj, void *arg, int flags)
-{
- /* remove any current entries for this app */
- ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
- /* relink as the only entry */
- ao2_link(arg, obj);
- return 0;
-}
-
-/*! \brief Merge container src into container dst without modifying src */
-static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
-{
- ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
-}
-
-/*! \brief Callback for adding to an app's bridges of interest */
-static int app_add_bridge_cb(void *obj, void *arg, int flags)
-{
- app_add_bridge(obj, arg);
- return 0;
-}
-
-/*! \brief Add interest in the given bridge to all apps in the container */
-static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
-{
- RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
- ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
-}
-
-static void sub_bridge_merge_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
- struct ast_bridge_merge_message *merge = stasis_message_data(message);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
- const struct timeval *tv = stasis_message_timestamp(message);
-
- watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
- if (watching_apps_to) {
- update_apps_list(watching_apps_all, watching_apps_to);
- }
-
- watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
- if (watching_apps_from) {
- update_bridge_interest(watching_apps_from, merge->to->uniqueid);
- update_apps_list(watching_apps_all, watching_apps_from);
- }
-
- if (!ao2_container_count(watching_apps_all)) {
- return;
- }
-
- msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
- "type", "BridgeMerged",
- "timestamp", ast_json_timeval(*tv, NULL),
- "bridge", ast_bridge_snapshot_to_json(merge->to),
- "bridge_from", ast_bridge_snapshot_to_json(merge->from));
-
- if (!msg) {
- return;
- }
-
- distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_enter_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
- struct ast_bridge_blob *obj = stasis_message_data(message);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
- if (watching_apps_bridge) {
- update_apps_list(watching_apps_all, watching_apps_bridge);
- }
-
- watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
- if (watching_apps_channel) {
- update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
- update_apps_list(watching_apps_all, watching_apps_channel);
- }
-
- if (!ao2_container_count(watching_apps_all)) {
- return;
- }
-
- msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
- obj->channel, stasis_message_timestamp(message));
-
- distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_leave_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
- struct ast_bridge_blob *obj = stasis_message_data(message);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
- if (!watching_apps_bridge) {
- return;
- }
-
- msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
- obj->channel, stasis_message_timestamp(message));
-
- distribute_message(watching_apps_bridge, msg);
-}
-
static int load_module(void)
{
- int r = 0;
-
- apps_registry =
- ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
+ apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
+ app_compare);
if (apps_registry == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
- app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
- control_hash, control_compare);
+ app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
+ control_compare);
if (app_controls == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
- app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
- bridges_hash, bridges_compare);
- if (app_bridges == NULL) {
- return AST_MODULE_LOAD_FAILURE;
- }
+ app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
+ bridges_compare);
app_bridges_moh = ao2_container_alloc_hash(
AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
@@ -1246,52 +818,11 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
- channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
- if (!channel_router) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
- /* TODO: This could be handled a lot better. Instead of subscribing to
- * the one caching topic and filtering out messages by channel id, we
- * should have individual caching topics per-channel, with a shared
- * back-end cache. That would simplify a lot of what's going on right
- * here.
- */
- r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
- if (r) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
- if (!bridge_router) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
- r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
- r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
- r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
- if (r) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
return AST_MODULE_LOAD_SUCCESS;
}
static int unload_module(void)
{
- int r = 0;
-
- stasis_message_router_unsubscribe_and_join(channel_router);
- channel_router = NULL;
-
- stasis_message_router_unsubscribe_and_join(bridge_router);
- bridge_router = NULL;
-
ao2_cleanup(apps_registry);
apps_registry = NULL;
@@ -1304,7 +835,7 @@ static int unload_module(void)
ao2_cleanup(app_bridges_moh);
app_bridges_moh = NULL;
- return r;
+ return 0;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 6f80ed64a..8abe0c19c 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -29,132 +29,519 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "app.h"
+#include "asterisk/callerid.h"
#include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances. Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances. Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
struct app {
+ /*! Aggregation topic for this application. */
+ struct stasis_topic *topic;
+ /*! Router for handling messages forwarded to \a topic. */
+ struct stasis_message_router *router;
+ /*! Subscription to watch for bridge merge messages */
+ struct stasis_subscription *bridge_merge_sub;
+ /*! Container of the channel forwards to this app's topic. */
+ struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
- /*! List of channel identifiers this app instance is interested in */
- struct ao2_container *channels;
- /*! List of bridge identifiers this app instance owns */
- struct ao2_container *bridges;
/*! Name of the Stasis application */
char name[];
};
+/*! Subscription info for a particular channel/bridge. */
+struct app_forwards {
+ /*! Count of number of times this channel/bridge has been subscribed */
+ int interested;
+
+ /*! Forward for the regular topic */
+ struct stasis_subscription *topic_forward;
+ /*! Forward for the caching topic */
+ struct stasis_subscription *topic_cached_forward;
+
+ /*! Unique id of the object being forwarded */
+ char id[];
+};
+
+static void forwards_dtor(void *obj)
+{
+ struct app_forwards *forwards = obj;
+
+ ast_assert(forwards->topic_forward == NULL);
+ ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ stasis_unsubscribe(forwards->topic_cached_forward);
+ forwards->topic_cached_forward = NULL;
+}
+
+static struct app_forwards *forwards_create(struct app *app,
+ const char *id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || ast_strlen_zero(id)) {
+ return NULL;
+ }
+
+ forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+ if (!forwards) {
+ return NULL;
+ }
+
+ strcpy(forwards->id, id);
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+/*! Forward a channel's topics to an app */
+static struct app_forwards *forwards_create_channel(struct app *app,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !chan) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_channel_topic_cached(chan), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+/*! Forward a bridge's topics to an app */
+static struct app_forwards *forwards_create_bridge(struct app *app,
+ struct ast_bridge *bridge)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !bridge) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, bridge->uniqueid);
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_bridge_topic_cached(bridge), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
+{
+ const struct app_forwards *object_left = obj_left;
+ const struct app_forwards *object_right = obj_right;
+ const char *right_key = obj_right;
+ int cmp;
+
+ switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+ case OBJ_POINTER:
+ right_key = object_right->id;
+ /* Fall through */
+ case OBJ_KEY:
+ cmp = strcmp(object_left->id, right_key);
+ break;
+ case OBJ_PARTIAL_KEY:
+ /*
+ * We could also use a partial key struct containing a length
+ * so strlen() does not get called for every comparison instead.
+ */
+ cmp = strncmp(object_left->id, right_key, strlen(right_key));
+ break;
+ default:
+ /* Sort can only work on something with a full or partial key. */
+ ast_assert(0);
+ cmp = 0;
+ break;
+ }
+ return cmp;
+}
+
static void app_dtor(void *obj)
{
struct app *app = obj;
+ ast_verb(1, "Destroying Stasis app %s\n", app->name);
+
+ ast_assert(app->router == NULL);
+ ast_assert(app->bridge_merge_sub == NULL);
+
+ ao2_cleanup(app->topic);
+ app->topic = NULL;
+ ao2_cleanup(app->forwards);
+ app->forwards = NULL;
ao2_cleanup(app->data);
app->data = NULL;
- ao2_cleanup(app->channels);
- app->channels = NULL;
- ao2_cleanup(app->bridges);
- app->bridges = NULL;
}
-struct app *app_create(const char *name, stasis_app_cb handler, void *data)
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
{
- RAII_VAR(struct app *, app, NULL, ao2_cleanup);
- size_t size;
+ struct app *app = data;
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
- ast_assert(name != NULL);
- ast_assert(handler != NULL);
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(app);
+ }
- ast_verb(1, "Creating Stasis app '%s'\n", name);
+ /* By default, send any message that has a JSON representation */
+ json = stasis_message_to_json(message);
+ if (!json) {
+ return;
+ }
- size = sizeof(*app) + strlen(name) + 1;
- app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
+ app_send(app, json);
+}
- if (!app) {
- return NULL;
+/*! \brief Typedef for callbacks that get called on channel snapshot updates */
+typedef struct ast_json *(*channel_snapshot_monitor)(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv);
+
+static struct ast_json *simple_channel_event(
+ const char *type,
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_created_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelCreated", snapshot, tv);
+}
+
+static struct ast_json *channel_destroyed_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelDestroyed",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "cause", snapshot->hangupcause,
+ "cause_txt", ast_cause2str(snapshot->hangupcause),
+ "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_state_change_event(
+ struct ast_channel_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return simple_channel_event("ChannelStateChange", snapshot, tv);
+}
+
+/*! \brief Handle channel state changes */
+static struct ast_json *channel_state(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
+{
+ struct ast_channel_snapshot *snapshot = new_snapshot ?
+ new_snapshot : old_snapshot;
+
+ if (!old_snapshot) {
+ return channel_created_event(snapshot, tv);
+ } else if (!new_snapshot) {
+ return channel_destroyed_event(snapshot, tv);
+ } else if (old_snapshot->state != new_snapshot->state) {
+ return channel_state_change_event(snapshot, tv);
}
- strncpy(app->name, name, size - sizeof(*app));
- app->handler = handler;
- ao2_ref(data, +1);
- app->data = data;
+ return NULL;
+}
- app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
- if (!app->channels) {
+static struct ast_json *channel_dialplan(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+ /* No Newexten event on cache clear or first event */
+ if (!old_snapshot || !new_snapshot) {
return NULL;
}
- app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS);
- if (!app->bridges) {
+ /* Empty application is not valid for a Newexten event */
+ if (ast_strlen_zero(new_snapshot->appl)) {
return NULL;
}
- ao2_ref(app, +1);
- return app;
+ if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
+ return NULL;
+ }
+
+ return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
+ "type", "ChannelDialplan",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "dialplan_app", new_snapshot->appl,
+ "dialplan_app_data", new_snapshot->data,
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
-int app_add_channel(struct app *app, const struct ast_channel *chan)
+static struct ast_json *channel_callerid(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot,
+ const struct timeval *tv)
{
- SCOPED_AO2LOCK(lock, app);
- const char *uniqueid;
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
- ast_assert(app != NULL);
- ast_assert(chan != NULL);
+ /* No NewCallerid event on cache clear or first event */
+ if (!old_snapshot || !new_snapshot) {
+ return NULL;
+ }
- /* Don't accept new channels in an inactive application */
- if (!app->handler) {
- return -1;
+ if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
+ return NULL;
}
- uniqueid = ast_channel_uniqueid(chan);
- return ast_str_container_add(app->channels, uniqueid) ? -1 : 0;
+ return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+ "type", "ChannelCallerId",
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "caller_presentation", new_snapshot->caller_pres,
+ "caller_presentation_txt", ast_describe_caller_presentation(
+ new_snapshot->caller_pres),
+ "channel", ast_channel_snapshot_to_json(new_snapshot));
}
-void app_remove_channel(struct app* app, const struct ast_channel *chan)
+static channel_snapshot_monitor channel_monitors[] = {
+ channel_state,
+ channel_dialplan,
+ channel_callerid
+};
+
+static void sub_channel_update_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
{
- SCOPED_AO2LOCK(lock, app);
+ struct app *app = data;
+ struct stasis_cache_update *update;
+ struct ast_channel_snapshot *new_snapshot;
+ struct ast_channel_snapshot *old_snapshot;
+ const struct timeval *tv;
+ int i;
+
+ ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+ update = stasis_message_data(message);
+
+ ast_assert(update->type == ast_channel_snapshot_type());
+
+ new_snapshot = stasis_message_data(update->new_snapshot);
+ old_snapshot = stasis_message_data(update->old_snapshot);
+
+ /* Pull timestamp from the new snapshot, or from the update message
+ * when there isn't one. */
+ tv = update->new_snapshot ?
+ stasis_message_timestamp(update->new_snapshot) :
+ stasis_message_timestamp(message);
- ast_assert(app != NULL);
- ast_assert(chan != NULL);
+ for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
+ msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
+ if (msg) {
+ app_send(app, msg);
+ }
+ }
}
-int app_add_bridge(struct app *app, const char *uniqueid)
+static struct ast_json *simple_bridge_event(
+ const char *type,
+ struct ast_bridge_snapshot *snapshot,
+ const struct timeval *tv)
{
- SCOPED_AO2LOCK(lock, app);
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
- ast_assert(app != NULL);
- ast_assert(uniqueid != NULL);
+static void sub_bridge_update_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct app *app = data;
+ struct stasis_cache_update *update;
+ struct ast_bridge_snapshot *new_snapshot;
+ struct ast_bridge_snapshot *old_snapshot;
+ const struct timeval *tv;
- /* Don't accept new bridges in an inactive application */
- if (!app->handler) {
- return -1;
+ ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+ update = stasis_message_data(message);
+
+ ast_assert(update->type == ast_bridge_snapshot_type());
+
+ new_snapshot = stasis_message_data(update->new_snapshot);
+ old_snapshot = stasis_message_data(update->old_snapshot);
+ tv = update->new_snapshot ?
+ stasis_message_timestamp(update->new_snapshot) :
+ stasis_message_timestamp(message);
+
+ if (!new_snapshot) {
+ json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
+ } else if (!old_snapshot) {
+ json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
+ }
+
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
+}
+
+static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct app *app = data;
+ struct ast_bridge_merge_message *merge;
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(app);
}
- return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0;
+ if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
+ return;
+ }
+
+ merge = stasis_message_data(message);
+
+ /* Find out if we're subscribed to either bridge */
+ forwards = ao2_find(app->forwards, merge->from->uniqueid,
+ OBJ_SEARCH_KEY);
+ if (!forwards) {
+ forwards = ao2_find(app->forwards, merge->to->uniqueid,
+ OBJ_SEARCH_KEY);
+ }
+
+ if (!forwards) {
+ return;
+ }
+
+ /* Forward the message to the app */
+ stasis_forward_message(app->topic, topic, message);
}
-void app_remove_bridge(struct app* app, const char *uniqueid)
+struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
- SCOPED_AO2LOCK(lock, app);
+ RAII_VAR(struct app *, app, NULL, ao2_cleanup);
+ size_t size;
+ int res = 0;
+
+ ast_assert(name != NULL);
+ ast_assert(handler != NULL);
+
+ ast_verb(1, "Creating Stasis app '%s'\n", name);
+
+ size = sizeof(*app) + strlen(name) + 1;
+ app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
+
+ if (!app) {
+ return NULL;
+ }
+
+ app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+ forwards_sort, NULL);
+ if (!app->forwards) {
+ return NULL;
+ }
+
+ app->topic = stasis_topic_create(name);
+ if (!app->topic) {
+ return NULL;
+ }
+
+ app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
+ bridge_merge_handler, app);
+ if (!app->bridge_merge_sub) {
+ return NULL;
+ }
+ /* Subscription holds a reference */
+ ao2_ref(app, +1);
+
+ app->router = stasis_message_router_create(app->topic);
+ if (!app->router) {
+ return NULL;
+ }
+
+ res |= stasis_message_router_add_cache_update(app->router,
+ ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
+
+ res |= stasis_message_router_add_cache_update(app->router,
+ ast_channel_snapshot_type(), sub_channel_update_handler, app);
+
+ res |= stasis_message_router_set_default(app->router,
+ sub_default_handler, app);
- ast_assert(app != NULL);
- ast_assert(uniqueid != NULL);
+ if (res != 0) {
+ return NULL;
+ }
+ /* Router holds a reference */
+ ao2_ref(app, +1);
+
+ strncpy(app->name, name, size - sizeof(*app));
+ app->handler = handler;
+ ao2_ref(data, +1);
+ app->data = data;
- ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE);
+ ao2_ref(app, +1);
+ return app;
}
/*!
@@ -196,6 +583,18 @@ void app_deactivate(struct app *app)
app->data = NULL;
}
+void app_shutdown(struct app *app)
+{
+ SCOPED_AO2LOCK(lock, app);
+
+ ast_assert(app_is_finished(app));
+
+ stasis_message_router_unsubscribe(app->router);
+ app->router = NULL;
+ stasis_unsubscribe(app->bridge_merge_sub);
+ app->bridge_merge_sub = NULL;
+}
+
int app_is_active(struct app *app)
{
SCOPED_AO2LOCK(lock, app);
@@ -206,8 +605,7 @@ int app_is_finished(struct app *app)
{
SCOPED_AO2LOCK(lock, app);
- return app->handler == NULL &&
- ao2_container_count(app->channels) == 0;
+ return app->handler == NULL && ao2_container_count(app->forwards) == 0;
}
void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -229,7 +627,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data)
ast_verb(1, "Activating Stasis app '%s'\n", app->name);
}
-
app->handler = handler;
ao2_cleanup(app->data);
if (data) {
@@ -243,16 +640,100 @@ const char *app_name(const struct app *app)
return app->name;
}
-int app_is_watching_channel(struct app *app, const char *uniqueid)
+int app_subscribe_channel(struct app *app, struct ast_channel *chan)
{
- RAII_VAR(char *, found, NULL, ao2_cleanup);
- found = ao2_find(app->channels, uniqueid, OBJ_KEY);
- return found != NULL;
+ int res;
+
+ if (!app || !chan) {
+ return -1;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
+ }
+
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ return -1;
+ }
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
+}
+
+static int unsubscribe(struct app *app, const char *kind, const char *id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ ast_log(LOG_ERROR,
+ "App '%s' not subscribed to %s '%s'",
+ app->name, kind, id);
+ return -1;
+ }
+
+ if (--forwards->interested == 0) {
+ /* No one is interested any more; unsubscribe */
+ forwards_unsubscribe(forwards);
+ ao2_find(app->forwards, forwards,
+ OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
+ OBJ_NODATA);
+ }
+
+ return 0;
}
-int app_is_watching_bridge(struct app *app, const char *uniqueid)
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
{
- RAII_VAR(char *, found, NULL, ao2_cleanup);
- found = ao2_find(app->bridges, uniqueid, OBJ_KEY);
- return found != NULL;
+ if (!app || !chan) {
+ return -1;
+ }
+
+ return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+}
+
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+ if (!app || !bridge) {
+ return -1;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, bridge->uniqueid,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
+}
+
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+ if (!app || !bridge) {
+ return -1;
+ }
+
+ return unsubscribe(app, "bridge", bridge->uniqueid);
}
diff --git a/res/stasis/app.h b/res/stasis/app.h
index 0cf92217f..5f9f1d7e7 100644
--- a/res/stasis/app.h
+++ b/res/stasis/app.h
@@ -48,6 +48,15 @@ struct app;
struct app *app_create(const char *name, stasis_app_cb handler, void *data);
/*!
+ * \brief Tears down an application.
+ *
+ * It should be finished before calling this.
+ *
+ * \param app Application to unsubscribe.
+ */
+void app_shutdown(struct app *app);
+
+/*!
* \brief Deactivates an application.
*
* Any channels currently in the application remain active (since the app might
@@ -96,17 +105,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data);
const char *app_name(const struct app *app);
/*!
- * \brief Subscribe an application to a topic.
- *
- * \param app Application.
- * \param topic Topic to subscribe to.
- * \return New subscription.
- * \return \c NULL on error.
- */
-struct stasis_subscription *app_subscribe(struct app *app,
- struct stasis_topic *topic);
-
-/*!
* \brief Send a message to an application.
*
* \param app Application.
@@ -114,83 +112,44 @@ struct stasis_subscription *app_subscribe(struct app *app,
*/
void app_send(struct app *app, struct ast_json *message);
-/*!
- * \brief Send the start message to an application.
- *
- * \param app Application.
- * \param chan The channel entering the application.
- * \param argc The number of arguments for the application.
- * \param argv The arguments for the application.
- * \return 0 on success.
- * \return Non-zero on error.
- */
-int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc,
- char *argv[]);
+struct app_forwards;
/*!
- * \brief Send the end message to an application.
+ * \brief Subscribes an application to a channel.
*
* \param app Application.
- * \param chan The channel leaving the application.
+ * \param chan Channel to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_send_end_msg(struct app *app, struct ast_channel *chan);
+int app_subscribe_channel(struct app *app, struct ast_channel *chan);
/*!
- * \brief Checks if an application is watching a given channel.
+ * \brief Cancel the subscription an app has for a channel.
*
- * \param app Application.
- * \param uniqueid Uniqueid of the channel to check about.
- * \return True (non-zero) if \a app is watching channel with given \a uniqueid
- * \return False (zero) if \a app isn't.
+ * \param app Subscribing application.
+ * \param forwards Returned object from app_subscribe_channel().
*/
-int app_is_watching_channel(struct app *app, const char *uniqueid);
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
/*!
- * \brief Add a channel to an application's watch list.
+ * \brief Add a bridge subscription to an existing channel subscription.
*
* \param app Application.
- * \param chan Channel to watch.
+ * \param bridge Bridge to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_add_channel(struct app *app, const struct ast_channel *chan);
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
/*!
- * \brief Remove a channel from an application's watch list.
+ * \brief Cancel the bridge subscription for an application.
*
- * \param app Application.
- * \param chan Channel to watch.
- */
-void app_remove_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Add a bridge to an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to watch.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_add_bridge(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Remove a bridge from an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to remove.
- */
-void app_remove_bridge(struct app* app, const char *uniqueid);
-
-/*!
- * \brief Checks if an application is watching a given bridge.
- *
- * \param app Application.
- * \param uniqueid Uniqueid of the bridge to check.
- * \return True (non-zero) if \a app is watching bridge with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_bridge(struct app *app, const char *uniqueid);
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
#endif /* _ASTERISK_RES_STASIS_APP_H */
diff --git a/rest-api-templates/param_parsing.mustache b/rest-api-templates/param_parsing.mustache
index 59c59e958..aabd728fd 100644
--- a/rest-api-templates/param_parsing.mustache
+++ b/rest-api-templates/param_parsing.mustache
@@ -36,8 +36,16 @@
goto fin;
}
- args.{{c_name}}_count = ast_app_separate_args(
- args.{{c_name}}_parse, ',', vals, ARRAY_LEN(vals));
+ if (strlen(args.{{c_name}}_parse) == 0) {
+ /* ast_app_separate_args can't handle "" */
+ args.{{c_name}}_count = 1;
+ vals[0] = args.{{c_name}}_parse;
+ } else {
+ args.{{c_name}}_count = ast_app_separate_args(
+ args.{{c_name}}_parse, ',', vals,
+ ARRAY_LEN(vals));
+ }
+
if (args.{{c_name}}_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
diff --git a/rest-api-templates/res_ari_resource.c.mustache b/rest-api-templates/res_ari_resource.c.mustache
index 906d55f0d..e6b2a88f4 100644
--- a/rest-api-templates/res_ari_resource.c.mustache
+++ b/rest-api-templates/res_ari_resource.c.mustache
@@ -174,14 +174,16 @@ fin: __attribute__((unused))
* negotiation. Param parsing should happen earlier, but we
* need a way to pass it through the WebSocket code to the
* callback */
- RAII_VAR(char *, msg, NULL, ast_free);
+ RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
- msg = ast_strdup("?");
+ ast_log(LOG_ERROR, "Missing response message\n");
+ }
+ if (msg) {
+ ast_websocket_write(ws_session,
+ AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
}
- ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg,
- strlen(msg));
}
{{> param_cleanup}}
}