diff options
author | Matthew Jordan <mjordan@digium.com> | 2014-07-22 16:20:58 +0000 |
---|---|---|
committer | Matthew Jordan <mjordan@digium.com> | 2014-07-22 16:20:58 +0000 |
commit | bb87796f67f8cef493a89e8b685e895142bfdce4 (patch) | |
tree | 533e63d1aa65281d3b13363790f88b1fc32a2574 /main/endpoints.c | |
parent | 84beaf27bc2605fa93d78ab2285aa945dbd08db0 (diff) |
ARI: Fix endpoint/channel subscription issues; allow for subscriptions to tech
This patch serves two purposes:
(1) It fixes some bugs with endpoint subscriptions not reporting all of the
channel events
(2) It serves as the preliminary work needed for ASTERISK-23692, which allows
for sending/receiving arbitrary out of call text messages through ARI in a
technology agnostic fashion.
The messaging functionality described on ASTERISK-23692 requires two things:
(1) The ability to send/receive messages associated with an endpoint. This is
relatively straight forwards with the endpoint core in Asterisk now.
(2) The ability to send/receive messages associated with a technology and an
arbitrary technology defined URI. This is less straight forward, as
endpoints are formed from a tech + resource pair. We don't have a
mechanism to note that a technology that *may* have endpoints exists.
This patch provides such a mechanism, and fixes a few bugs along the way.
The first major bug this patch fixes is the forwarding of channel messages
to their respective endpoints. Prior to this patch, there were two problems:
(1) Channel caching messages weren't forwarded. Thus, the endpoints missed
most of the interesting bits (such as channel creation, destruction, state
changes, etc.)
(2) Channels weren't associated with their endpoint until after creation.
This resulted in endpoints missing the channel creation message, which
limited the usefulness of the subscription in the first place (a major use
case being 'tell me when this endpoint has a channel'). Unfortunately,
this meant another parameter to ast_channel_alloc. Since not all channel
technologies support an ast_endpoint, this patch makes such a call
optional and opts for a new function, ast_channel_alloc_with_endpoint.
When endpoints are created, they will implicitly create a technology endpoint
for their technology (if one does not already exist). A technology endpoint is
special in that it has no state, cannot have channels created for it, cannot
be created explicitly, and cannot be destroyed except on shutdown. It does,
however, have all messages from other endpoints in its technology forwarded to
it.
Combined with the bug fixes, we now have Stasis messages being properly
forwarded. Consider the following scenario: two PJSIP endpoints (foo and bar),
where bar has a single channel associated with it and foo has two channels
associated with it. The messages would be forwarded as follows:
channel PJSIP/foo-1 --
\
--> endpoint PJSIP/foo --
/ \
channel PJSIP/foo-2 -- \
---- > endpoint PJSIP
/
channel PJSIP/bar-1 -----> endpoint PJSIP/bar --
ARI, through the applications resource, can:
- subscribe to endpoint:PJSIP/foo and get notifications for channels
PJSIP/foo-1,PJSIP/foo-2 and endpoint PJSIP/foo
- subscribe to endpoint:PJSIP/bar and get notifications for channels
PJSIP/bar-1 and endpoint PJSIP/bar
- subscribe to endpoint:PJSIP and get notifications for channels
PJSIP/foo-1,PJSIP/foo-2,PJSIP/bar-1 and endpoints PJSIP/foo,PJSIP/bar
Note that since endpoint PJSIP never changes, it never has events itself. It
merely provides an aggregation point for all other endpoints in its technology
(which in turn aggregate all channel messages associated with that endpoint).
This patch also adds endpoints to res_xmpp and chan_motif, because the actual
messaging work will need it (messaging without XMPP is just sad).
Review: https://reviewboard.asterisk.org/r/3760/
ASTERISK-23692
........
Merged revisions 419196 from http://svn.asterisk.org/svn/asterisk/branches/12
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@419203 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/endpoints.c')
-rw-r--r-- | main/endpoints.c | 110 |
1 files changed, 84 insertions, 26 deletions
diff --git a/main/endpoints.c b/main/endpoints.c index 10b32e268..985f6e634 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -46,8 +46,13 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") /*! Buckets for endpoint hash. Keep it prime! */ #define ENDPOINT_BUCKETS 127 +/*! Buckets for technology endpoints. */ +#define TECH_ENDPOINT_BUCKETS 11 + static struct ao2_container *endpoints; +static struct ao2_container *tech_endpoints; + struct ast_endpoint { AST_DECLARE_STRING_FIELDS( AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */ @@ -69,6 +74,8 @@ struct ast_endpoint { struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; + /*! Forwarding subscription from an endpoint to its tech endpoint */ + struct stasis_forward *tech_forward; }; static int endpoint_hash(const void *obj, int flags) @@ -121,7 +128,13 @@ static int endpoint_cmp(void *obj, void *arg, int flags) struct ast_endpoint *ast_endpoint_find_by_id(const char *id) { - return ao2_find(endpoints, id, OBJ_KEY); + struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY); + + if (!endpoint) { + endpoint = ao2_find(tech_endpoints, id, OBJ_KEY); + } + + return endpoint; } struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) @@ -181,6 +194,8 @@ static void endpoint_dtor(void *obj) ao2_cleanup(endpoint->router); endpoint->router = NULL; + endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward); + stasis_cp_single_unsubscribe(endpoint->topics); endpoint->topics = NULL; @@ -196,6 +211,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint, { ast_assert(chan != NULL); ast_assert(endpoint != NULL); + ast_assert(!ast_strlen_zero(endpoint->resource)); ast_channel_forward_endpoint(chan, endpoint); @@ -242,19 +258,21 @@ static void endpoint_default(void *data, } } -struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) +static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource) { RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); + RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup); int r = 0; - if (ast_strlen_zero(tech)) { - ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n"); - return NULL; - } - - if (ast_strlen_zero(resource)) { - ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n"); - return NULL; + /* Get/create the technology endpoint */ + if (!ast_strlen_zero(resource)) { + tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY); + if (!tech_endpoint) { + tech_endpoint = endpoint_internal_create(tech, NULL); + if (!tech_endpoint) { + return NULL; + } + } } endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor); @@ -268,10 +286,12 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) if (ast_string_field_init(endpoint, 80) != 0) { return NULL; } - ast_string_field_set(endpoint, tech, tech); - ast_string_field_set(endpoint, resource, resource); - ast_string_field_build(endpoint, id, "%s/%s", tech, resource); + ast_string_field_set(endpoint, resource, S_OR(resource, "")); + ast_string_field_build(endpoint, id, "%s%s%s", + tech, + !ast_strlen_zero(resource) ? "/" : "", + S_OR(resource, "")); /* All access to channel_ids should be covered by the endpoint's * lock; no extra lock needed. */ @@ -287,24 +307,47 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) return NULL; } - endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); - if (!endpoint->router) { - return NULL; - } - r |= stasis_message_router_add(endpoint->router, - stasis_cache_clear_type(), endpoint_cache_clear, - endpoint); - r |= stasis_message_router_set_default(endpoint->router, - endpoint_default, endpoint); - - endpoint_publish_snapshot(endpoint); + if (!ast_strlen_zero(resource)) { + endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); + if (!endpoint->router) { + return NULL; + } + r |= stasis_message_router_add(endpoint->router, + stasis_cache_clear_type(), endpoint_cache_clear, + endpoint); + r |= stasis_message_router_set_default(endpoint->router, + endpoint_default, endpoint); + if (r) { + return NULL; + } - ao2_link(endpoints, endpoint); + endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), + stasis_cp_single_topic(tech_endpoint->topics)); + endpoint_publish_snapshot(endpoint); + ao2_link(endpoints, endpoint); + } else { + ao2_link(tech_endpoints, endpoint); + } ao2_ref(endpoint, +1); return endpoint; } +struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) +{ + if (ast_strlen_zero(tech)) { + ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n"); + return NULL; + } + + if (ast_strlen_zero(resource)) { + ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n"); + return NULL; + } + + return endpoint_internal_create(tech, resource); +} + static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint) { RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); @@ -368,6 +411,8 @@ void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state) { ast_assert(endpoint != NULL); + ast_assert(!ast_strlen_zero(endpoint->resource)); + ao2_lock(endpoint); endpoint->state = state; ao2_unlock(endpoint); @@ -378,6 +423,8 @@ void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int max_channels) { ast_assert(endpoint != NULL); + ast_assert(!ast_strlen_zero(endpoint->resource)); + ao2_lock(endpoint); endpoint->max_channels = max_channels; ao2_unlock(endpoint); @@ -407,6 +454,9 @@ struct ast_endpoint_snapshot *ast_endpoint_snapshot_create( void *obj; SCOPED_AO2LOCK(lock, endpoint); + ast_assert(endpoint != NULL); + ast_assert(!ast_strlen_zero(endpoint->resource)); + channel_count = ao2_container_count(endpoint->channel_ids); snapshot = ao2_alloc( @@ -440,6 +490,9 @@ static void endpoint_cleanup(void) { ao2_cleanup(endpoints); endpoints = NULL; + + ao2_cleanup(tech_endpoints); + tech_endpoints = NULL; } int ast_endpoint_init(void) @@ -448,10 +501,15 @@ int ast_endpoint_init(void) endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash, endpoint_cmp); - if (!endpoints) { return -1; } + tech_endpoints = ao2_container_alloc(TECH_ENDPOINT_BUCKETS, endpoint_hash, + endpoint_cmp); + if (!tech_endpoints) { + return -1; + } + return 0; } |