diff options
author | Kinsey Moore <kmoore@digium.com> | 2013-04-16 15:33:59 +0000 |
---|---|---|
committer | Kinsey Moore <kmoore@digium.com> | 2013-04-16 15:33:59 +0000 |
commit | 191cf99ae1f821dec98199931fc775fc0716d27c (patch) | |
tree | 210b5ebfaf3f30fa06c666e8d99e93e9918253a5 | |
parent | c1ae5dc49be01a567b403306ee4b560391293f67 (diff) |
Move device state distribution to Stasis-core
In the move from Asterisk's event system to Stasis, this makes
distributed device state aggregation always-on, removes unnecessary
task processors where possible, and collapses aggregate and
non-aggregate states into a single cache for ease of retrieval. This
also removes an intermediary step in device state aggregation.
Review: https://reviewboard.asterisk.org/r/2389/
(closes issue ASTERISK-21101)
Patch-by: Kinsey Moore <kmoore@digium.com>
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@385860 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r-- | apps/app_queue.c | 82 | ||||
-rw-r--r-- | include/asterisk/devicestate.h | 93 | ||||
-rw-r--r-- | include/asterisk/xmpp.h | 3 | ||||
-rw-r--r-- | main/asterisk.c | 5 | ||||
-rw-r--r-- | main/ccss.c | 108 | ||||
-rw-r--r-- | main/devicestate.c | 417 | ||||
-rw-r--r-- | main/pbx.c | 63 | ||||
-rw-r--r-- | res/res_jabber.c | 64 | ||||
-rw-r--r-- | res/res_xmpp.c | 54 | ||||
-rw-r--r-- | tests/test_devicestate.c | 229 |
10 files changed, 691 insertions, 427 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c index fd692a4b2..9de87892c 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -990,9 +990,6 @@ static const struct autopause { { QUEUE_AUTOPAUSE_ALL,"all" }, }; - -static struct ast_taskprocessor *devicestate_tps; - #define DEFAULT_RETRY 5 #define DEFAULT_TIMEOUT 15 #define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */ @@ -1037,8 +1034,8 @@ static int montype_default = 0; /*! \brief queues.conf [general] option */ static int shared_lastcall = 1; -/*! \brief Subscription to device state change events */ -static struct ast_event_sub *device_state_sub; +/*! \brief Subscription to device state change messages */ +static struct stasis_subscription *device_state_sub; /*! \brief queues.conf [general] option */ static int update_cdr = 0; @@ -1618,12 +1615,6 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena return -1; } -struct statechange { - AST_LIST_ENTRY(statechange) entry; - int state; - char dev[0]; -}; - /*! \brief set a member's status based on device state of that member's state_interface. * * Lock interface list find sc, iterate through each queues queue_member list for member to @@ -1742,10 +1733,10 @@ static int is_member_available(struct member *mem) } /*! \brief set a member's status based on device state of that member's interface*/ -static int handle_statechange(void *datap) +static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - struct statechange *sc = datap; struct ao2_iterator miter, qiter; + struct ast_device_state_message *dev_state; struct member *m; struct call_queue *q; char interface[80], *slash_pos; @@ -1753,6 +1744,16 @@ static int handle_statechange(void *datap) int found_member; /* Found this member in this queue */ int avail = 0; /* Found an available member in this queue */ + if (ast_device_state_message_type() != stasis_message_type(msg)) { + return; + } + + dev_state = stasis_message_data(msg); + if (dev_state->eid) { + /* ignore non-aggregate states */ + return; + } + qiter = ao2_iterator_init(queues, 0); while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) { ao2_lock(q); @@ -1770,9 +1771,9 @@ static int handle_statechange(void *datap) } } - if (!strcasecmp(interface, sc->dev)) { + if (!strcasecmp(interface, dev_state->device)) { found_member = 1; - update_status(q, m, sc->state); + update_status(q, m, dev_state->state); } } @@ -1804,39 +1805,18 @@ static int handle_statechange(void *datap) ao2_iterator_destroy(&qiter); if (found) { - ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", sc->dev, sc->state, ast_devstate2str(sc->state)); + ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", + dev_state->device, + dev_state->state, + ast_devstate2str(dev_state->state)); } else { - ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", sc->dev, sc->state, ast_devstate2str(sc->state)); + ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", + dev_state->device, + dev_state->state, + ast_devstate2str(dev_state->state)); } - ast_free(sc); - return 0; -} - -static void device_state_cb(const struct ast_event *event, void *unused) -{ - enum ast_device_state state; - const char *device; - struct statechange *sc; - size_t datapsize; - - state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); - - if (ast_strlen_zero(device)) { - ast_log(LOG_ERROR, "Received invalid event that had no device IE\n"); - return; - } - datapsize = sizeof(*sc) + strlen(device) + 1; - if (!(sc = ast_calloc(1, datapsize))) { - ast_log(LOG_ERROR, "failed to calloc a state change struct\n"); - return; - } - sc->state = state; - strcpy(sc->dev, device); - if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) { - ast_free(sc); - } + return; } /*! \brief Helper function which converts from extension state to device state values */ @@ -9876,8 +9856,9 @@ static int unload_module(void) res |= ast_data_unregister(NULL); - if (device_state_sub) - ast_event_unsubscribe(device_state_sub); + if (device_state_sub) { + device_state_sub = stasis_unsubscribe(device_state_sub); + } ast_extension_state_del(0, extension_state_cb); @@ -9887,7 +9868,6 @@ static int unload_module(void) queue_t_unref(q, "Done with iterator"); } ao2_iterator_destroy(&q_iter); - devicestate_tps = ast_taskprocessor_unreference(devicestate_tps); ao2_ref(queues, -1); ast_unload_realtime("queue_members"); return res; @@ -9948,12 +9928,8 @@ static int load_module(void) res |= ast_custom_function_register(&queuewaitingcount_function); res |= ast_custom_function_register(&queuememberpenalty_function); - if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) { - ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n"); - } - /* in the following subscribe call, do I use DEVICE_STATE, or DEVICE_STATE_CHANGE? */ - if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, "AppQueue Device state", NULL, AST_EVENT_IE_END))) { + if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) { res = -1; } diff --git a/include/asterisk/devicestate.h b/include/asterisk/devicestate.h index 86740bc2f..2b3353ffd 100644 --- a/include/asterisk/devicestate.h +++ b/include/asterisk/devicestate.h @@ -38,6 +38,7 @@ #define _ASTERISK_DEVICESTATE_H #include "asterisk/channelstate.h" +#include "asterisk/utils.h" #if defined(__cplusplus) || defined(c_plusplus) extern "C" { @@ -270,19 +271,87 @@ struct ast_devstate_aggregate { }; /*! - * \brief Enable distributed device state processing. - * - * \details - * By default, Asterisk assumes that device state change events will only be - * originating from one instance. If a module gets loaded and configured such - * that multiple instances of Asterisk will be sharing device state, this - * function should be called to enable distributed device state processing. - * It is off by default to save on unnecessary processing. - * - * \retval 0 success - * \retval -1 failure + * \brief The structure that contains device state + * \since 12 + */ +struct ast_device_state_message { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(cache_id); /*!< A unique ID used for hashing */ + AST_STRING_FIELD(device); /*!< The name of the device */ + ); + enum ast_device_state state; /*!< The state of the device */ + struct ast_eid *eid; /*!< The EID of the server where this message originated, NULL EID means aggregate state */ + enum ast_devstate_cache cachable; /*!< Flag designating the cachability of this device state */ +}; + +/*! + * \brief Get the Stasis topic for device state messages + * \retval The topic for device state messages + * \retval NULL if it has not been allocated + * \since 12 + */ +struct stasis_topic *ast_device_state_topic_all(void); + +/*! + * \brief Get the Stasis topic for device state messages for a specific device + * \param uniqueid The device for which to get the topic + * \retval The topic structure for MWI messages for a given device + * \retval NULL if it failed to be found or allocated + * \since 12 + */ +struct stasis_topic *ast_device_state_topic(const char *device); + +/*! + * \brief Get the Stasis caching topic for device state messages + * \retval The caching topic for device state messages + * \retval NULL if it has not been allocated + * \since 12 + */ +struct stasis_caching_topic *ast_device_state_topic_cached(void); + +/*! + * \brief Get the Stasis message type for device state messages + * \retval The message type for device state messages + * \retval NULL if it has not been allocated + * \since 12 + */ +struct stasis_message_type *ast_device_state_message_type(void); + +/*! + * \brief Initialize the device state core + * \retval 0 Success + * \retval -1 Failure + * \since 12 + */ +int devstate_init(void); + +/*! + * \brief Publish a device state update + * \param[in] device The device name + * \param[in] state The state of the device + * \param[in] cachable Whether the device state can be cached + * \retval 0 Success + * \retval -1 Failure + * \since 12 + */ +#define ast_publish_device_state(device, state, cachable) \ + ast_publish_device_state_full(device, state, cachable, &ast_eid_default) + +/*! + * \brief Publish a device state update with EID + * \param[in] device The device name + * \param[in] state The state of the device + * \param[in] cachable Whether the device state can be cached + * \param[in] eid The EID of the server that originally published the message + * \retval 0 Success + * \retval -1 Failure + * \since 12 */ -int ast_enable_distributed_devstate(void); +int ast_publish_device_state_full( + const char *device, + enum ast_device_state state, + enum ast_devstate_cache cachable, + struct ast_eid *eid); #if defined(__cplusplus) || defined(c_plusplus) } diff --git a/include/asterisk/xmpp.h b/include/asterisk/xmpp.h index 6833b6cce..6d14b1115 100644 --- a/include/asterisk/xmpp.h +++ b/include/asterisk/xmpp.h @@ -47,6 +47,7 @@ #include "asterisk/linkedlists.h" #include "asterisk/stringfields.h" #include "asterisk/pbx.h" +#include "asterisk/stasis.h" /* * As per RFC 3920 - section 3.1, the maximum length for a full Jabber ID @@ -135,7 +136,7 @@ struct ast_xmpp_client { int timeout; unsigned int reconnect:1; /*!< Reconnect this client */ struct stasis_subscription *mwi_sub; /*!< If distributing event information the MWI subscription */ - struct ast_event_sub *device_state_sub; /*!< If distributing event information the device state subscription */ + struct stasis_subscription *device_state_sub; /*!< If distributing event information the device state subscription */ }; /*! diff --git a/main/asterisk.c b/main/asterisk.c index d7eb8d953..f9ff163e6 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -4180,6 +4180,11 @@ int main(int argc, char *argv[]) aco_init(); + if (devstate_init()) { + printf("Device state core initialization failed.\n%s", term_quit()); + exit(1); + } + if (app_init()) { printf("App core initialization failed.\n%s", term_quit()); exit(1); diff --git a/main/ccss.c b/main/ccss.c index b479a3c54..6b59a2f37 100644 --- a/main/ccss.c +++ b/main/ccss.c @@ -1209,7 +1209,7 @@ struct generic_monitor_instance_list { * recalled */ int fit_for_recall; - struct ast_event_sub *sub; + struct stasis_subscription *sub; AST_LIST_HEAD_NOLOCK(, generic_monitor_instance) list; }; @@ -1260,19 +1260,20 @@ static void generic_monitor_instance_list_destructor(void *obj) struct generic_monitor_instance_list *generic_list = obj; struct generic_monitor_instance *generic_instance; - generic_list->sub = ast_event_unsubscribe(generic_list->sub); + generic_list->sub = stasis_unsubscribe(generic_list->sub); while ((generic_instance = AST_LIST_REMOVE_HEAD(&generic_list->list, next))) { ast_free(generic_instance); } ast_free((char *)generic_list->device_name); } -static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata); +static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor) { struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list), generic_monitor_instance_list_destructor, "allocate generic monitor instance list"); char * device_name; + struct stasis_topic *device_specific_topic; if (!generic_list) { return NULL; @@ -1285,11 +1286,12 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_ ast_tech_to_upper(device_name); generic_list->device_name = device_name; - if (!(generic_list->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, - generic_monitor_devstate_cb, "Requesting CC", NULL, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, monitor->interface->device_name, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END))) { + device_specific_topic = ast_device_state_topic(device_name); + if (!device_specific_topic) { + return NULL; + } + + if (!(generic_list->sub = stasis_subscribe(device_specific_topic, generic_monitor_devstate_cb, NULL))) { cc_unref(generic_list, "Failed to subscribe to device state"); return NULL; } @@ -1298,35 +1300,25 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_ return generic_list; } -struct generic_tp_cb_data { - const char *device_name; - enum ast_device_state new_state; -}; - static int generic_monitor_devstate_tp_cb(void *data) { - struct generic_tp_cb_data *gtcd = data; - enum ast_device_state new_state = gtcd->new_state; - enum ast_device_state previous_state = gtcd->new_state; - const char *monitor_name = gtcd->device_name; + RAII_VAR(struct ast_device_state_message *, dev_state, data, ao2_cleanup); + enum ast_device_state new_state = dev_state->state; + enum ast_device_state previous_state; struct generic_monitor_instance_list *generic_list; struct generic_monitor_instance *generic_instance; - if (!(generic_list = find_generic_monitor_instance_list(monitor_name))) { + if (!(generic_list = find_generic_monitor_instance_list(dev_state->device))) { /* The most likely cause for this is that we destroyed the monitor in the * time between subscribing to its device state and the time this executes. * Not really a big deal. */ - ast_free((char *) gtcd->device_name); - ast_free(gtcd); return 0; } if (generic_list->current_state == new_state) { /* The device state hasn't actually changed, so we don't really care */ cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback"); - ast_free((char *) gtcd->device_name); - ast_free(gtcd); return 0; } @@ -1346,33 +1338,31 @@ static int generic_monitor_devstate_tp_cb(void *data) } } cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback"); - ast_free((char *) gtcd->device_name); - ast_free(gtcd); return 0; } -static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata) +static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { /* Wow, it's cool that we've picked up on a state change, but we really want * the actual work to be done in the core's taskprocessor execution thread * so that all monitor operations can be serialized. Locks?! We don't need * no steenkin' locks! */ - struct generic_tp_cb_data *gtcd = ast_calloc(1, sizeof(*gtcd)); - - if (!gtcd) { + struct ast_device_state_message *dev_state; + if (ast_device_state_message_type() != stasis_message_type(msg)) { return; } - if (!(gtcd->device_name = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE)))) { - ast_free(gtcd); + dev_state = stasis_message_data(msg); + if (dev_state->eid) { + /* ignore non-aggregate states */ return; } - gtcd->new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, gtcd)) { - ast_free((char *)gtcd->device_name); - ast_free(gtcd); + ao2_t_ref(dev_state, +1, "Bumping dev_state ref for cc_core_taskprocessor"); + if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, dev_state)) { + ao2_cleanup(dev_state); + return; } } @@ -2502,7 +2492,7 @@ struct cc_generic_agent_pvt { * device state of the caller in order to * determine when we may move on */ - struct ast_event_sub *sub; + struct stasis_subscription *sub; /*! * Scheduler id of offer timer. */ @@ -2635,34 +2625,33 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent) return 0; } -static int generic_agent_devstate_unsubscribe(void *data) +static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - struct ast_cc_agent *agent = data; + struct ast_cc_agent *agent = userdata; + enum ast_device_state new_state; + struct ast_device_state_message *dev_state; struct cc_generic_agent_pvt *generic_pvt = agent->private_data; - if (generic_pvt->sub != NULL) { - generic_pvt->sub = ast_event_unsubscribe(generic_pvt->sub); + if (stasis_subscription_final_message(sub, msg)) { + cc_unref(agent, "Done holding ref for subscription"); + return; + } else if (ast_device_state_message_type() != stasis_message_type(msg)) { + return; } - cc_unref(agent, "Done unsubscribing from devstate"); - return 0; -} -static void generic_agent_devstate_cb(const struct ast_event *event, void *userdata) -{ - struct ast_cc_agent *agent = userdata; - enum ast_device_state new_state; + dev_state = stasis_message_data(msg); + if (dev_state->eid) { + /* ignore non-aggregate states */ + return; + } - new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); + new_state = dev_state->state; if (!cc_generic_is_device_available(new_state)) { /* Not interested in this new state of the device. It is still busy. */ return; } - /* We can't unsubscribe from device state events here because it causes a deadlock */ - if (ast_taskprocessor_push(cc_core_taskprocessor, generic_agent_devstate_unsubscribe, - cc_ref(agent, "ref agent for device state unsubscription"))) { - cc_unref(agent, "Unref agent unsubscribing from devstate failed"); - } + generic_pvt->sub = stasis_unsubscribe(sub); ast_cc_agent_caller_available(agent->core_id, "%s is no longer busy", agent->device_name); } @@ -2670,18 +2659,21 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent) { struct cc_generic_agent_pvt *generic_pvt = agent->private_data; struct ast_str *str = ast_str_alloca(128); + struct stasis_topic *device_specific_topic; ast_assert(generic_pvt->sub == NULL); ast_str_set(&str, 0, "Agent monitoring %s device state since it is busy\n", agent->device_name); - if (!(generic_pvt->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, - generic_agent_devstate_cb, ast_str_buffer(str), agent, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, agent->device_name, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END))) { + device_specific_topic = ast_device_state_topic(agent->device_name); + if (!device_specific_topic) { + return -1; + } + + if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) { return -1; } + cc_ref(agent, "Ref agent for subscription"); return 0; } @@ -2792,7 +2784,7 @@ static void cc_generic_agent_destructor(struct ast_cc_agent *agent) cc_generic_agent_stop_offer_timer(agent); if (agent_pvt->sub) { - agent_pvt->sub = ast_event_unsubscribe(agent_pvt->sub); + agent_pvt->sub = stasis_unsubscribe(agent_pvt->sub); } ast_free(agent_pvt); diff --git a/main/devicestate.c b/main/devicestate.c index 4ed51f9e3..40b9e55e0 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -129,7 +129,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/devicestate.h" #include "asterisk/pbx.h" #include "asterisk/app.h" +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" #include "asterisk/event.h" +#include "asterisk/devicestate.h" + +#define DEVSTATE_TOPIC_BUCKETS 57 /*! \brief Device state strings for printing */ static const char * const devstatestring[][2] = { @@ -188,25 +193,12 @@ static pthread_t change_thread = AST_PTHREADT_NULL; /*! \brief Flag for the queue */ static ast_cond_t change_pending; -struct devstate_change { - AST_LIST_ENTRY(devstate_change) entry; - uint32_t state; - struct ast_eid eid; - enum ast_devstate_cache cachable; - char device[1]; -}; +struct stasis_subscription *devstate_message_sub; -static struct { - pthread_t thread; - struct ast_event_sub *event_sub; - ast_cond_t cond; - ast_mutex_t lock; - AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q; - unsigned int enabled:1; -} devstate_collector = { - .thread = AST_PTHREADT_NULL, - .enabled = 0, -}; +static struct stasis_topic *device_state_topic_all; +static struct stasis_caching_topic *device_state_topic_cached; +static struct stasis_message_type *device_state_message_type; +static struct stasis_topic_pool *device_state_topic_pool; /* Forward declarations */ static int getproviderstate(const char *provider, const char *address); @@ -289,21 +281,16 @@ enum ast_device_state ast_parse_device_state(const char *device) static enum ast_device_state devstate_cached(const char *device) { - enum ast_device_state res = AST_DEVICE_UNKNOWN; - struct ast_event *event; - - event = ast_event_get_cached(AST_EVENT_DEVICE_STATE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_END); - - if (!event) - return res; + RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup); + struct ast_device_state_message *device_state; - res = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - - ast_event_destroy(event); + cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + if (!cached_msg) { + return AST_DEVICE_UNKNOWN; + } + device_state = stasis_message_data(cached_msg); - return res; + return device_state->state; } /*! \brief Check device state through channel specific function or generic function */ @@ -426,39 +413,9 @@ static int getproviderstate(const char *provider, const char *address) return res; } -static void devstate_event(const char *device, enum ast_device_state state, int cachable) -{ - struct ast_event *event; - enum ast_event_type event_type; - - if (devstate_collector.enabled) { - /* Distributed device state is enabled, so this state change is a change - * for a single server, not the real state. */ - event_type = AST_EVENT_DEVICE_STATE_CHANGE; - } else { - event_type = AST_EVENT_DEVICE_STATE; - } - - ast_debug(3, "device '%s' state '%d'\n", device, state); - - if (!(event = ast_event_new(event_type, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, - AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, cachable, - AST_EVENT_IE_END))) { - return; - } - - if (cachable) { - ast_event_queue_and_cache(event); - } else { - ast_event_queue(event); - } -} - /*! Called by the state change thread to find out what the state is, and then * to queue up the state change event */ -static void do_state_change(const char *device, int cachable) +static void do_state_change(const char *device, enum ast_devstate_cache cachable) { enum ast_device_state state; @@ -466,7 +423,7 @@ static void do_state_change(const char *device, int cachable) ast_debug(3, "Changing state for %s - state %d (%s)\n", device, state, ast_devstate2str(state)); - devstate_event(device, state, cachable); + ast_publish_device_state(device, state, cachable); } int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_cache cachable, const char *device) @@ -490,7 +447,7 @@ int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_ */ if (state != AST_DEVICE_UNKNOWN) { - devstate_event(device, state, cachable); + ast_publish_device_state(device, state, cachable); } else if (change_thread == AST_PTHREADT_NULL || !(change = ast_calloc(1, sizeof(*change) + strlen(device)))) { /* we could not allocate a change struct, or */ /* there is no background thread, so process the change now */ @@ -562,176 +519,148 @@ static void *do_devstate_changes(void *data) return NULL; } -static void destroy_devstate_change(struct devstate_change *sc) -{ - ast_free(sc); -} - #define MAX_SERVERS 64 -struct change_collection { - struct devstate_change states[MAX_SERVERS]; - size_t num_states; -}; - -static void devstate_cache_cb(const struct ast_event *event, void *data) +static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags) { - struct change_collection *collection = data; - int i; - const struct ast_eid *eid; + struct stasis_message *msg = obj; + struct ast_devstate_aggregate *aggregate = arg; + char *device = data; + struct ast_device_state_message *device_state = stasis_message_data(msg); - if (collection->num_states == ARRAY_LEN(collection->states)) { - ast_log(LOG_ERROR, "More per-server state values than we have room for (MAX_SERVERS is %d)\n", - MAX_SERVERS); - return; - } - - if (!(eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { - ast_log(LOG_ERROR, "Device state change event with no EID\n"); - return; + if (!device_state->eid || strcmp(device, device_state->device)) { + /* ignore aggregate states and devices that don't match */ + return 0; } - - i = collection->num_states; - - collection->states[i].state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - collection->states[i].eid = *eid; - - collection->num_states++; + ast_debug(1, "Adding per-server state of '%s' for '%s'\n", + ast_devstate2str(device_state->state), device); + ast_devstate_aggregate_add(aggregate, device_state->state); + return 0; } -static void process_collection(const char *device, enum ast_devstate_cache cachable, struct change_collection *collection) +static void device_state_dtor(void *obj) { - int i; - struct ast_devstate_aggregate agg; - enum ast_device_state state; - struct ast_event *event; + struct ast_device_state_message *device_state = obj; + ast_string_field_free_memory(device_state); + ast_free(device_state->eid); +} - ast_devstate_aggregate_init(&agg); +static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid) +{ + RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup); - for (i = 0; i < collection->num_states; i++) { - ast_debug(1, "Adding per-server state of '%s' for '%s'\n", - ast_devstate2str(collection->states[i].state), device); - ast_devstate_aggregate_add(&agg, collection->states[i].state); + if (!new_device_state || ast_string_field_init(new_device_state, 256)) { + return NULL; } - state = ast_devstate_aggregate_result(&agg); + ast_string_field_set(new_device_state, device, device); + new_device_state->state = state; + new_device_state->cachable = cachable; - ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n", - ast_devstate2str(state), device); + if (eid) { + char eid_str[20]; + struct ast_str *cache_id = ast_str_alloca(256); - event = ast_event_get_cached(AST_EVENT_DEVICE_STATE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_END); - - if (event) { - enum ast_device_state old_state; + new_device_state->eid = ast_malloc(sizeof(*eid)); + if (!new_device_state->eid) { + return NULL; + } - old_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); + *new_device_state->eid = *eid; + ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid); + ast_str_set(&cache_id, 0, "%s%s", eid_str, device); + ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id)); + } else { + /* no EID makes this an aggregate state */ + ast_string_field_set(new_device_state, cache_id, device); + } - ast_event_destroy(event); + ao2_ref(new_device_state, +1); + return new_device_state; +} - if (state == old_state) { - /* No change since last reported device state */ - ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n", - device, ast_devstate2str(state)); - return; - } - } +static enum ast_device_state get_aggregate_state(char *device) +{ + RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); + struct ast_devstate_aggregate aggregate; - ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n", - device, ast_devstate2str(state)); + ast_devstate_aggregate_init(&aggregate); - event = ast_event_new(AST_EVENT_DEVICE_STATE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, - AST_EVENT_IE_END); + cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); - if (!event) { - return; - } + ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device); - if (cachable) { - ast_event_queue_and_cache(event); - } else { - ast_event_queue(event); - } + return ast_devstate_aggregate_result(&aggregate); } -static void handle_devstate_change(struct devstate_change *sc) +static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state) { - struct ast_event_sub *tmp_sub; - struct change_collection collection = { - .num_states = 0, - }; - - ast_debug(1, "Processing device state change for '%s'\n", sc->device); + RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup); + struct ast_device_state_message *cached_aggregate_device_state; - if (!(tmp_sub = ast_event_subscribe_new(AST_EVENT_DEVICE_STATE_CHANGE, devstate_cache_cb, &collection))) { - ast_log(LOG_ERROR, "Failed to create subscription\n"); - return; + cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + if (!cached_aggregate_msg) { + return 1; } - if (ast_event_sub_append_ie_str(tmp_sub, AST_EVENT_IE_DEVICE, sc->device)) { - ast_log(LOG_ERROR, "Failed to append device IE\n"); - ast_event_sub_destroy(tmp_sub); - return; + cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg); + if (cached_aggregate_device_state->state == new_aggregate_state) { + return 0; } - - /* Populate the collection of device states from the cache */ - ast_event_dump_cache(tmp_sub); - - process_collection(sc->device, sc->cachable, &collection); - - ast_event_sub_destroy(tmp_sub); + return 1; } -static void *run_devstate_collector(void *data) +static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - for (;;) { - struct devstate_change *sc; + enum ast_device_state aggregate_state; + char *device; + struct ast_device_state_message *device_state; + RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup); + RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup); + + if (stasis_cache_update_type() == stasis_message_type(msg)) { + struct stasis_cache_update *update = stasis_message_data(msg); + if (!update->new_snapshot) { + return; + } + msg = update->new_snapshot; + } - ast_mutex_lock(&devstate_collector.lock); - while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry))) - ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock); - ast_mutex_unlock(&devstate_collector.lock); + if (ast_device_state_message_type() != stasis_message_type(msg)) { + return; + } - handle_devstate_change(sc); + device_state = stasis_message_data(msg); - destroy_devstate_change(sc); + if (!device_state->eid) { + /* ignore aggregate messages */ + return; } - return NULL; -} + device = ast_strdupa(device_state->device); + ast_debug(1, "Processing device state change for '%s'\n", device); -static void devstate_change_collector_cb(const struct ast_event *event, void *data) -{ - struct devstate_change *sc; - const char *device; - const struct ast_eid *eid; - uint32_t state; - enum ast_devstate_cache cachable = AST_DEVSTATE_CACHABLE; + if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) { + /* if it's not cachable, there will be no aggregate state to get + * and this should be passed through */ + aggregate_state = device_state->state; + } else { - device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); - eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); - state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE); + aggregate_state = get_aggregate_state(device); + ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n", + ast_devstate2str(aggregate_state), device); - if (ast_strlen_zero(device) || !eid) { - ast_log(LOG_ERROR, "Invalid device state change event received\n"); - return; + if (!aggregate_state_changed(device, aggregate_state)) { + /* No change since last reported device state */ + ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n", + device, ast_devstate2str(aggregate_state)); + return; + } } - if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device)))) - return; - - strcpy(sc->device, device); - sc->eid = *eid; - sc->state = state; - sc->cachable = cachable; + ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n", + device, ast_devstate2str(aggregate_state)); - ast_mutex_lock(&devstate_collector.lock); - AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry); - ast_cond_signal(&devstate_collector.cond); - ast_mutex_unlock(&devstate_collector.lock); + ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL); } /*! \brief Initialize the device state engine in separate thread */ @@ -784,28 +713,106 @@ enum ast_device_state ast_devstate_aggregate_result(struct ast_devstate_aggregat return agg->state; } -int ast_enable_distributed_devstate(void) +struct stasis_topic *ast_device_state_topic_all(void) { - if (devstate_collector.enabled) { - return 0; + return device_state_topic_all; +} + +struct stasis_caching_topic *ast_device_state_topic_cached(void) +{ + return device_state_topic_cached; +} + +struct stasis_message_type *ast_device_state_message_type(void) +{ + return device_state_message_type; +} + +struct stasis_topic *ast_device_state_topic(const char *device) +{ + return stasis_topic_pool_get_topic(device_state_topic_pool, device); +} + +int ast_publish_device_state_full( + const char *device, + enum ast_device_state state, + enum ast_devstate_cache cachable, + struct ast_eid *eid) +{ + RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + struct stasis_topic *device_specific_topic; + + ast_assert(!ast_strlen_zero(device)); + + device_state = device_state_alloc(device, state, cachable, eid); + if (!device_state) { + return -1; } - devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, - devstate_change_collector_cb, "devicestate_engine_enable_distributed", NULL, AST_EVENT_IE_END); + message = stasis_message_create(ast_device_state_message_type(), device_state); - if (!devstate_collector.event_sub) { - ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); + device_specific_topic = ast_device_state_topic(device); + if (!device_specific_topic) { return -1; } - ast_mutex_init(&devstate_collector.lock); - ast_cond_init(&devstate_collector.cond, NULL); - if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) { - ast_log(LOG_ERROR, "Unable to start device state collector thread.\n"); + stasis_publish(device_specific_topic, message); + return 0; +} + +static const char *device_state_get_id(struct stasis_message *message) +{ + struct ast_device_state_message *device_state; + if (ast_device_state_message_type() != stasis_message_type(message)) { + return NULL; + } + + device_state = stasis_message_data(message); + if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) { + return NULL; + } + + return device_state->cache_id; +} + +static void devstate_exit(void) +{ + ao2_cleanup(device_state_topic_all); + device_state_topic_all = NULL; + device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached); + ao2_cleanup(device_state_message_type); + device_state_message_type = NULL; + ao2_cleanup(device_state_topic_pool); + device_state_topic_pool = NULL; +} + +int devstate_init(void) +{ + device_state_topic_all = stasis_topic_create("ast_device_state_topic"); + if (!device_state_topic_all) { + return -1; + } + device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id); + if (!device_state_topic_cached) { + return -1; + } + device_state_message_type = stasis_message_type_create("ast_device_state_message"); + if (!device_state_message_type) { + return -1; + } + device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all()); + if (!device_state_topic_pool) { return -1; } - devstate_collector.enabled = 1; + devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL); + + if (!devstate_message_sub) { + ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); + return -1; + } + ast_register_atexit(devstate_exit); return 0; } diff --git a/main/pbx.c b/main/pbx.c index 8bc0c3f5f..017896979 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -1130,11 +1130,6 @@ struct presencechange { char *message; }; -struct statechange { - AST_LIST_ENTRY(statechange) entry; - char dev[0]; -}; - struct pbx_exception { AST_DECLARE_STRING_FIELDS( AST_STRING_FIELD(context); /*!< Context associated with this exception */ @@ -1300,7 +1295,7 @@ static int extenpatternmatchnew = 0; static char *overrideswitch = NULL; /*! \brief Subscription for device state change events */ -static struct ast_event_sub *device_state_sub; +static struct stasis_subscription *device_state_sub; /*! \brief Subscription for presence state change events */ static struct ast_event_sub *presence_state_sub; @@ -5247,32 +5242,40 @@ static void get_device_state_causing_channels(struct ao2_container *c) ao2_iterator_destroy(&iter); } -static int handle_statechange(void *datap) +static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { + struct ast_device_state_message *dev_state; struct ast_hint *hint; struct ast_str *hint_app; struct ast_hintdevice *device; struct ast_hintdevice *cmpdevice; - struct statechange *sc = datap; struct ao2_iterator *dev_iter; struct ao2_iterator cb_iter; char context_name[AST_MAX_CONTEXT]; char exten_name[AST_MAX_EXTENSION]; + if (ast_device_state_message_type() != stasis_message_type(msg)) { + return; + } + + dev_state = stasis_message_data(msg); + if (dev_state->eid) { + /* ignore non-aggregate states */ + return; + } + if (ao2_container_count(hintdevices) == 0) { /* There are no hints monitoring devices. */ - ast_free(sc); - return 0; + return; } hint_app = ast_str_create(1024); if (!hint_app) { - ast_free(sc); - return -1; + return; } - cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(sc->dev)); - strcpy(cmpdevice->hintdevice, sc->dev); + cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(dev_state->device)); + strcpy(cmpdevice->hintdevice, dev_state->device); ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */ dev_iter = ao2_t_callback(hintdevices, @@ -5283,8 +5286,7 @@ static int handle_statechange(void *datap) if (!dev_iter) { ast_mutex_unlock(&context_merge_lock); ast_free(hint_app); - ast_free(sc); - return -1; + return; } for (; (device = ao2_iterator_next(dev_iter)); ao2_t_ref(device, -1, "Next device")) { @@ -5381,8 +5383,7 @@ static int handle_statechange(void *datap) ao2_iterator_destroy(dev_iter); ast_free(hint_app); - ast_free(sc); - return 0; + return; } /*! @@ -8846,7 +8847,7 @@ void ast_merge_contexts_and_delete(struct ast_context **extcontexts, struct ast_ /* * Notify watchers of all removed hints with the same lock - * environment as handle_statechange(). + * environment as device_state_cb(). */ while ((saved_hint = AST_LIST_REMOVE_HEAD(&hints_removed, list))) { /* this hint has been removed, notify the watchers */ @@ -11707,25 +11708,6 @@ static void presence_state_cb(const struct ast_event *event, void *unused) } } -static void device_state_cb(const struct ast_event *event, void *unused) -{ - const char *device; - struct statechange *sc; - - device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); - if (ast_strlen_zero(device)) { - ast_log(LOG_ERROR, "Received invalid event that had no device IE\n"); - return; - } - - if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1))) - return; - strcpy(sc->dev, device); - if (ast_taskprocessor_push(extension_state_tps, handle_statechange, sc) < 0) { - ast_free(sc); - } -} - /*! * \internal * \brief Implements the hints data provider. @@ -11786,7 +11768,7 @@ static void unload_pbx(void) presence_state_sub = ast_event_unsubscribe(presence_state_sub); } if (device_state_sub) { - device_state_sub = ast_event_unsubscribe(device_state_sub); + device_state_sub = stasis_unsubscribe(device_state_sub); } /* Unregister builtin applications */ @@ -11833,8 +11815,7 @@ int load_pbx(void) /* Register manager application */ ast_manager_register_xml_core("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan); - if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, "pbx Device State Change", NULL, - AST_EVENT_IE_END))) { + if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) { return -1; } diff --git a/res/res_jabber.c b/res/res_jabber.c index 0ee64f969..04bc614e9 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -372,7 +372,7 @@ static void aji_pubsub_purge_nodes(struct aji_client *client, const char* collection_name); static void aji_publish_mwi(struct aji_client *client, const char *mailbox, const char *context, const char *oldmsgs, const char *newmsgs); -static void aji_devstate_cb(const struct ast_event *ast_event, void *data); +static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node, const char *event_type, unsigned int cachable); @@ -411,7 +411,7 @@ static char *app_ajileave = "JabberLeave"; static struct aji_client_container clients; static struct aji_capabilities *capabilities = NULL; static struct stasis_subscription *mwi_sub = NULL; -static struct ast_event_sub *device_state_sub = NULL; +static struct stasis_subscription *device_state_sub = NULL; static ast_cond_t message_received_condition; static ast_mutex_t messagelock; @@ -3274,25 +3274,30 @@ static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasi * \param data void pointer to ast_client structure * \return void */ -static void aji_devstate_cb(const struct ast_event *ast_event, void *data) +static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - const char *device; - const char *device_state; - unsigned int cachable; - struct aji_client *client; - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) - { - /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); + struct aji_client *client = data; + struct ast_device_state_message *dev_state; + + if (!stasis_subscription_is_subscribed(sub) || ast_device_state_message_type() != stasis_message_type(msg)) { return; } - client = ASTOBJ_REF((struct aji_client *) data); - device = ast_event_get_ie_str(ast_event, AST_EVENT_IE_DEVICE); - device_state = ast_devstate_str(ast_event_get_ie_uint(ast_event, AST_EVENT_IE_STATE)); - cachable = ast_event_get_ie_uint(ast_event, AST_EVENT_IE_CACHABLE); - aji_publish_device_state(client, device, device_state, cachable); - ASTOBJ_UNREF(client, ast_aji_client_destroy); + dev_state = stasis_message_data(msg); + if (!dev_state->eid || ast_eid_cmp(&ast_eid_default, dev_state->eid)) { + /* If the event is aggregate or didn't originate from this server, don't send it out. */ + return; + } + + aji_publish_device_state(client, dev_state->device, ast_devstate_str(dev_state->state), dev_state->cachable); +} + +static int cached_devstate_cb(void *obj, void *arg, int flags) +{ + struct stasis_message *msg = obj; + struct aji_client *client = arg; + aji_devstate_cb(client, device_state_sub, NULL, msg); + return 0; } /*! @@ -3306,12 +3311,11 @@ static void aji_init_event_distribution(struct aji_client *client) mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), aji_mwi_cb, client); } if (!device_state_sub) { - if (ast_enable_distributed_devstate()) { - return; - } - device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, - aji_devstate_cb, "aji_devstate_subscription", client, AST_EVENT_IE_END); - ast_event_dump_cache(device_state_sub); + RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); + device_state_sub = stasis_subscribe(ast_device_state_topic_all(), + aji_devstate_cb, client); + cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client); } aji_pubsub_subscribe(client, "device_state"); @@ -3355,13 +3359,11 @@ static int aji_handle_pubsub_event(void *data, ikspak *pak) if ((cachable_str = iks_find_cdata(item, "cachable"))) { sscanf(cachable_str, "%30d", &cachable); } - if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_STATE, - AST_EVENT_IE_PLTYPE_UINT, ast_devstate_val(device_state), AST_EVENT_IE_EID, - AST_EVENT_IE_PLTYPE_RAW, &pubsub_eid, sizeof(pubsub_eid), - AST_EVENT_IE_END))) { - return IKS_FILTER_EAT; - } + ast_publish_device_state_full(item_id, + ast_devstate_val(device_state), + cachable == AST_DEVSTATE_CACHABLE ? AST_DEVSTATE_CACHABLE : AST_DEVSTATE_NOT_CACHABLE, + &pubsub_eid); + return IKS_FILTER_EAT; } else if (!strcasecmp(iks_name(item_content), "mailbox")) { context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); @@ -4772,7 +4774,7 @@ static int unload_module(void) mwi_sub = stasis_unsubscribe(mwi_sub); } if (device_state_sub) { - ast_event_unsubscribe(device_state_sub); + device_state_sub = stasis_unsubscribe(device_state_sub); } ast_custom_function_unregister(&jabberreceive_function); diff --git a/res/res_xmpp.c b/res/res_xmpp.c index a53a8fd91..f9cfdac11 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1352,22 +1352,22 @@ static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, stru * \param data void pointer to ast_client structure * \return void */ -static void xmpp_pubsub_devstate_cb(const struct ast_event *ast_event, void *data) +static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { struct ast_xmpp_client *client = data; - const char *device, *device_state; - unsigned int cachable; + struct ast_device_state_message *dev_state; - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) { - /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); + if (!stasis_subscription_is_subscribed(sub) || ast_device_state_message_type() != stasis_message_type(msg)) { + return; + } + + dev_state = stasis_message_data(msg); + if (!dev_state->eid || ast_eid_cmp(&ast_eid_default, dev_state->eid)) { + /* If the event is aggregate or didn't originate from this server, don't send it out. */ return; } - device = ast_event_get_ie_str(ast_event, AST_EVENT_IE_DEVICE); - device_state = ast_devstate_str(ast_event_get_ie_uint(ast_event, AST_EVENT_IE_STATE)); - cachable = ast_event_get_ie_uint(ast_event, AST_EVENT_IE_CACHABLE); - xmpp_pubsub_publish_device_state(client, device, device_state, cachable); + xmpp_pubsub_publish_device_state(client, dev_state->device, ast_devstate_str(dev_state->state), dev_state->cachable); } /*! @@ -1474,13 +1474,11 @@ static int xmpp_pubsub_handle_event(void *data, ikspak *pak) if ((cachable_str = iks_find_cdata(item, "cachable"))) { sscanf(cachable_str, "%30d", &cachable); } - if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_STATE, - AST_EVENT_IE_PLTYPE_UINT, ast_devstate_val(device_state), AST_EVENT_IE_EID, - AST_EVENT_IE_PLTYPE_RAW, &pubsub_eid, sizeof(pubsub_eid), - AST_EVENT_IE_END))) { - return IKS_FILTER_EAT; - } + ast_publish_device_state_full(item_id, + ast_devstate_val(device_state), + cachable == AST_DEVSTATE_CACHABLE ? AST_DEVSTATE_CACHABLE : AST_DEVSTATE_NOT_CACHABLE, + &pubsub_eid); + return IKS_FILTER_EAT; } else if (!strcasecmp(iks_name(item_content), "mailbox")) { context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); @@ -1572,6 +1570,14 @@ static int xmpp_pubsub_handle_error(void *data, ikspak *pak) return IKS_FILTER_EAT; } +static int cached_devstate_cb(void *obj, void *arg, int flags) +{ + struct stasis_message *msg = obj; + struct ast_xmpp_client *client = arg; + xmpp_pubsub_devstate_cb(client, client->device_state_sub, NULL, msg); + return 0; +} + /*! * \brief Initialize collections for event distribution * \param client the configured XMPP client we use to connect to a XMPP server @@ -1581,6 +1587,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) { RAII_VAR(struct xmpp_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup); RAII_VAR(struct ast_xmpp_client_config *, clientcfg, NULL, ao2_cleanup); + RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); if (!cfg || !cfg->clients || !(clientcfg = xmpp_config_find(cfg->clients, client->name))) { return; @@ -1593,17 +1600,13 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) return; } - if (ast_enable_distributed_devstate()) { - return; - } - - if (!(client->device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, - xmpp_pubsub_devstate_cb, "xmpp_pubsub_devstate_subscription", client, AST_EVENT_IE_END))) { + if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) { client->mwi_sub = stasis_unsubscribe(client->mwi_sub); return; } - ast_event_dump_cache(client->device_state_sub); + cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client); xmpp_pubsub_subscribe(client, "device_state"); xmpp_pubsub_subscribe(client, "message_waiting"); @@ -3528,8 +3531,7 @@ int ast_xmpp_client_disconnect(struct ast_xmpp_client *client) } if (client->device_state_sub) { - ast_event_unsubscribe(client->device_state_sub); - client->device_state_sub = NULL; + client->device_state_sub = stasis_unsubscribe(client->device_state_sub); xmpp_pubsub_unsubscribe(client, "device_state"); } diff --git a/tests/test_devicestate.c b/tests/test_devicestate.c index f5f209b9e..233b6e70a 100644 --- a/tests/test_devicestate.c +++ b/tests/test_devicestate.c @@ -39,7 +39,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/test.h" #include "asterisk/devicestate.h" #include "asterisk/pbx.h" +#include "asterisk/stasis_message_router.h" +#define UNIT_TEST_DEVICE_IDENTIFIER "unit_test_device_identifier" /* These arrays are the result of the 'core show device2extenstate' output. */ static int combined_results[] = { @@ -274,14 +276,241 @@ AST_TEST_DEFINE(device2extenstate_test) return res; } +struct consumer { + ast_mutex_t lock; + ast_cond_t out; + int already_out; + enum ast_device_state state; + enum ast_device_state aggregate_state; + int sig_on_non_aggregate_state; +}; + +static void consumer_dtor(void *obj) { + struct consumer *consumer = obj; + + ast_mutex_destroy(&consumer->lock); + ast_cond_destroy(&consumer->out); +} + +static struct consumer *consumer_create(void) { + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + + consumer = ao2_alloc(sizeof(*consumer), consumer_dtor); + + if (!consumer) { + return NULL; + } + + ast_mutex_init(&consumer->lock); + ast_cond_init(&consumer->out, NULL); + consumer->sig_on_non_aggregate_state = 0; + + ao2_ref(consumer, +1); + return consumer; +} + +static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + struct consumer *consumer = data; + RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); + struct stasis_cache_update *cache_update = stasis_message_data(message); + struct ast_device_state_message *device_state; + SCOPED_MUTEX(lock, &consumer->lock); + + if (!cache_update->new_snapshot) { + return; + } + + device_state = stasis_message_data(cache_update->new_snapshot); + + if (strcmp(device_state->device, UNIT_TEST_DEVICE_IDENTIFIER)) { + /* not a device state we're interested in */ + return; + } + + if (device_state->eid) { + consumer->state = device_state->state; + if (consumer->sig_on_non_aggregate_state) { + consumer->sig_on_non_aggregate_state = 0; + consumer->already_out = 1; + ast_cond_signal(&consumer->out); + } + } else { + consumer->aggregate_state = device_state->state; + consumer->already_out = 1; + ast_cond_signal(&consumer->out); + } +} + +static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +{ + struct consumer *consumer = data; + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(consumer); + } +} + +static void consumer_wait_for(struct consumer *consumer) +{ + int res; + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 10, + .tv_nsec = start.tv_usec * 1000 + }; + + SCOPED_MUTEX(lock, &consumer->lock); + + if (consumer->already_out) { + consumer->already_out = 0; + } + + while(1) { + res = ast_cond_timedwait(&consumer->out, &consumer->lock, &end); + if (!res || res == ETIMEDOUT) { + break; + } + } + consumer->already_out = 0; +} + +static int remove_device_states_cb(void *obj, void *arg, int flags) +{ + RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); + struct ast_device_state_message *device_state = stasis_message_data(msg); + if (strcmp(UNIT_TEST_DEVICE_IDENTIFIER, device_state->device)) { + msg = NULL; + return 0; + } + + msg = stasis_cache_clear_create(ast_device_state_message_type(), device_state->cache_id); + /* topic guaranteed to have been created by this point */ + stasis_publish(ast_device_state_topic(device_state->device), msg); + return 0; +} + +static void cache_cleanup(int unused) +{ + RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup); + /* remove all device states created during this test */ + cache_dump = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + if (!cache_dump) { + return; + } + ao2_callback(cache_dump, 0, remove_device_states_cb, NULL); +} + +AST_TEST_DEFINE(device_state_aggregation_test) +{ + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, device_msg_router, NULL, stasis_message_router_unsubscribe); + RAII_VAR(struct ast_eid *, foreign_eid, NULL, ast_free); + RAII_VAR(int, cleanup_cache, 0, cache_cleanup); + int res; + struct ast_device_state_message *device_state; + struct stasis_message *msg; + + switch (cmd) { + case TEST_INIT: + info->name = "device_state_aggregation_test"; + info->category = "/main/devicestate/"; + info->summary = "Tests message routing and aggregation through the Stasis device state system."; + info->description = + "Verifies that the device state system passes " + "messages appropriately, that the aggregator is " + "working properly, that the aggregate results match " + "the expected combined devstate, and that the cached " + "aggregate devstate is correct."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + foreign_eid = ast_malloc(sizeof(*foreign_eid)); + ast_test_validate(test, NULL != foreign_eid); + memset(foreign_eid, 0xFF, sizeof(*foreign_eid)); + + consumer = consumer_create(); + ast_test_validate(test, NULL != consumer); + + device_msg_router = stasis_message_router_create(stasis_caching_get_topic(ast_device_state_topic_cached())); + ast_test_validate(test, NULL != device_msg_router); + + ao2_ref(consumer, +1); + res = stasis_message_router_add(device_msg_router, stasis_cache_update_type(), consumer_exec, consumer); + ast_test_validate(test, !res); + + res = stasis_message_router_add(device_msg_router, stasis_subscription_change_type(), consumer_finalize, consumer); + ast_test_validate(test, !res); + + /* push local state */ + ast_publish_device_state(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_NOT_INUSE, AST_DEVSTATE_CACHABLE); + + consumer_wait_for(consumer); + ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state); + ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state); + + msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + device_state = stasis_message_data(msg); + ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state); + ao2_cleanup(msg); + msg = NULL; + + /* push remote state */ + /* this will not produce a new aggregate state message since the aggregate state does not change */ + consumer->sig_on_non_aggregate_state = 1; + ast_publish_device_state_full(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_NOT_INUSE, AST_DEVSTATE_CACHABLE, foreign_eid); + + consumer_wait_for(consumer); + ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state); + ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state); + + msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + device_state = stasis_message_data(msg); + ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state); + ao2_cleanup(msg); + msg = NULL; + + /* push remote state different from local state */ + ast_publish_device_state_full(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_INUSE, AST_DEVSTATE_CACHABLE, foreign_eid); + + consumer_wait_for(consumer); + ast_test_validate(test, AST_DEVICE_INUSE == consumer->state); + ast_test_validate(test, AST_DEVICE_INUSE == consumer->aggregate_state); + + msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + device_state = stasis_message_data(msg); + ast_test_validate(test, AST_DEVICE_INUSE == device_state->state); + ao2_cleanup(msg); + msg = NULL; + + /* push local state that will cause aggregated state different from local non-aggregate state */ + ast_publish_device_state(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_RINGING, AST_DEVSTATE_CACHABLE); + + consumer_wait_for(consumer); + ast_test_validate(test, AST_DEVICE_RINGING == consumer->state); + ast_test_validate(test, AST_DEVICE_RINGINUSE == consumer->aggregate_state); + + msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + device_state = stasis_message_data(msg); + ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state); + ao2_cleanup(msg); + msg = NULL; + + return AST_TEST_PASS; +} + static int unload_module(void) { AST_TEST_UNREGISTER(device2extenstate_test); + AST_TEST_UNREGISTER(device_state_aggregation_test); return 0; } static int load_module(void) { + AST_TEST_REGISTER(device_state_aggregation_test); AST_TEST_REGISTER(device2extenstate_test); return AST_MODULE_LOAD_SUCCESS; } |