summaryrefslogtreecommitdiff
path: root/res/stasis/app.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r--res/stasis/app.c143
1 files changed, 110 insertions, 33 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 433d3adb5..8ad41e565 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -36,7 +36,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
-struct app {
+struct stasis_app {
/*! Aggregation topic for this application. */
struct stasis_topic *topic;
/*! Router for handling messages forwarded to \a topic. */
@@ -93,7 +93,7 @@ static void forwards_unsubscribe(struct app_forwards *forwards)
forwards->topic_cached_forward = NULL;
}
-static struct app_forwards *forwards_create(struct app *app,
+static struct app_forwards *forwards_create(struct stasis_app *app,
const char *id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
@@ -114,7 +114,7 @@ static struct app_forwards *forwards_create(struct app *app,
}
/*! Forward a channel's topics to an app */
-static struct app_forwards *forwards_create_channel(struct app *app,
+static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
@@ -149,7 +149,7 @@ static struct app_forwards *forwards_create_channel(struct app *app,
}
/*! Forward a bridge's topics to an app */
-static struct app_forwards *forwards_create_bridge(struct app *app,
+static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
@@ -184,7 +184,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
}
/*! Forward a endpoint's topics to an app */
-static struct app_forwards *forwards_create_endpoint(struct app *app,
+static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
@@ -250,7 +250,7 @@ static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
static void app_dtor(void *obj)
{
- struct app *app = obj;
+ struct stasis_app *app = obj;
ast_verb(1, "Destroying Stasis app %s\n", app->name);
@@ -268,7 +268,7 @@ static void app_dtor(void *obj)
static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
- struct app *app = data;
+ struct stasis_app *app = data;
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
if (stasis_subscription_final_message(sub, message)) {
@@ -435,7 +435,7 @@ static void sub_channel_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
- struct app *app = data;
+ struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_channel_snapshot *new_snapshot;
struct ast_channel_snapshot *old_snapshot;
@@ -489,7 +489,7 @@ static void sub_endpoint_update_handler(void *data,
struct stasis_message *message)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
- struct app *app = data;
+ struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
const struct timeval *tv;
@@ -535,7 +535,7 @@ static void sub_bridge_update_handler(void *data,
struct stasis_message *message)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
- struct app *app = data;
+ struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_bridge_snapshot *new_snapshot;
struct ast_bridge_snapshot *old_snapshot;
@@ -569,7 +569,7 @@ static void sub_bridge_update_handler(void *data,
static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
- struct app *app = data;
+ struct stasis_app *app = data;
struct ast_bridge_merge_message *merge;
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
@@ -599,9 +599,9 @@ static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
stasis_publish(app->topic, message);
}
-struct app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
{
- RAII_VAR(struct app *, app, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
int res = 0;
@@ -674,7 +674,7 @@ struct app *app_create(const char *name, stasis_app_cb handler, void *data)
* \param app App to send the message to.
* \param message Message to send.
*/
-void app_send(struct app *app, struct ast_json *message)
+void app_send(struct stasis_app *app, struct ast_json *message)
{
stasis_app_cb handler;
RAII_VAR(void *, data, NULL, ao2_cleanup);
@@ -699,7 +699,7 @@ void app_send(struct app *app, struct ast_json *message)
handler(data, app->name, message);
}
-void app_deactivate(struct app *app)
+void app_deactivate(struct stasis_app *app)
{
SCOPED_AO2LOCK(lock, app);
ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
@@ -708,7 +708,7 @@ void app_deactivate(struct app *app)
app->data = NULL;
}
-void app_shutdown(struct app *app)
+void app_shutdown(struct stasis_app *app)
{
SCOPED_AO2LOCK(lock, app);
@@ -720,20 +720,20 @@ void app_shutdown(struct app *app)
app->bridge_merge_sub = NULL;
}
-int app_is_active(struct app *app)
+int app_is_active(struct stasis_app *app)
{
SCOPED_AO2LOCK(lock, app);
return app->handler != NULL;
}
-int app_is_finished(struct app *app)
+int app_is_finished(struct stasis_app *app)
{
SCOPED_AO2LOCK(lock, app);
return app->handler == NULL && ao2_container_count(app->forwards) == 0;
}
-void app_update(struct app *app, stasis_app_cb handler, void *data)
+void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
{
SCOPED_AO2LOCK(lock, app);
@@ -760,12 +760,12 @@ void app_update(struct app *app, stasis_app_cb handler, void *data)
app->data = data;
}
-const char *app_name(const struct app *app)
+const char *app_name(const struct stasis_app *app)
{
return app->name;
}
-struct ast_json *app_to_json(const struct app *app)
+struct ast_json *app_to_json(const struct stasis_app *app)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ast_json *channels;
@@ -815,7 +815,7 @@ struct ast_json *app_to_json(const struct app *app)
return ast_json_ref(json);
}
-int app_subscribe_channel(struct app *app, struct ast_channel *chan)
+int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
int res;
@@ -846,7 +846,12 @@ int app_subscribe_channel(struct app *app, struct ast_channel *chan)
}
}
-static int unsubscribe(struct app *app, const char *kind, const char *id)
+static int subscribe_channel(struct stasis_app *app, void *obj)
+{
+ return app_subscribe_channel(app, obj);
+}
+
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
@@ -870,7 +875,7 @@ static int unsubscribe(struct app *app, const char *kind, const char *id)
return 0;
}
-int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
+int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
if (!app || !chan) {
return -1;
@@ -879,7 +884,7 @@ int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
}
-int app_unsubscribe_channel_id(struct app *app, const char *channel_id)
+int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
if (!app || !channel_id) {
return -1;
@@ -888,14 +893,27 @@ int app_unsubscribe_channel_id(struct app *app, const char *channel_id)
return unsubscribe(app, "channel", channel_id);
}
-int app_is_subscribed_channel_id(struct app *app, const char *channel_id)
+int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
-int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+static void *channel_find(const struct stasis_app *app, const char *id)
+{
+ return ast_channel_get_by_name(id);
+}
+
+struct stasis_app_event_source channel_event_source = {
+ .scheme = "channel:",
+ .find = channel_find,
+ .subscribe = subscribe_channel,
+ .unsubscribe = app_unsubscribe_channel_id,
+ .is_subscribed = app_is_subscribed_channel_id
+};
+
+int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
if (!app || !bridge) {
return -1;
@@ -920,7 +938,12 @@ int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
}
}
-int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+static int subscribe_bridge(struct stasis_app *app, void *obj)
+{
+ return app_subscribe_bridge(app, obj);
+}
+
+int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
if (!app || !bridge) {
return -1;
@@ -929,7 +952,7 @@ int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
return app_unsubscribe_bridge_id(app, bridge->uniqueid);
}
-int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id)
+int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
if (!app || !bridge_id) {
return -1;
@@ -938,14 +961,27 @@ int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id)
return unsubscribe(app, "bridge", bridge_id);
}
-int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id)
+int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
-int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint)
+static void *bridge_find(const struct stasis_app *app, const char *id)
+{
+ return stasis_app_bridge_find_by_id(id);
+}
+
+struct stasis_app_event_source bridge_event_source = {
+ .scheme = "bridge:",
+ .find = bridge_find,
+ .subscribe = subscribe_bridge,
+ .unsubscribe = app_unsubscribe_bridge_id,
+ .is_subscribed = app_is_subscribed_bridge_id
+};
+
+int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
if (!app || !endpoint) {
return -1;
@@ -970,7 +1006,12 @@ int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint)
}
}
-int app_unsubscribe_endpoint_id(struct app *app, const char *endpoint_id)
+static int subscribe_endpoint(struct stasis_app *app, void *obj)
+{
+ return app_subscribe_endpoint(app, obj);
+}
+
+int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
if (!app || !endpoint_id) {
return -1;
@@ -979,9 +1020,45 @@ int app_unsubscribe_endpoint_id(struct app *app, const char *endpoint_id)
return unsubscribe(app, "endpoint", endpoint_id);
}
-int app_is_subscribed_endpoint_id(struct app *app, const char *endpoint_id)
+int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
+
+static void *endpoint_find(const struct stasis_app *app, const char *id)
+{
+ return ast_endpoint_find_by_id(id);
+}
+
+struct stasis_app_event_source endpoint_event_source = {
+ .scheme = "endpoint:",
+ .find = endpoint_find,
+ .subscribe = subscribe_endpoint,
+ .unsubscribe = app_unsubscribe_endpoint_id,
+ .is_subscribed = app_is_subscribed_endpoint_id
+};
+
+void stasis_app_register_event_sources(void)
+{
+ stasis_app_register_event_source(&channel_event_source);
+ stasis_app_register_event_source(&bridge_event_source);
+ stasis_app_register_event_source(&endpoint_event_source);
+}
+
+int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
+{
+ return obj == &endpoint_event_source ||
+ obj == &bridge_event_source ||
+ obj == &channel_event_source;
+}
+
+void stasis_app_unregister_event_sources(void)
+{
+ stasis_app_unregister_event_source(&endpoint_event_source);
+ stasis_app_unregister_event_source(&bridge_event_source);
+ stasis_app_unregister_event_source(&channel_event_source);
+}
+
+