summaryrefslogtreecommitdiff
path: root/res/stasis/app.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/stasis/app.c')
-rw-r--r--res/stasis/app.c193
1 files changed, 127 insertions, 66 deletions
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 91b006572..ccb93bc4d 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -114,20 +114,19 @@ static void forwards_unsubscribe(struct app_forwards *forwards)
static struct app_forwards *forwards_create(struct stasis_app *app,
const char *id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
if (!app || ast_strlen_zero(id)) {
return NULL;
}
- forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+ forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
if (!forwards) {
return NULL;
}
- strcpy(forwards->id, id);
+ strcpy(forwards->id, id); /* SAFE */
- ao2_ref(forwards, +1);
return forwards;
}
@@ -338,7 +337,7 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ast_json *json;
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(app);
@@ -355,6 +354,7 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub,
}
app_send(app, json);
+ ast_json_unref(json);
}
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
@@ -557,11 +557,12 @@ static void sub_channel_update_handler(void *data,
stasis_message_timestamp(message);
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+ struct ast_json *msg;
msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
if (msg) {
app_send(app, msg);
+ ast_json_unref(msg);
}
}
@@ -589,7 +590,7 @@ static struct ast_json *simple_endpoint_event(
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
{
- RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ struct ast_endpoint_snapshot *snapshot;
struct ast_json *json_endpoint;
struct ast_json *message;
struct stasis_app *app = pvt;
@@ -613,6 +614,7 @@ static int message_received_handler(const char *endpoint_id, struct ast_json *js
}
json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
+ ao2_ref(snapshot, -1);
if (!json_endpoint) {
return -1;
}
@@ -634,7 +636,6 @@ static void sub_endpoint_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
@@ -651,6 +652,8 @@ static void sub_endpoint_update_handler(void *data,
old_snapshot = stasis_message_data(update->old_snapshot);
if (new_snapshot) {
+ struct ast_json *json;
+
tv = stasis_message_timestamp(update->new_snapshot);
json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
@@ -659,6 +662,7 @@ static void sub_endpoint_update_handler(void *data,
}
app_send(app, json);
+ ast_json_unref(json);
}
if (!new_snapshot && old_snapshot) {
@@ -686,7 +690,7 @@ static void sub_bridge_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ast_json *json = NULL;
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_bridge_snapshot *new_snapshot;
@@ -720,6 +724,7 @@ static void sub_bridge_update_handler(void *data,
if (json) {
app_send(app, json);
+ ast_json_unref(json);
}
if (!new_snapshot && old_snapshot) {
@@ -1022,7 +1027,7 @@ void app_send(struct stasis_app *app, struct ast_json *message)
{
stasis_app_cb handler;
char eid[20];
- RAII_VAR(void *, data, NULL, ao2_cleanup);
+ void *data;
if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
@@ -1031,37 +1036,36 @@ void app_send(struct stasis_app *app, struct ast_json *message)
}
/* Copy off mutable state with lock held */
- {
- SCOPED_AO2LOCK(lock, app);
- handler = app->handler;
- if (app->data) {
- ao2_ref(app->data, +1);
- data = app->data;
- }
- /* Name is immutable; no need to copy */
- }
-
- if (!handler) {
+ ao2_lock(app);
+ handler = app->handler;
+ data = ao2_bump(app->data);
+ ao2_unlock(app);
+ /* Name is immutable; no need to copy */
+
+ if (handler) {
+ handler(data, app->name, message);
+ } else {
ast_verb(3,
"Inactive Stasis app '%s' missed message\n", app->name);
- return;
}
-
- handler(data, app->name, message);
+ ao2_cleanup(data);
}
void app_deactivate(struct stasis_app *app)
{
- SCOPED_AO2LOCK(lock, app);
+ ao2_lock(app);
+
ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
app->handler = NULL;
ao2_cleanup(app->data);
app->data = NULL;
+
+ ao2_unlock(app);
}
void app_shutdown(struct stasis_app *app)
{
- SCOPED_AO2LOCK(lock, app);
+ ao2_lock(app);
ast_assert(app_is_finished(app));
@@ -1071,27 +1075,37 @@ void app_shutdown(struct stasis_app *app)
app->bridge_router = NULL;
stasis_message_router_unsubscribe(app->endpoint_router);
app->endpoint_router = NULL;
+
+ ao2_unlock(app);
}
int app_is_active(struct stasis_app *app)
{
- SCOPED_AO2LOCK(lock, app);
- return app->handler != NULL;
+ int ret;
+
+ ao2_lock(app);
+ ret = app->handler != NULL;
+ ao2_unlock(app);
+
+ return ret;
}
int app_is_finished(struct stasis_app *app)
{
- SCOPED_AO2LOCK(lock, app);
+ int ret;
+
+ ao2_lock(app);
+ ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
+ ao2_unlock(app);
- return app->handler == NULL && ao2_container_count(app->forwards) == 0;
+ return ret;
}
void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
{
- SCOPED_AO2LOCK(lock, app);
-
+ ao2_lock(app);
if (app->handler && app->data) {
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+ struct ast_json *msg;
ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
@@ -1100,17 +1114,15 @@ void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
"application", app->name);
if (msg) {
app_send(app, msg);
+ ast_json_unref(msg);
}
} else {
ast_verb(1, "Activating Stasis app '%s'\n", app->name);
}
app->handler = handler;
- ao2_cleanup(app->data);
- if (data) {
- ao2_ref(data, +1);
- }
- app->data = data;
+ ao2_replace(app->data, data);
+ ao2_unlock(app);
}
const char *stasis_app_name(const struct stasis_app *app)
@@ -1187,68 +1199,72 @@ void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
struct ast_json *app_to_json(const struct stasis_app *app)
{
- RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+ struct ast_json *json;
struct ast_json *channels;
struct ast_json *bridges;
struct ast_json *endpoints;
struct ao2_iterator i;
- void *obj;
+ struct app_forwards *forwards;
json = ast_json_pack("{s: s, s: [], s: [], s: []}",
"name", app->name,
"channel_ids", "bridge_ids", "endpoint_ids");
+ if (!json) {
+ return NULL;
+ }
channels = ast_json_object_get(json, "channel_ids");
bridges = ast_json_object_get(json, "bridge_ids");
endpoints = ast_json_object_get(json, "endpoint_ids");
i = ao2_iterator_init(app->forwards, 0);
- while ((obj = ao2_iterator_next(&i))) {
- RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
- RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
- int append_res = -1;
-
- id = ast_json_string_create(forwards->id);
+ while ((forwards = ao2_iterator_next(&i))) {
+ struct ast_json *array = NULL;
+ int append_res;
switch (forwards->forward_type) {
case FORWARD_CHANNEL:
- append_res = ast_json_array_append(channels,
- ast_json_ref(id));
+ array = channels;
break;
case FORWARD_BRIDGE:
- append_res = ast_json_array_append(bridges,
- ast_json_ref(id));
+ array = bridges;
break;
case FORWARD_ENDPOINT:
- append_res = ast_json_array_append(endpoints,
- ast_json_ref(id));
+ array = endpoints;
break;
}
+ /* If forward_type value is unexpected this will safely return an error. */
+ append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
+ ao2_ref(forwards, -1);
+
if (append_res != 0) {
ast_log(LOG_ERROR, "Error building response\n");
ao2_iterator_destroy(&i);
+ ast_json_unref(json);
+
return NULL;
}
}
ao2_iterator_destroy(&i);
- return ast_json_ref(json);
+ return json;
}
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
struct app_forwards *forwards;
- SCOPED_AO2LOCK(lock, app->forwards);
- int res;
if (!app) {
return -1;
}
+ ao2_lock(app->forwards);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return 0;
}
@@ -1256,16 +1272,21 @@ int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
+ int res;
+
/* Forwards not found, create one */
forwards = forwards_create_channel(app, chan);
if (!forwards) {
+ ao2_unlock(app->forwards);
+
return -1;
}
- res = ao2_link_flags(app->forwards, forwards,
- OBJ_NOLOCK);
+ res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
if (!res) {
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return -1;
}
}
@@ -1276,7 +1297,9 @@ int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
forwards->interested,
app->name);
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return 0;
}
@@ -1287,8 +1310,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ struct app_forwards *forwards;
if (!id) {
if (!strcmp(kind, "bridge")) {
@@ -1303,8 +1325,10 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
}
}
+ ao2_lock(app->forwards);
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
+ ao2_unlock(app->forwards);
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
return -1;
}
@@ -1323,6 +1347,8 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
messaging_app_unsubscribe_endpoint(app->name, id);
}
}
+ ao2_unlock(app->forwards);
+ ao2_ref(forwards, -1);
return 0;
}
@@ -1347,12 +1373,14 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
if (ast_strlen_zero(channel_id)) {
channel_id = CHANNEL_ALL;
}
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
+ ao2_cleanup(forwards);
+
return forwards != NULL;
}
@@ -1372,28 +1400,42 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
struct app_forwards *forwards;
- SCOPED_AO2LOCK(lock, app->forwards);
if (!app) {
return -1;
}
+ ao2_lock(app->forwards);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return 0;
}
- forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+ forwards = ao2_find(app->forwards,
+ bridge ? bridge->uniqueid : BRIDGE_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
+ int res;
+
/* Forwards not found, create one */
forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
+ ao2_unlock(app->forwards);
+
+ return -1;
+ }
+
+ res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ if (!res) {
+ ao2_unlock(app->forwards);
+ ao2_ref(forwards, -1);
+
return -1;
}
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
++forwards->interested;
@@ -1402,7 +1444,9 @@ int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
forwards->interested,
app->name);
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return 0;
}
@@ -1459,16 +1503,18 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
struct app_forwards *forwards;
- SCOPED_AO2LOCK(lock, app->forwards);
if (!app) {
return -1;
}
+ ao2_lock(app->forwards);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return 0;
}
@@ -1476,12 +1522,23 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint
endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
+ int res;
+
/* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
+ ao2_unlock(app->forwards);
+
+ return -1;
+ }
+
+ res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ if (!res) {
+ ao2_unlock(app->forwards);
+ ao2_ref(forwards, -1);
+
return -1;
}
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
/* Subscribe for messages */
messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
@@ -1493,7 +1550,9 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint
forwards->interested,
app->name);
+ ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
+
return 0;
}
@@ -1513,12 +1572,14 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
if (ast_strlen_zero(endpoint_id)) {
endpoint_id = ENDPOINT_ALL;
}
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
+ ao2_cleanup(forwards);
+
return forwards != NULL;
}