diff options
author | Kinsey Moore <kmoore@digium.com> | 2013-03-16 15:45:58 +0000 |
---|---|---|
committer | Kinsey Moore <kmoore@digium.com> | 2013-03-16 15:45:58 +0000 |
commit | 99aa02d17f7f1815f9a2abf75282f815a975cd67 (patch) | |
tree | 5ebb9cf3ddecd7153afb9e30f767d170c44b2142 /res | |
parent | 5d45596f6257b86189bef2dfaf5d9cc0b001fa46 (diff) |
Transition MWI to Stasis-core
Remove MWI's dependency on the event system by moving it to
Stasis-core. This also introduces forwarding topic pools in Stasis-core
which aggregate many dynamically allocated topics into a single primary
topic.
Review: https://reviewboard.asterisk.org/r/2368/
(closes issue ASTERISK-21097)
Patch-by: Kinsey Moore
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383284 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res')
-rw-r--r-- | res/res_jabber.c | 48 | ||||
-rw-r--r-- | res/res_xmpp.c | 42 |
2 files changed, 43 insertions, 47 deletions
diff --git a/res/res_jabber.c b/res/res_jabber.c index e8e79051e..1f9ddbaa3 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -373,7 +373,7 @@ static void aji_pubsub_purge_nodes(struct aji_client *client, static void aji_publish_mwi(struct aji_client *client, const char *mailbox, const char *context, const char *oldmsgs, const char *newmsgs); static void aji_devstate_cb(const struct ast_event *ast_event, void *data); -static void aji_mwi_cb(const struct ast_event *ast_event, void *data); +static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node, const char *event_type, unsigned int cachable); /* No transports in this version */ @@ -410,7 +410,7 @@ static char *app_ajileave = "JabberLeave"; static struct aji_client_container clients; static struct aji_capabilities *capabilities = NULL; -static struct ast_event_sub *mwi_sub = NULL; +static struct stasis_subscription *mwi_sub = NULL; static struct ast_event_sub *device_state_sub = NULL; static ast_cond_t message_received_condition; static ast_mutex_t messagelock; @@ -3240,30 +3240,33 @@ int ast_aji_disconnect(struct aji_client *client) * \param data void pointer to ast_client structure * \return void */ -static void aji_mwi_cb(const struct ast_event *ast_event, void *data) +static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { const char *mailbox; const char *context; char oldmsgs[10]; char newmsgs[10]; - struct aji_client *client; - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) - { + struct aji_client *client = data; + struct stasis_mwi_state *mwi_state; + + if (!stasis_subscription_is_subscribed(sub) || stasis_mwi_state() != stasis_message_type(msg)) { + return; + } + + mwi_state = stasis_message_data(msg); + + if (ast_eid_cmp(&ast_eid_default, &mwi_state->eid)) { /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); return; } - client = ASTOBJ_REF((struct aji_client *) data); - mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX); - context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT); + mailbox = mwi_state->mailbox; + context = mwi_state->context; snprintf(oldmsgs, sizeof(oldmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS)); + mwi_state->old_msgs); snprintf(newmsgs, sizeof(newmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS)); + mwi_state->new_msgs); aji_publish_mwi(client, mailbox, context, oldmsgs, newmsgs); - ASTOBJ_UNREF(client, ast_aji_client_destroy); - } /*! * \brief Callback function for device state events @@ -3300,8 +3303,7 @@ static void aji_devstate_cb(const struct ast_event *ast_event, void *data) static void aji_init_event_distribution(struct aji_client *client) { if (!mwi_sub) { - mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_mwi_cb, "aji_mwi_subscription", - client, AST_EVENT_IE_END); + mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), aji_mwi_cb, client); } if (!device_state_sub) { if (ast_enable_distributed_devstate()) { @@ -3364,14 +3366,10 @@ static int aji_handle_pubsub_event(void *data, ikspak *pak) context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs); - if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, - AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT, - AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS, - AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS, - AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, - &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) { - return IKS_FILTER_EAT; - } + + stasis_publish_mwi_state_full(item_id, context, newmsgs, oldmsgs, &pubsub_eid); + + return IKS_FILTER_EAT; } else { ast_debug(1, "Don't know how to handle PubSub event of type %s\n", iks_name(item_content)); @@ -4771,7 +4769,7 @@ static int unload_module(void) ast_manager_unregister("JabberSend"); ast_custom_function_unregister(&jabberstatus_function); if (mwi_sub) { - ast_event_unsubscribe(mwi_sub); + mwi_sub = stasis_unsubscribe(mwi_sub); } if (device_state_sub) { ast_event_unsubscribe(device_state_sub); diff --git a/res/res_xmpp.c b/res/res_xmpp.c index f2f200c9a..1901aa25b 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1319,24 +1319,30 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con * \param data void pointer to ast_client structure * \return void */ -static void xmpp_pubsub_mwi_cb(const struct ast_event *ast_event, void *data) +static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { struct ast_xmpp_client *client = data; const char *mailbox, *context; char oldmsgs[10], newmsgs[10]; + struct stasis_mwi_state *mwi_state; - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) { + if (!stasis_subscription_is_subscribed(sub) || stasis_mwi_state_message() != stasis_message_type(msg)) { + return; + } + + mwi_state = stasis_message_data(msg); + + if (ast_eid_cmp(&ast_eid_default, &mwi_state->eid)) { /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); return; } - mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX); - context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT); + mailbox = mwi_state->mailbox; + context = mwi_state->context; snprintf(oldmsgs, sizeof(oldmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS)); + mwi_state->old_msgs); snprintf(newmsgs, sizeof(newmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS)); + mwi_state->new_msgs); xmpp_pubsub_publish_mwi(client, mailbox, context, oldmsgs, newmsgs); } @@ -1479,14 +1485,10 @@ static int xmpp_pubsub_handle_event(void *data, ikspak *pak) context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs); - if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, - AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT, - AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS, - AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS, - AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, - &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) { - return IKS_FILTER_EAT; - } + + stasis_publish_mwi_state_full(item_id, context, newmsgs, oldmsgs, &pubsub_eid); + + return IKS_FILTER_EAT; } else { ast_debug(1, "Don't know how to handle PubSub event of type %s\n", iks_name(item_content)); @@ -1587,20 +1589,17 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) xmpp_pubsub_unsubscribe(client, "device_state"); xmpp_pubsub_unsubscribe(client, "message_waiting"); - if (!(client->mwi_sub = ast_event_subscribe(AST_EVENT_MWI, xmpp_pubsub_mwi_cb, "xmpp_pubsub_mwi_subscription", - client, AST_EVENT_IE_END))) { + if (!(client->mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { return; } if (ast_enable_distributed_devstate()) { return; } - if (!(client->device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, xmpp_pubsub_devstate_cb, "xmpp_pubsub_devstate_subscription", client, AST_EVENT_IE_END))) { - ast_event_unsubscribe(client->mwi_sub); - client->mwi_sub = NULL; + client->mwi_sub = stasis_unsubscribe(client->mwi_sub); return; } @@ -3524,8 +3523,7 @@ int ast_xmpp_client_disconnect(struct ast_xmpp_client *client) } if (client->mwi_sub) { - ast_event_unsubscribe(client->mwi_sub); - client->mwi_sub = NULL; + client->mwi_sub = stasis_unsubscribe(client->mwi_sub); xmpp_pubsub_unsubscribe(client, "message_waiting"); } |