diff options
Diffstat (limited to 'res/res_stasis.c')
-rw-r--r-- | res/res_stasis.c | 437 |
1 files changed, 345 insertions, 92 deletions
diff --git a/res/res_stasis.c b/res/res_stasis.c index 3d003e40a..a1910abea 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -39,6 +39,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stasis_app.h" #include "asterisk/stasis_channels.h" #include "asterisk/strings.h" +#include "asterisk/stasis_message_router.h" +#include "asterisk/callerid.h" +#include "stasis_http/resource_events.h" /*! Time to wait for a frame in the application */ #define MAX_WAIT_MS 200 @@ -56,6 +59,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #define CONTROLS_NUM_BUCKETS 127 /*! + * \brief Number of buckets for the channels container for app instances. Remember + * to keep it a prime number! + */ +#define APP_CHANNELS_BUCKETS 7 + +/*! + * \brief Number of buckets for the blob_handlers container. Remember to keep + * it a prime number! + */ +#define BLOB_HANDLER_BUCKETS 7 + +/*! * \brief Stasis application container. Please call apps_registry() instead of * directly accessing. */ @@ -63,6 +78,9 @@ struct ao2_container *__apps_registry; struct ao2_container *__app_controls; +/*! \brief Message router for the channel caching topic */ +struct stasis_message_router *channel_router; + /*! Ref-counting accessor for the stasis applications container */ static struct ao2_container *apps_registry(void) { @@ -81,6 +99,8 @@ struct app { stasis_app_cb handler; /*! Opaque data to hand to callback function. */ void *data; + /*! List of channel identifiers this app instance is interested in */ + struct ao2_container *channels; /*! Name of the Stasis application */ char name[]; }; @@ -91,12 +111,14 @@ static void app_dtor(void *obj) ao2_cleanup(app->data); app->data = NULL; + ao2_cleanup(app->channels); + app->channels = NULL; } /*! Constructor for \ref app. */ static struct app *app_create(const char *name, stasis_app_cb handler, void *data) { - struct app *app; + RAII_VAR(struct app *, app, NULL, ao2_cleanup); size_t size; ast_assert(name != NULL); @@ -114,6 +136,12 @@ static struct app *app_create(const char *name, stasis_app_cb handler, void *dat ao2_ref(data, +1); app->data = data; + app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS); + if (!app->channels) { + return NULL; + } + + ao2_ref(app, +1); return app; } @@ -140,6 +168,27 @@ static int app_compare(void *lhs, void *rhs, int flags) } } +static int app_add_channel(struct app* app, const struct ast_channel *chan) +{ + const char *uniqueid; + ast_assert(chan != NULL); + ast_assert(app != NULL); + + uniqueid = ast_channel_uniqueid(chan); + if (!ast_str_container_add(app->channels, uniqueid)) { + return -1; + } + return 0; +} + +static void app_remove_channel(struct app* app, const struct ast_channel *chan) +{ + ast_assert(chan != NULL); + ast_assert(app != NULL); + + ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK); +} + /*! * \brief Send a message to the given application. * \param app App to send the message to. @@ -316,6 +365,9 @@ void stasis_app_control_continue(struct stasis_app_control *control) control->continue_to_dialplan = 1; } +/*! \brief Typedef for blob handler callbacks */ +typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *); + static int OK = 0; static int FAIL = -1; @@ -343,43 +395,11 @@ int stasis_app_control_answer(struct stasis_app_control *control) return *retval; } -static struct ast_json *app_event_create( - const char *event_name, - const struct ast_channel_snapshot *snapshot, - const struct ast_json *extra_info) -{ - RAII_VAR(struct ast_json *, message, NULL, ast_json_unref); - RAII_VAR(struct ast_json *, event, NULL, ast_json_unref); - - if (extra_info) { - event = ast_json_deep_copy(extra_info); - } else { - event = ast_json_object_create(); - } - - if (snapshot) { - int ret; - - /* Mustn't already have a channel field */ - ast_assert(ast_json_object_get(event, "channel") == NULL); - - ret = ast_json_object_set( - event, - "channel", ast_channel_snapshot_to_json(snapshot)); - if (ret != 0) { - return NULL; - } - } - - message = ast_json_pack("{s: o}", event_name, ast_json_ref(event)); - - return ast_json_ref(message); -} - static int send_start_msg(struct app *app, struct ast_channel *chan, int argc, char *argv[]) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); struct ast_json *json_args; @@ -393,19 +413,13 @@ static int send_start_msg(struct app *app, struct ast_channel *chan, return -1; } - msg = ast_json_pack("{s: {s: [], s: o}}", - "stasis-start", - "args", - "channel", ast_channel_snapshot_to_json(snapshot)); - - if (!msg) { + blob = ast_json_pack("{s: []}", "args"); + if (!blob) { return -1; } /* Append arguments to args array */ - json_args = ast_json_object_get( - ast_json_object_get(msg, "stasis-start"), - "args"); + json_args = ast_json_object_get(blob, "args"); ast_assert(json_args != NULL); for (i = 0; i < argc; ++i) { int r = ast_json_array_append(json_args, @@ -416,6 +430,11 @@ static int send_start_msg(struct app *app, struct ast_channel *chan, } } + msg = stasis_json_event_stasis_start_create(snapshot, blob); + if (!msg) { + return -1; + } + app_send(app, msg); return 0; } @@ -432,7 +451,8 @@ static int send_end_msg(struct app *app, struct ast_channel *chan) if (snapshot == NULL) { return -1; } - msg = app_event_create("stasis-end", snapshot, NULL); + + msg = stasis_json_event_stasis_end_create(snapshot); if (!msg) { return -1; } @@ -441,62 +461,201 @@ static int send_end_msg(struct app *app, struct ast_channel *chan) return 0; } -static void dtmf_handler(struct app *app, struct ast_channel_blob *obj) +static int app_watching_channel_cb(void *obj, void *arg, int flags) { - RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref); - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - const char *direction; + RAII_VAR(char *, uniqueid, NULL, ao2_cleanup); + struct app *app = obj; + char *chan_uniqueid = arg; - /* To simplify events, we'll only generate on receive */ - direction = ast_json_string_get( - ast_json_object_get(obj->blob, "direction")); + uniqueid = ao2_find(app->channels, chan_uniqueid, OBJ_KEY); + return uniqueid ? CMP_MATCH : 0; +} - if (strcmp("Received", direction) != 0) { - return; +static struct ao2_container *get_watching_apps(const char *uniqueid) +{ + RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup); + struct ao2_container *watching_apps; + char *uniqueid_dup; + RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy); + ast_assert(uniqueid != NULL); + ast_assert(apps != NULL); + + uniqueid_dup = ast_strdupa(uniqueid); + + watching_apps_iter = ao2_callback(apps, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup); + watching_apps = watching_apps_iter->c; + + if (!ao2_container_count(watching_apps)) { + return NULL; } - extra = ast_json_pack( - "{s: o}", - "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit"))); - if (!extra) { - return; + ao2_ref(watching_apps, +1); + return watching_apps_iter->c; +} + +/*! \brief Typedef for callbacks that get called on channel snapshot updates */ +typedef struct ast_json *(*channel_snapshot_monitor)( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot); + +/*! \brief Handle channel state changes */ +static struct ast_json *channel_state( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot; + + if (!old_snapshot) { + return stasis_json_event_channel_created_create(snapshot); + } else if (!new_snapshot) { + json = ast_json_pack("{s: i, s: s}", + "cause", snapshot->hangupcause, + "cause_txt", ast_cause2str(snapshot->hangupcause)); + if (!json) { + return NULL; + } + return stasis_json_event_channel_destroyed_create(snapshot, json); + } else if (old_snapshot->state != new_snapshot->state) { + return stasis_json_event_channel_state_change_create(snapshot); } - msg = app_event_create("dtmf-received", obj->snapshot, extra); - if (!msg) { - return; + return NULL; +} + +static struct ast_json *channel_dialplan( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + + /* No Newexten event on cache clear */ + if (!new_snapshot) { + return NULL; + } + + /* Empty application is not valid for a Newexten event */ + if (ast_strlen_zero(new_snapshot->appl)) { + return NULL; + } + + if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) { + return NULL; + } + + json = ast_json_pack("{s: s, s: s}", + "application", new_snapshot->appl, + "application_data", new_snapshot->data); + if (!json) { + return NULL; + } + + return stasis_json_event_channel_dialplan_create(new_snapshot, json); +} + +static struct ast_json *channel_callerid( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + + /* No NewCallerid event on cache clear or first event */ + if (!old_snapshot || !new_snapshot) { + return NULL; } + if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) { + return NULL; + } + + json = ast_json_pack("{s: i, s: s}", + "caller_presentation", new_snapshot->caller_pres, + "caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres)); + if (!json) { + return NULL; + } + + return stasis_json_event_channel_caller_id_create(new_snapshot, json); +} + +static struct ast_json *channel_snapshot( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot) +{ + if (!new_snapshot) { + return NULL; + } + + return stasis_json_event_channel_snapshot_create(new_snapshot); +} + +channel_snapshot_monitor channel_monitors[] = { + channel_snapshot, + channel_state, + channel_dialplan, + channel_callerid +}; + +static int app_send_cb(void *obj, void *arg, int flags) +{ + struct app *app = obj; + struct ast_json *msg = arg; + app_send(app, msg); + return 0; } -static void sub_handler(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) +static void sub_snapshot_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) { - struct app *app = data; + RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup); + struct stasis_cache_update *update = stasis_message_data(message); + struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot); + struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot); + int i; - if (stasis_subscription_final_message(sub, message)) { - ao2_cleanup(data); + watching_apps = get_watching_apps(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid); + if (!watching_apps) { return; } - if (ast_channel_snapshot_type() == stasis_message_type(message)) { + for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - struct ast_channel_snapshot *snapshot = - stasis_message_data(message); - msg = app_event_create("channel-state-change", snapshot, NULL); - if (!msg) { - return; + msg = channel_monitors[i](old_snapshot, new_snapshot); + if (msg) { + ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg); } - app_send(app, msg); - } else if (ast_channel_dtmf_end_type() == stasis_message_type(message)) { - /* To simplify events, we'll only generate on DTMF end */ - struct ast_channel_blob *blob = stasis_message_data(message); - dtmf_handler(app, blob); } +} +static void distribute_message(struct ao2_container *apps, struct ast_json *msg) +{ + ao2_callback(apps, OBJ_NODATA, app_send_cb, msg); +} + +static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb) +{ + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup); + + if (!obj->snapshot) { + return; + } + + watching_apps = get_watching_apps(obj->snapshot->uniqueid); + if (!watching_apps) { + return; + } + + msg = handler_cb(obj); + if (!msg) { + return; + } + + distribute_message(watching_apps, msg); } /*! @@ -544,8 +703,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup); RAII_VAR(struct app *, app, NULL, ao2_cleanup); RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink); - RAII_VAR(struct stasis_subscription *, subscription, NULL, - stasis_unsubscribe); int res = 0; int hungup = 0; @@ -570,21 +727,17 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, ao2_link(controls, control); } - subscription = - stasis_subscribe(ast_channel_topic(chan), sub_handler, app); - if (subscription == NULL) { - ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n", - app_name, ast_channel_name(chan)); - return -1; - } - ao2_ref(app, +1); /* subscription now has a reference */ - res = send_start_msg(app, chan, argc, argv); if (res != 0) { ast_log(LOG_ERROR, "Error sending start message to %s\n", app_name); return res; } + if (app_add_channel(app, chan)) { + ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name); + return -1; + } + while (1) { RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor); int r; @@ -634,6 +787,7 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, } } + app_remove_channel(app, chan); res = send_end_msg(app, chan); if (res != 0) { ast_log(LOG_ERROR, @@ -675,10 +829,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) if (app) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); SCOPED_LOCK(app_lock, app, ao2_lock, ao2_unlock); - msg = app_event_create("application-replaced", NULL, NULL); - app->handler(app->data, app_name, msg); + blob = ast_json_pack("{s: s}", "application", app_name); + if (blob) { + msg = stasis_json_event_application_replaced_create(blob); + if (msg) { + app->handler(app->data, app_name, msg); + } + } app->handler = handler; ao2_cleanup(app->data); @@ -706,6 +866,82 @@ void stasis_app_unregister(const char *app_name) } } +static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj) +{ + RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); + const char *direction; + + /* To simplify events, we'll only generate on receive */ + direction = ast_json_string_get( + ast_json_object_get(obj->blob, "direction")); + + if (strcmp("Received", direction) != 0) { + return NULL; + } + + extra = ast_json_pack( + "{s: o}", + "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit"))); + if (!extra) { + return NULL; + } + + return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra); +} + +/* To simplify events, we'll only generate on DTMF end (dtmf_end type) */ +static void sub_dtmf_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_channel_blob *obj = stasis_message_data(message); + generic_blob_handler(obj, handle_blob_dtmf); +} + +static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj) +{ + return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob); +} + +static void sub_userevent_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_channel_blob *obj = stasis_message_data(message); + generic_blob_handler(obj, handle_blob_userevent); +} + +static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj) +{ + return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob); +} + +static void sub_hangup_request_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_channel_blob *obj = stasis_message_data(message); + generic_blob_handler(obj, handle_blob_hangup_request); +} + +static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj) +{ + return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob); +} + +static void sub_varset_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_channel_blob *obj = stasis_message_data(message); + generic_blob_handler(obj, handle_blob_varset); +} + static int load_module(void) { int r = 0; @@ -722,13 +958,30 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - return r; + channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached())); + if (!channel_router) { + return AST_MODULE_LOAD_FAILURE; + } + + r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL); + r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL); + if (r) { + return AST_MODULE_LOAD_FAILURE; + } + + return AST_MODULE_LOAD_SUCCESS; } static int unload_module(void) { int r = 0; + stasis_message_router_unsubscribe(channel_router); + channel_router = NULL; + ao2_cleanup(__apps_registry); __apps_registry = NULL; |