diff options
author | Kinsey Moore <kmoore@digium.com> | 2013-04-16 15:33:59 +0000 |
---|---|---|
committer | Kinsey Moore <kmoore@digium.com> | 2013-04-16 15:33:59 +0000 |
commit | 191cf99ae1f821dec98199931fc775fc0716d27c (patch) | |
tree | 210b5ebfaf3f30fa06c666e8d99e93e9918253a5 /main/ccss.c | |
parent | c1ae5dc49be01a567b403306ee4b560391293f67 (diff) |
Move device state distribution to Stasis-core
In the move from Asterisk's event system to Stasis, this makes
distributed device state aggregation always-on, removes unnecessary
task processors where possible, and collapses aggregate and
non-aggregate states into a single cache for ease of retrieval. This
also removes an intermediary step in device state aggregation.
Review: https://reviewboard.asterisk.org/r/2389/
(closes issue ASTERISK-21101)
Patch-by: Kinsey Moore <kmoore@digium.com>
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@385860 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/ccss.c')
-rw-r--r-- | main/ccss.c | 108 |
1 files changed, 50 insertions, 58 deletions
diff --git a/main/ccss.c b/main/ccss.c index b479a3c54..6b59a2f37 100644 --- a/main/ccss.c +++ b/main/ccss.c @@ -1209,7 +1209,7 @@ struct generic_monitor_instance_list { * recalled */ int fit_for_recall; - struct ast_event_sub *sub; + struct stasis_subscription *sub; AST_LIST_HEAD_NOLOCK(, generic_monitor_instance) list; }; @@ -1260,19 +1260,20 @@ static void generic_monitor_instance_list_destructor(void *obj) struct generic_monitor_instance_list *generic_list = obj; struct generic_monitor_instance *generic_instance; - generic_list->sub = ast_event_unsubscribe(generic_list->sub); + generic_list->sub = stasis_unsubscribe(generic_list->sub); while ((generic_instance = AST_LIST_REMOVE_HEAD(&generic_list->list, next))) { ast_free(generic_instance); } ast_free((char *)generic_list->device_name); } -static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata); +static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor) { struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list), generic_monitor_instance_list_destructor, "allocate generic monitor instance list"); char * device_name; + struct stasis_topic *device_specific_topic; if (!generic_list) { return NULL; @@ -1285,11 +1286,12 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_ ast_tech_to_upper(device_name); generic_list->device_name = device_name; - if (!(generic_list->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, - generic_monitor_devstate_cb, "Requesting CC", NULL, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, monitor->interface->device_name, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END))) { + device_specific_topic = ast_device_state_topic(device_name); + if (!device_specific_topic) { + return NULL; + } + + if (!(generic_list->sub = stasis_subscribe(device_specific_topic, generic_monitor_devstate_cb, NULL))) { cc_unref(generic_list, "Failed to subscribe to device state"); return NULL; } @@ -1298,35 +1300,25 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_ return generic_list; } -struct generic_tp_cb_data { - const char *device_name; - enum ast_device_state new_state; -}; - static int generic_monitor_devstate_tp_cb(void *data) { - struct generic_tp_cb_data *gtcd = data; - enum ast_device_state new_state = gtcd->new_state; - enum ast_device_state previous_state = gtcd->new_state; - const char *monitor_name = gtcd->device_name; + RAII_VAR(struct ast_device_state_message *, dev_state, data, ao2_cleanup); + enum ast_device_state new_state = dev_state->state; + enum ast_device_state previous_state; struct generic_monitor_instance_list *generic_list; struct generic_monitor_instance *generic_instance; - if (!(generic_list = find_generic_monitor_instance_list(monitor_name))) { + if (!(generic_list = find_generic_monitor_instance_list(dev_state->device))) { /* The most likely cause for this is that we destroyed the monitor in the * time between subscribing to its device state and the time this executes. * Not really a big deal. */ - ast_free((char *) gtcd->device_name); - ast_free(gtcd); return 0; } if (generic_list->current_state == new_state) { /* The device state hasn't actually changed, so we don't really care */ cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback"); - ast_free((char *) gtcd->device_name); - ast_free(gtcd); return 0; } @@ -1346,33 +1338,31 @@ static int generic_monitor_devstate_tp_cb(void *data) } } cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback"); - ast_free((char *) gtcd->device_name); - ast_free(gtcd); return 0; } -static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata) +static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { /* Wow, it's cool that we've picked up on a state change, but we really want * the actual work to be done in the core's taskprocessor execution thread * so that all monitor operations can be serialized. Locks?! We don't need * no steenkin' locks! */ - struct generic_tp_cb_data *gtcd = ast_calloc(1, sizeof(*gtcd)); - - if (!gtcd) { + struct ast_device_state_message *dev_state; + if (ast_device_state_message_type() != stasis_message_type(msg)) { return; } - if (!(gtcd->device_name = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE)))) { - ast_free(gtcd); + dev_state = stasis_message_data(msg); + if (dev_state->eid) { + /* ignore non-aggregate states */ return; } - gtcd->new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); - if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, gtcd)) { - ast_free((char *)gtcd->device_name); - ast_free(gtcd); + ao2_t_ref(dev_state, +1, "Bumping dev_state ref for cc_core_taskprocessor"); + if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, dev_state)) { + ao2_cleanup(dev_state); + return; } } @@ -2502,7 +2492,7 @@ struct cc_generic_agent_pvt { * device state of the caller in order to * determine when we may move on */ - struct ast_event_sub *sub; + struct stasis_subscription *sub; /*! * Scheduler id of offer timer. */ @@ -2635,34 +2625,33 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent) return 0; } -static int generic_agent_devstate_unsubscribe(void *data) +static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - struct ast_cc_agent *agent = data; + struct ast_cc_agent *agent = userdata; + enum ast_device_state new_state; + struct ast_device_state_message *dev_state; struct cc_generic_agent_pvt *generic_pvt = agent->private_data; - if (generic_pvt->sub != NULL) { - generic_pvt->sub = ast_event_unsubscribe(generic_pvt->sub); + if (stasis_subscription_final_message(sub, msg)) { + cc_unref(agent, "Done holding ref for subscription"); + return; + } else if (ast_device_state_message_type() != stasis_message_type(msg)) { + return; } - cc_unref(agent, "Done unsubscribing from devstate"); - return 0; -} -static void generic_agent_devstate_cb(const struct ast_event *event, void *userdata) -{ - struct ast_cc_agent *agent = userdata; - enum ast_device_state new_state; + dev_state = stasis_message_data(msg); + if (dev_state->eid) { + /* ignore non-aggregate states */ + return; + } - new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); + new_state = dev_state->state; if (!cc_generic_is_device_available(new_state)) { /* Not interested in this new state of the device. It is still busy. */ return; } - /* We can't unsubscribe from device state events here because it causes a deadlock */ - if (ast_taskprocessor_push(cc_core_taskprocessor, generic_agent_devstate_unsubscribe, - cc_ref(agent, "ref agent for device state unsubscription"))) { - cc_unref(agent, "Unref agent unsubscribing from devstate failed"); - } + generic_pvt->sub = stasis_unsubscribe(sub); ast_cc_agent_caller_available(agent->core_id, "%s is no longer busy", agent->device_name); } @@ -2670,18 +2659,21 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent) { struct cc_generic_agent_pvt *generic_pvt = agent->private_data; struct ast_str *str = ast_str_alloca(128); + struct stasis_topic *device_specific_topic; ast_assert(generic_pvt->sub == NULL); ast_str_set(&str, 0, "Agent monitoring %s device state since it is busy\n", agent->device_name); - if (!(generic_pvt->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, - generic_agent_devstate_cb, ast_str_buffer(str), agent, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, agent->device_name, - AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END))) { + device_specific_topic = ast_device_state_topic(agent->device_name); + if (!device_specific_topic) { + return -1; + } + + if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) { return -1; } + cc_ref(agent, "Ref agent for subscription"); return 0; } @@ -2792,7 +2784,7 @@ static void cc_generic_agent_destructor(struct ast_cc_agent *agent) cc_generic_agent_stop_offer_timer(agent); if (agent_pvt->sub) { - agent_pvt->sub = ast_event_unsubscribe(agent_pvt->sub); + agent_pvt->sub = stasis_unsubscribe(agent_pvt->sub); } ast_free(agent_pvt); |