diff options
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r-- | res/stasis/app.c | 639 |
1 files changed, 560 insertions, 79 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c index 6f80ed64a..8abe0c19c 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -29,132 +29,519 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "app.h" +#include "asterisk/callerid.h" #include "asterisk/stasis_app.h" +#include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" - -/*! - * \brief Number of buckets for the channels container for app instances. Remember - * to keep it a prime number! - */ -#define APP_CHANNELS_BUCKETS 7 - -/*! - * \brief Number of buckets for the bridges container for app instances. Remember - * to keep it a prime number! - */ -#define APP_BRIDGES_BUCKETS 7 +#include "asterisk/stasis_message_router.h" struct app { + /*! Aggregation topic for this application. */ + struct stasis_topic *topic; + /*! Router for handling messages forwarded to \a topic. */ + struct stasis_message_router *router; + /*! Subscription to watch for bridge merge messages */ + struct stasis_subscription *bridge_merge_sub; + /*! Container of the channel forwards to this app's topic. */ + struct ao2_container *forwards; /*! Callback function for this application. */ stasis_app_cb handler; /*! Opaque data to hand to callback function. */ void *data; - /*! List of channel identifiers this app instance is interested in */ - struct ao2_container *channels; - /*! List of bridge identifiers this app instance owns */ - struct ao2_container *bridges; /*! Name of the Stasis application */ char name[]; }; +/*! Subscription info for a particular channel/bridge. */ +struct app_forwards { + /*! Count of number of times this channel/bridge has been subscribed */ + int interested; + + /*! Forward for the regular topic */ + struct stasis_subscription *topic_forward; + /*! Forward for the caching topic */ + struct stasis_subscription *topic_cached_forward; + + /*! Unique id of the object being forwarded */ + char id[]; +}; + +static void forwards_dtor(void *obj) +{ + struct app_forwards *forwards = obj; + + ast_assert(forwards->topic_forward == NULL); + ast_assert(forwards->topic_cached_forward == NULL); +} + +static void forwards_unsubscribe(struct app_forwards *forwards) +{ + stasis_unsubscribe(forwards->topic_forward); + forwards->topic_forward = NULL; + stasis_unsubscribe(forwards->topic_cached_forward); + forwards->topic_cached_forward = NULL; +} + +static struct app_forwards *forwards_create(struct app *app, + const char *id) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || ast_strlen_zero(id)) { + return NULL; + } + + forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor); + if (!forwards) { + return NULL; + } + + strcpy(forwards->id, id); + + ao2_ref(forwards, +1); + return forwards; +} + +/*! Forward a channel's topics to an app */ +static struct app_forwards *forwards_create_channel(struct app *app, + struct ast_channel *chan) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || !chan) { + return NULL; + } + + forwards = forwards_create(app, ast_channel_uniqueid(chan)); + if (!forwards) { + return NULL; + } + + forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), + app->topic); + if (!forwards->topic_forward) { + return NULL; + } + + forwards->topic_cached_forward = stasis_forward_all( + ast_channel_topic_cached(chan), app->topic); + if (!forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + stasis_unsubscribe(forwards->topic_forward); + forwards->topic_forward = NULL; + return NULL; + } + + ao2_ref(forwards, +1); + return forwards; +} + +/*! Forward a bridge's topics to an app */ +static struct app_forwards *forwards_create_bridge(struct app *app, + struct ast_bridge *bridge) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || !bridge) { + return NULL; + } + + forwards = forwards_create(app, bridge->uniqueid); + if (!forwards) { + return NULL; + } + + forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), + app->topic); + if (!forwards->topic_forward) { + return NULL; + } + + forwards->topic_cached_forward = stasis_forward_all( + ast_bridge_topic_cached(bridge), app->topic); + if (!forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + stasis_unsubscribe(forwards->topic_forward); + forwards->topic_forward = NULL; + return NULL; + } + + ao2_ref(forwards, +1); + return forwards; +} + +static int forwards_sort(const void *obj_left, const void *obj_right, int flags) +{ + const struct app_forwards *object_left = obj_left; + const struct app_forwards *object_right = obj_right; + const char *right_key = obj_right; + int cmp; + + switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) { + case OBJ_POINTER: + right_key = object_right->id; + /* Fall through */ + case OBJ_KEY: + cmp = strcmp(object_left->id, right_key); + break; + case OBJ_PARTIAL_KEY: + /* + * We could also use a partial key struct containing a length + * so strlen() does not get called for every comparison instead. + */ + cmp = strncmp(object_left->id, right_key, strlen(right_key)); + break; + default: + /* Sort can only work on something with a full or partial key. */ + ast_assert(0); + cmp = 0; + break; + } + return cmp; +} + static void app_dtor(void *obj) { struct app *app = obj; + ast_verb(1, "Destroying Stasis app %s\n", app->name); + + ast_assert(app->router == NULL); + ast_assert(app->bridge_merge_sub == NULL); + + ao2_cleanup(app->topic); + app->topic = NULL; + ao2_cleanup(app->forwards); + app->forwards = NULL; ao2_cleanup(app->data); app->data = NULL; - ao2_cleanup(app->channels); - app->channels = NULL; - ao2_cleanup(app->bridges); - app->bridges = NULL; } -struct app *app_create(const char *name, stasis_app_cb handler, void *data) +static void sub_default_handler(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) { - RAII_VAR(struct app *, app, NULL, ao2_cleanup); - size_t size; + struct app *app = data; + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); - ast_assert(name != NULL); - ast_assert(handler != NULL); + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(app); + } - ast_verb(1, "Creating Stasis app '%s'\n", name); + /* By default, send any message that has a JSON representation */ + json = stasis_message_to_json(message); + if (!json) { + return; + } - size = sizeof(*app) + strlen(name) + 1; - app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); + app_send(app, json); +} - if (!app) { - return NULL; +/*! \brief Typedef for callbacks that get called on channel snapshot updates */ +typedef struct ast_json *(*channel_snapshot_monitor)( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv); + +static struct ast_json *simple_channel_event( + const char *type, + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "channel", ast_channel_snapshot_to_json(snapshot)); +} + +static struct ast_json *channel_created_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return simple_channel_event("ChannelCreated", snapshot, tv); +} + +static struct ast_json *channel_destroyed_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", + "type", "ChannelDestroyed", + "timestamp", ast_json_timeval(*tv, NULL), + "cause", snapshot->hangupcause, + "cause_txt", ast_cause2str(snapshot->hangupcause), + "channel", ast_channel_snapshot_to_json(snapshot)); +} + +static struct ast_json *channel_state_change_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return simple_channel_event("ChannelStateChange", snapshot, tv); +} + +/*! \brief Handle channel state changes */ +static struct ast_json *channel_state( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) +{ + struct ast_channel_snapshot *snapshot = new_snapshot ? + new_snapshot : old_snapshot; + + if (!old_snapshot) { + return channel_created_event(snapshot, tv); + } else if (!new_snapshot) { + return channel_destroyed_event(snapshot, tv); + } else if (old_snapshot->state != new_snapshot->state) { + return channel_state_change_event(snapshot, tv); } - strncpy(app->name, name, size - sizeof(*app)); - app->handler = handler; - ao2_ref(data, +1); - app->data = data; + return NULL; +} - app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS); - if (!app->channels) { +static struct ast_json *channel_dialplan( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + + /* No Newexten event on cache clear or first event */ + if (!old_snapshot || !new_snapshot) { return NULL; } - app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS); - if (!app->bridges) { + /* Empty application is not valid for a Newexten event */ + if (ast_strlen_zero(new_snapshot->appl)) { return NULL; } - ao2_ref(app, +1); - return app; + if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) { + return NULL; + } + + return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}", + "type", "ChannelDialplan", + "timestamp", ast_json_timeval(*tv, NULL), + "dialplan_app", new_snapshot->appl, + "dialplan_app_data", new_snapshot->data, + "channel", ast_channel_snapshot_to_json(new_snapshot)); } -int app_add_channel(struct app *app, const struct ast_channel *chan) +static struct ast_json *channel_callerid( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) { - SCOPED_AO2LOCK(lock, app); - const char *uniqueid; + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); - ast_assert(app != NULL); - ast_assert(chan != NULL); + /* No NewCallerid event on cache clear or first event */ + if (!old_snapshot || !new_snapshot) { + return NULL; + } - /* Don't accept new channels in an inactive application */ - if (!app->handler) { - return -1; + if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) { + return NULL; } - uniqueid = ast_channel_uniqueid(chan); - return ast_str_container_add(app->channels, uniqueid) ? -1 : 0; + return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", + "type", "ChannelCallerId", + "timestamp", ast_json_timeval(*tv, NULL), + "caller_presentation", new_snapshot->caller_pres, + "caller_presentation_txt", ast_describe_caller_presentation( + new_snapshot->caller_pres), + "channel", ast_channel_snapshot_to_json(new_snapshot)); } -void app_remove_channel(struct app* app, const struct ast_channel *chan) +static channel_snapshot_monitor channel_monitors[] = { + channel_state, + channel_dialplan, + channel_callerid +}; + +static void sub_channel_update_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) { - SCOPED_AO2LOCK(lock, app); + struct app *app = data; + struct stasis_cache_update *update; + struct ast_channel_snapshot *new_snapshot; + struct ast_channel_snapshot *old_snapshot; + const struct timeval *tv; + int i; + + ast_assert(stasis_message_type(message) == stasis_cache_update_type()); + + update = stasis_message_data(message); + + ast_assert(update->type == ast_channel_snapshot_type()); + + new_snapshot = stasis_message_data(update->new_snapshot); + old_snapshot = stasis_message_data(update->old_snapshot); + + /* Pull timestamp from the new snapshot, or from the update message + * when there isn't one. */ + tv = update->new_snapshot ? + stasis_message_timestamp(update->new_snapshot) : + stasis_message_timestamp(message); - ast_assert(app != NULL); - ast_assert(chan != NULL); + for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK); + msg = channel_monitors[i](old_snapshot, new_snapshot, tv); + if (msg) { + app_send(app, msg); + } + } } -int app_add_bridge(struct app *app, const char *uniqueid) +static struct ast_json *simple_bridge_event( + const char *type, + struct ast_bridge_snapshot *snapshot, + const struct timeval *tv) { - SCOPED_AO2LOCK(lock, app); + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(snapshot)); +} - ast_assert(app != NULL); - ast_assert(uniqueid != NULL); +static void sub_bridge_update_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct app *app = data; + struct stasis_cache_update *update; + struct ast_bridge_snapshot *new_snapshot; + struct ast_bridge_snapshot *old_snapshot; + const struct timeval *tv; - /* Don't accept new bridges in an inactive application */ - if (!app->handler) { - return -1; + ast_assert(stasis_message_type(message) == stasis_cache_update_type()); + + update = stasis_message_data(message); + + ast_assert(update->type == ast_bridge_snapshot_type()); + + new_snapshot = stasis_message_data(update->new_snapshot); + old_snapshot = stasis_message_data(update->old_snapshot); + tv = update->new_snapshot ? + stasis_message_timestamp(update->new_snapshot) : + stasis_message_timestamp(message); + + if (!new_snapshot) { + json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv); + } else if (!old_snapshot) { + json = simple_bridge_event("BridgeCreated", old_snapshot, tv); + } + + if (!json) { + return; + } + + app_send(app, json); +} + +static void bridge_merge_handler(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + struct app *app = data; + struct ast_bridge_merge_message *merge; + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(app); } - return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0; + if (stasis_message_type(message) != ast_bridge_merge_message_type()) { + return; + } + + merge = stasis_message_data(message); + + /* Find out if we're subscribed to either bridge */ + forwards = ao2_find(app->forwards, merge->from->uniqueid, + OBJ_SEARCH_KEY); + if (!forwards) { + forwards = ao2_find(app->forwards, merge->to->uniqueid, + OBJ_SEARCH_KEY); + } + + if (!forwards) { + return; + } + + /* Forward the message to the app */ + stasis_forward_message(app->topic, topic, message); } -void app_remove_bridge(struct app* app, const char *uniqueid) +struct app *app_create(const char *name, stasis_app_cb handler, void *data) { - SCOPED_AO2LOCK(lock, app); + RAII_VAR(struct app *, app, NULL, ao2_cleanup); + size_t size; + int res = 0; + + ast_assert(name != NULL); + ast_assert(handler != NULL); + + ast_verb(1, "Creating Stasis app '%s'\n", name); + + size = sizeof(*app) + strlen(name) + 1; + app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); + + if (!app) { + return NULL; + } + + app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX, + AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, + forwards_sort, NULL); + if (!app->forwards) { + return NULL; + } + + app->topic = stasis_topic_create(name); + if (!app->topic) { + return NULL; + } + + app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(), + bridge_merge_handler, app); + if (!app->bridge_merge_sub) { + return NULL; + } + /* Subscription holds a reference */ + ao2_ref(app, +1); + + app->router = stasis_message_router_create(app->topic); + if (!app->router) { + return NULL; + } + + res |= stasis_message_router_add_cache_update(app->router, + ast_bridge_snapshot_type(), sub_bridge_update_handler, app); + + res |= stasis_message_router_add_cache_update(app->router, + ast_channel_snapshot_type(), sub_channel_update_handler, app); + + res |= stasis_message_router_set_default(app->router, + sub_default_handler, app); - ast_assert(app != NULL); - ast_assert(uniqueid != NULL); + if (res != 0) { + return NULL; + } + /* Router holds a reference */ + ao2_ref(app, +1); + + strncpy(app->name, name, size - sizeof(*app)); + app->handler = handler; + ao2_ref(data, +1); + app->data = data; - ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE); + ao2_ref(app, +1); + return app; } /*! @@ -196,6 +583,18 @@ void app_deactivate(struct app *app) app->data = NULL; } +void app_shutdown(struct app *app) +{ + SCOPED_AO2LOCK(lock, app); + + ast_assert(app_is_finished(app)); + + stasis_message_router_unsubscribe(app->router); + app->router = NULL; + stasis_unsubscribe(app->bridge_merge_sub); + app->bridge_merge_sub = NULL; +} + int app_is_active(struct app *app) { SCOPED_AO2LOCK(lock, app); @@ -206,8 +605,7 @@ int app_is_finished(struct app *app) { SCOPED_AO2LOCK(lock, app); - return app->handler == NULL && - ao2_container_count(app->channels) == 0; + return app->handler == NULL && ao2_container_count(app->forwards) == 0; } void app_update(struct app *app, stasis_app_cb handler, void *data) @@ -229,7 +627,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data) ast_verb(1, "Activating Stasis app '%s'\n", app->name); } - app->handler = handler; ao2_cleanup(app->data); if (data) { @@ -243,16 +640,100 @@ const char *app_name(const struct app *app) return app->name; } -int app_is_watching_channel(struct app *app, const char *uniqueid) +int app_subscribe_channel(struct app *app, struct ast_channel *chan) { - RAII_VAR(char *, found, NULL, ao2_cleanup); - found = ao2_find(app->channels, uniqueid, OBJ_KEY); - return found != NULL; + int res; + + if (!app || !chan) { + return -1; + } else { + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan), + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_channel(app, chan); + if (!forwards) { + return -1; + } + + res = ao2_link_flags(app->forwards, forwards, + OBJ_NOLOCK); + if (!res) { + return -1; + } + } + + ++forwards->interested; + return 0; + } +} + +static int unsubscribe(struct app *app, const char *kind, const char *id) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + ast_log(LOG_ERROR, + "App '%s' not subscribed to %s '%s'", + app->name, kind, id); + return -1; + } + + if (--forwards->interested == 0) { + /* No one is interested any more; unsubscribe */ + forwards_unsubscribe(forwards); + ao2_find(app->forwards, forwards, + OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK | + OBJ_NODATA); + } + + return 0; } -int app_is_watching_bridge(struct app *app, const char *uniqueid) +int app_unsubscribe_channel(struct app *app, struct ast_channel *chan) { - RAII_VAR(char *, found, NULL, ao2_cleanup); - found = ao2_find(app->bridges, uniqueid, OBJ_KEY); - return found != NULL; + if (!app || !chan) { + return -1; + } + + return unsubscribe(app, "channel", ast_channel_uniqueid(chan)); +} + +int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge) +{ + if (!app || !bridge) { + return -1; + } else { + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, bridge->uniqueid, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_bridge(app, bridge); + if (!forwards) { + return -1; + } + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + } + + ++forwards->interested; + return 0; + } +} + +int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge) +{ + if (!app || !bridge) { + return -1; + } + + return unsubscribe(app, "bridge", bridge->uniqueid); } |