diff options
Diffstat (limited to 'res/stasis')
-rw-r--r-- | res/stasis/app.c | 205 | ||||
-rw-r--r-- | res/stasis/app.h | 76 |
2 files changed, 276 insertions, 5 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c index bc1268fb7..aac9760b3 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_app.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" +#include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_message_router.h" struct app { @@ -52,6 +53,12 @@ struct app { char name[]; }; +enum forward_type { + FORWARD_CHANNEL, + FORWARD_BRIDGE, + FORWARD_ENDPOINT, +}; + /*! Subscription info for a particular channel/bridge. */ struct app_forwards { /*! Count of number of times this channel/bridge has been subscribed */ @@ -62,6 +69,8 @@ struct app_forwards { /*! Forward for the caching topic */ struct stasis_forward *topic_cached_forward; + /* Type of object being forwarded */ + enum forward_type forward_type; /*! Unique id of the object being forwarded */ char id[]; }; @@ -119,6 +128,7 @@ static struct app_forwards *forwards_create_channel(struct app *app, return NULL; } + forwards->forward_type = FORWARD_CHANNEL; forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), app->topic); if (!forwards->topic_forward) { @@ -153,6 +163,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app, return NULL; } + forwards->forward_type = FORWARD_BRIDGE; forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic); if (!forwards->topic_forward) { @@ -172,6 +183,41 @@ static struct app_forwards *forwards_create_bridge(struct app *app, return forwards; } +/*! Forward a endpoint's topics to an app */ +static struct app_forwards *forwards_create_endpoint(struct app *app, + struct ast_endpoint *endpoint) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || !endpoint) { + return NULL; + } + + forwards = forwards_create(app, ast_endpoint_get_id(endpoint)); + if (!forwards) { + return NULL; + } + + forwards->forward_type = FORWARD_ENDPOINT; + forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), + app->topic); + if (!forwards->topic_forward) { + return NULL; + } + + forwards->topic_cached_forward = stasis_forward_all( + ast_endpoint_topic_cached(endpoint), app->topic); + if (!forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + stasis_forward_cancel(forwards->topic_forward); + forwards->topic_forward = NULL; + return NULL; + } + + ao2_ref(forwards, +1); + return forwards; +} + static int forwards_sort(const void *obj_left, const void *obj_right, int flags) { const struct app_forwards *object_left = obj_left; @@ -397,6 +443,47 @@ static void sub_channel_update_handler(void *data, } } +static struct ast_json *simple_endpoint_event( + const char *type, + struct ast_endpoint_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "endpoint", ast_endpoint_snapshot_to_json(snapshot)); +} + +static void sub_endpoint_update_handler(void *data, + struct stasis_subscription *sub, + struct stasis_message *message) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct app *app = data; + struct stasis_cache_update *update; + struct ast_endpoint_snapshot *new_snapshot; + const struct timeval *tv; + + ast_assert(stasis_message_type(message) == stasis_cache_update_type()); + + update = stasis_message_data(message); + + ast_assert(update->type == ast_endpoint_snapshot_type()); + + new_snapshot = stasis_message_data(update->new_snapshot); + tv = update->new_snapshot ? + stasis_message_timestamp(update->new_snapshot) : + stasis_message_timestamp(message); + + json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv); + + if (!json) { + return; + } + + app_send(app, json); +} + static struct ast_json *simple_bridge_event( const char *type, struct ast_bridge_snapshot *snapshot, @@ -526,6 +613,9 @@ struct app *app_create(const char *name, stasis_app_cb handler, void *data) res |= stasis_message_router_add_cache_update(app->router, ast_channel_snapshot_type(), sub_channel_update_handler, app); + res |= stasis_message_router_add_cache_update(app->router, + ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app); + res |= stasis_message_router_set_default(app->router, sub_default_handler, app); @@ -640,6 +730,56 @@ const char *app_name(const struct app *app) return app->name; } +struct ast_json *app_to_json(const struct app *app) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ast_json *channels; + struct ast_json *bridges; + struct ast_json *endpoints; + struct ao2_iterator i; + void *obj; + + json = ast_json_pack("{s: s, s: [], s: [], s: []}", + "name", app->name, + "channel_ids", "bridge_ids", "endpoint_ids"); + channels = ast_json_object_get(json, "channel_ids"); + bridges = ast_json_object_get(json, "bridge_ids"); + endpoints = ast_json_object_get(json, "endpoint_ids"); + + i = ao2_iterator_init(app->forwards, 0); + while ((obj = ao2_iterator_next(&i))) { + RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup); + RAII_VAR(struct ast_json *, id, NULL, ast_json_unref); + int append_res = -1; + + id = ast_json_string_create(forwards->id); + + switch (forwards->forward_type) { + case FORWARD_CHANNEL: + append_res = ast_json_array_append(channels, + ast_json_ref(id)); + break; + case FORWARD_BRIDGE: + append_res = ast_json_array_append(bridges, + ast_json_ref(id)); + break; + case FORWARD_ENDPOINT: + append_res = ast_json_array_append(endpoints, + ast_json_ref(id)); + break; + } + + if (append_res != 0) { + ast_log(LOG_ERROR, "Error building response\n"); + ao2_iterator_destroy(&i); + return NULL; + } + } + ao2_iterator_destroy(&i); + + return ast_json_ref(json); +} + int app_subscribe_channel(struct app *app, struct ast_channel *chan) { int res; @@ -678,8 +818,8 @@ static int unsubscribe(struct app *app, const char *kind, const char *id) forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (!forwards) { - ast_log(LOG_ERROR, - "App '%s' not subscribed to %s '%s'", + ast_log(LOG_WARNING, + "App '%s' not subscribed to %s '%s'\n", app->name, kind, id); return -1; } @@ -701,7 +841,23 @@ int app_unsubscribe_channel(struct app *app, struct ast_channel *chan) return -1; } - return unsubscribe(app, "channel", ast_channel_uniqueid(chan)); + return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan)); +} + +int app_unsubscribe_channel_id(struct app *app, const char *channel_id) +{ + if (!app || !channel_id) { + return -1; + } + + return unsubscribe(app, "channel", channel_id); +} + +int app_is_subscribed_channel_id(struct app *app, const char *channel_id) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY); + return forwards != NULL; } int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge) @@ -735,5 +891,46 @@ int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge) return -1; } - return unsubscribe(app, "bridge", bridge->uniqueid); + return app_unsubscribe_bridge_id(app, bridge->uniqueid); +} + +int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id) +{ + if (!app || !bridge_id) { + return -1; + } + + return unsubscribe(app, "bridge", bridge_id); +} + +int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY); + return forwards != NULL; +} + +int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint) +{ + if (!app || !endpoint) { + return -1; + } else { + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint), + OBJ_SEARCH_KEY | OBJ_NOLOCK); + + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_endpoint(app, endpoint); + if (!forwards) { + return -1; + } + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + } + + ++forwards->interested; + return 0; + } } diff --git a/res/stasis/app.h b/res/stasis/app.h index 5f9f1d7e7..4db9db97d 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -114,6 +114,8 @@ void app_send(struct app *app, struct ast_json *message); struct app_forwards; +struct ast_json *app_to_json(const struct app *app); + /*! * \brief Subscribes an application to a channel. * @@ -128,11 +130,33 @@ int app_subscribe_channel(struct app *app, struct ast_channel *chan); * \brief Cancel the subscription an app has for a channel. * * \param app Subscribing application. - * \param forwards Returned object from app_subscribe_channel(). + * \param chan Channel to unsubscribe from. + * \return 0 on success. + * \return Non-zero on error. */ int app_unsubscribe_channel(struct app *app, struct ast_channel *chan); /*! + * \brief Cancel the subscription an app has for a channel. + * + * \param app Subscribing application. + * \param channel_id Id of channel to unsubscribe from. + * \return 0 on success. + * \return Non-zero on error. + */ +int app_unsubscribe_channel_id(struct app *app, const char *channel_id); + +/*! + * \brief Test if an app is subscribed to a channel. + * + * \param app Subscribing application. + * \param channel_id Id of channel to check. + * \return True (non-zero) if channel is subscribed to \a app. + * \return False (zero) if channel is not subscribed. + */ +int app_is_subscribed_channel_id(struct app *app, const char *channel_id); + +/*! * \brief Add a bridge subscription to an existing channel subscription. * * \param app Application. @@ -152,4 +176,54 @@ int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge); */ int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge); +/*! + * \brief Cancel the subscription an app has for a bridge. + * + * \param app Subscribing application. + * \param bridge_id Id of bridge to unsubscribe from. + * \return 0 on success. + * \return Non-zero on error. + */ +int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id); + +/*! + * \brief Test if an app is subscribed to a bridge. + * + * \param app Subscribing application. + * \param bridge_id Id of bridge to check. + * \return True (non-zero) if bridge is subscribed to \a app. + * \return False (zero) if bridge is not subscribed. + */ +int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id); + +/*! + * \brief Subscribes an application to a endpoint. + * + * \param app Application. + * \param chan Endpoint to subscribe to. + * \return 0 on success. + * \return Non-zero on error. + */ +int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint); + +/*! + * \brief Cancel the subscription an app has for a endpoint. + * + * \param app Subscribing application. + * \param endpoint_id Id of endpoint to unsubscribe from. + * \return 0 on success. + * \return Non-zero on error. + */ +int app_unsubscribe_endpoint_id(struct app *app, const char *endpoint_id); + +/*! + * \brief Test if an app is subscribed to a endpoint. + * + * \param app Subscribing application. + * \param endpoint_id Id of endpoint to check. + * \return True (non-zero) if endpoint is subscribed to \a app. + * \return False (zero) if endpoint is not subscribed. + */ +int app_is_subscribed_endpoint_id(struct app *app, const char *endpoint_id); + #endif /* _ASTERISK_RES_STASIS_APP_H */ |