summaryrefslogtreecommitdiff
path: root/funcs/func_cdr.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2013-12-19 00:50:01 +0000
committerMatthew Jordan <mjordan@digium.com>2013-12-19 00:50:01 +0000
commit7e9febbf86f7a9aa0cc1d9852d1ed1b77f25b3ce (patch)
treefafe2c3b45211d267449024e44dd10f65ac77e9d /funcs/func_cdr.c
parentaf723c6572e988753c24cbb911d6b521600f4a3f (diff)
app_cdr,app_forkcdr,func_cdr: Synchronize with engine when manipulating state
When doing the rework of the CDR engine that pushed all of the logic into cdr.c and made it respond to changes in channel state over Stasis, we knew that accessing the CDR engine from the dialplan would be "slightly" non-deterministic. Dialplan threads would be accessing CDRs while Stasis threads would be updating the state of said CDRs - whereas in the past, everything happened on the dialplan threads. Tests have shown that "slightly" is in reality "very". This patch synchronizes things by making the dialplan applications/functions that manipulate CDRs do so over Stasis. ForkCDR, NoCDR, ResetCDR, CDR, and CDR_PROP now all use Stasis to send their requests over to the CDR engine, and synchronize on the channel Stasis topic via a subscription so that they return their values/control to the dialplan at the appropriate time. While going through this, the following changes were also made: * DISA, which can reset the CDR when a user successfully authenticates, now just uses the ResetCDR app to do this. This prevents having to duplicate the same Stasis synchronization logic in that application. * Answer no longer disables CDRs. It actually didn't work anyway - calling DISABLE on the channel's CDR doesn't stop the CDR from getting the Answer time - it just kills all CDRs on that channel, which isn't what the caller would intend. (closes issue ASTERISK-22884) (closes issue ASTERISK-22886) Review: https://reviewboard.asterisk.org/r/3057/ ........ Merged revisions 404294 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@404295 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'funcs/func_cdr.c')
-rw-r--r--funcs/func_cdr.c288
1 files changed, 234 insertions, 54 deletions
diff --git a/funcs/func_cdr.c b/funcs/func_cdr.c
index 48df0a092..3f248168c 100644
--- a/funcs/func_cdr.c
+++ b/funcs/func_cdr.c
@@ -39,6 +39,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/utils.h"
#include "asterisk/app.h"
#include "asterisk/cdr.h"
+#include "asterisk/stasis.h"
/*** DOCUMENTATION
<function name="CDR" language="en_US">
@@ -201,45 +202,65 @@ AST_APP_OPTIONS(cdr_func_options, {
AST_APP_OPTION('u', OPT_UNPARSED),
});
-static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
- char *buf, size_t len)
+struct cdr_func_payload {
+ struct ast_channel *chan;
+ const char *cmd;
+ const char *arguments;
+ const char *value;
+};
+
+struct cdr_func_data {
+ char *buf;
+ size_t len;
+};
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_read_message_type);
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_write_message_type);
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_prop_write_message_type);
+
+static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
+ struct cdr_func_data *output = data;
+ struct cdr_func_payload *payload = stasis_message_data(message);
+ char *info;
char *value = NULL;
struct ast_flags flags = { 0 };
char tempbuf[512];
- char *info;
AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(variable);
AST_APP_ARG(options);
);
- buf[0] = '\0';/* Ensure the buffer is initialized. */
+ if (cdr_read_message_type() != stasis_message_type(message)) {
+ return;
+ }
- if (!chan) {
- return -1;
+ if (!payload || !output) {
+ return;
}
- if (ast_strlen_zero(parse)) {
- ast_log(AST_LOG_WARNING, "FUNC_CDR requires a variable (FUNC_CDR(variable[,option]))\n)");
- return -1;
+ if (ast_strlen_zero(payload->arguments)) {
+ ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)",
+ payload->cmd, payload->cmd);
+ return;
}
- info = ast_strdupa(parse);
+ info = ast_strdupa(payload->arguments);
AST_STANDARD_APP_ARGS(args, info);
if (!ast_strlen_zero(args.options)) {
ast_app_parse_options(cdr_func_options, &flags, NULL, args.options);
}
- if (ast_strlen_zero(ast_channel_name(chan))) {
+ if (ast_strlen_zero(ast_channel_name(payload->chan))) {
/* Format request on a dummy channel */
- ast_cdr_format_var(ast_channel_cdr(chan), args.variable, &value, tempbuf, sizeof(tempbuf), 0);
+ ast_cdr_format_var(ast_channel_cdr(payload->chan), args.variable, &value, tempbuf, sizeof(tempbuf), 0);
if (ast_strlen_zero(value)) {
- return 0;
+ return;
}
ast_copy_string(tempbuf, value, sizeof(tempbuf));
ast_set_flag(&flags, OPT_UNPARSED);
- } else if (ast_cdr_getvar(ast_channel_name(chan), args.variable, tempbuf, sizeof(tempbuf))) {
- return 0;
+ } else if (ast_cdr_getvar(ast_channel_name(payload->chan), args.variable, tempbuf, sizeof(tempbuf))) {
+ return;
}
if (ast_test_flag(&flags, OPT_FLOAT)
@@ -249,8 +270,8 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (sscanf(tempbuf, "%30ld", &ms) != 1) {
ast_log(AST_LOG_WARNING, "Unable to parse %s (%s) from the CDR for channel %s\n",
- args.variable, tempbuf, ast_channel_name(chan));
- return 0;
+ args.variable, tempbuf, ast_channel_name(payload->chan));
+ return;
}
dtime = (double)(ms / 1000.0);
snprintf(tempbuf, sizeof(tempbuf), "%lf", dtime);
@@ -265,8 +286,8 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (sscanf(tempbuf, "%ld.%ld", &fmt_time.tv_sec, &tv_usec) != 2) {
ast_log(AST_LOG_WARNING, "Unable to parse %s (%s) from the CDR for channel %s\n",
- args.variable, tempbuf, ast_channel_name(chan));
- return 0;
+ args.variable, tempbuf, ast_channel_name(payload->chan));
+ return;
}
fmt_time.tv_usec = tv_usec;
ast_localtime(&fmt_time, &tm, NULL);
@@ -276,8 +297,8 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (sscanf(tempbuf, "%8d", &disposition) != 1) {
ast_log(AST_LOG_WARNING, "Unable to parse %s (%s) from the CDR for channel %s\n",
- args.variable, tempbuf, ast_channel_name(chan));
- return 0;
+ args.variable, tempbuf, ast_channel_name(payload->chan));
+ return;
}
snprintf(tempbuf, sizeof(tempbuf), "%s", ast_cdr_disp2str(disposition));
} else if (!strcasecmp("amaflags", args.variable)) {
@@ -285,30 +306,45 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (sscanf(tempbuf, "%8d", &amaflags) != 1) {
ast_log(AST_LOG_WARNING, "Unable to parse %s (%s) from the CDR for channel %s\n",
- args.variable, tempbuf, ast_channel_name(chan));
- return 0;
+ args.variable, tempbuf, ast_channel_name(payload->chan));
+ return;
}
snprintf(tempbuf, sizeof(tempbuf), "%s", ast_channel_amaflags2string(amaflags));
}
}
- ast_copy_string(buf, tempbuf, len);
- return 0;
+ ast_copy_string(output->buf, tempbuf, output->len);
}
-static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
- const char *value)
+static void cdr_write_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
+ struct cdr_func_payload *payload = stasis_message_data(message);
struct ast_flags flags = { 0 };
AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(variable);
AST_APP_ARG(options);
);
+ char *parse;
- if (ast_strlen_zero(parse) || !value || !chan) {
- return -1;
+ if (cdr_write_message_type() != stasis_message_type(message)) {
+ return;
}
+ if (!payload) {
+ return;
+ }
+
+ if (ast_strlen_zero(payload->arguments)) {
+ ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable)=value)\n)",
+ payload->cmd, payload->cmd);
+ return;
+ }
+ if (ast_strlen_zero(payload->value)) {
+ ast_log(AST_LOG_WARNING, "%s requires a value (%s(variable)=value)\n)",
+ payload->cmd, payload->cmd);
+ return;
+ }
+ parse = ast_strdupa(payload->arguments);
AST_STANDARD_APP_ARGS(args, parse);
if (!ast_strlen_zero(args.options)) {
@@ -317,47 +353,61 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
if (!strcasecmp(args.variable, "accountcode")) {
ast_log(AST_LOG_WARNING, "Using the CDR function to set 'accountcode' is deprecated. Please use the CHANNEL function instead.\n");
- ast_channel_lock(chan);
- ast_channel_accountcode_set(chan, value);
- ast_channel_unlock(chan);
+ ast_channel_lock(payload->chan);
+ ast_channel_accountcode_set(payload->chan, payload->value);
+ ast_channel_unlock(payload->chan);
} else if (!strcasecmp(args.variable, "peeraccount")) {
ast_log(AST_LOG_WARNING, "The 'peeraccount' setting is not supported. Please set the 'accountcode' on the appropriate channel using the CHANNEL function.\n");
} else if (!strcasecmp(args.variable, "userfield")) {
- ast_cdr_setuserfield(ast_channel_name(chan), value);
+ ast_cdr_setuserfield(ast_channel_name(payload->chan), payload->value);
} else if (!strcasecmp(args.variable, "amaflags")) {
ast_log(AST_LOG_WARNING, "Using the CDR function to set 'amaflags' is deprecated. Please use the CHANNEL function instead.\n");
- if (isdigit(*value)) {
+ if (isdigit(*payload->value)) {
int amaflags;
- sscanf(value, "%30d", &amaflags);
- ast_channel_lock(chan);
- ast_channel_amaflags_set(chan, amaflags);
- ast_channel_unlock(chan);
+ sscanf(payload->value, "%30d", &amaflags);
+ ast_channel_lock(payload->chan);
+ ast_channel_amaflags_set(payload->chan, amaflags);
+ ast_channel_unlock(payload->chan);
} else {
- ast_channel_lock(chan);
- ast_channel_amaflags_set(chan, ast_channel_string2amaflag(value));
- ast_channel_unlock(chan);
+ ast_channel_lock(payload->chan);
+ ast_channel_amaflags_set(payload->chan, ast_channel_string2amaflag(payload->value));
+ ast_channel_unlock(payload->chan);
}
} else {
- ast_cdr_setvar(ast_channel_name(chan), args.variable, value);
+ ast_cdr_setvar(ast_channel_name(payload->chan), args.variable, payload->value);
}
-
- return 0;
+ return;
}
-static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse,
- const char *value)
+static void cdr_prop_write_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
+ struct cdr_func_payload *payload = stasis_message_data(message);
enum ast_cdr_options option;
-
+ char *parse;
AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(variable);
AST_APP_ARG(options);
);
- if (ast_strlen_zero(parse) || !value || !chan) {
- return -1;
+ if (cdr_prop_write_message_type() != stasis_message_type(message)) {
+ return;
}
+ if (!payload) {
+ return;
+ }
+
+ if (ast_strlen_zero(payload->arguments)) {
+ ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable)=value)\n)",
+ payload->cmd, payload->cmd);
+ return;
+ }
+ if (ast_strlen_zero(payload->value)) {
+ ast_log(AST_LOG_WARNING, "%s requires a value (%s(variable)=value)\n)",
+ payload->cmd, payload->cmd);
+ return;
+ }
+ parse = ast_strdupa(payload->arguments);
AST_STANDARD_APP_ARGS(args, parse);
if (!strcasecmp("party_a", args.variable)) {
@@ -365,15 +415,139 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
} else if (!strcasecmp("disable", args.variable)) {
option = AST_CDR_FLAG_DISABLE_ALL;
} else {
- ast_log(AST_LOG_WARNING, "Unknown option %s used with CDR_PROP\n", args.variable);
- return 0;
+ ast_log(AST_LOG_WARNING, "Unknown option %s used with %s\n", args.variable, payload->cmd);
+ return;
}
- if (ast_true(value)) {
- ast_cdr_set_property(ast_channel_name(chan), option);
+ if (ast_true(payload->value)) {
+ ast_cdr_set_property(ast_channel_name(payload->chan), option);
} else {
- ast_cdr_clear_property(ast_channel_name(chan), option);
+ ast_cdr_clear_property(ast_channel_name(payload->chan), option);
+ }
+}
+
+
+static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
+ char *buf, size_t len)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+ RAII_VAR(struct cdr_func_payload *, payload,
+ ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+ struct cdr_func_data output = { 0, };
+
+ if (!payload) {
+ return -1;
}
+ payload->chan = chan;
+ payload->cmd = cmd;
+ payload->arguments = parse;
+
+ buf[0] = '\0';/* Ensure the buffer is initialized. */
+ output.buf = buf;
+ output.len = len;
+
+ message = stasis_message_create(cdr_read_message_type(), payload);
+ if (!message) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ /* If this is a request on a dummy channel, we're doing post-processing on an
+ * already dispatched CDR. Simply call the callback to calculate the value and
+ * return, instead of posting to Stasis as we would for a running channel.
+ */
+ if (ast_strlen_zero(ast_channel_name(chan))) {
+ cdr_read_callback(NULL, NULL, message);
+ } else {
+ RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+
+ subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output);
+ if (!subscription) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ stasis_publish(ast_channel_topic(chan), message);
+
+ subscription = stasis_unsubscribe_and_join(subscription);
+ }
+
+ return 0;
+}
+
+static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
+ const char *value)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+ RAII_VAR(struct cdr_func_payload *, payload,
+ ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+
+ if (!payload) {
+ return -1;
+ }
+ payload->chan = chan;
+ payload->cmd = cmd;
+ payload->arguments = parse;
+ payload->value = value;
+
+ message = stasis_message_create(cdr_write_message_type(), payload);
+ if (!message) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL);
+ if (!subscription) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ stasis_publish(ast_channel_topic(chan), message);
+
+ subscription = stasis_unsubscribe_and_join(subscription);
+
+ return 0;
+}
+
+static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse,
+ const char *value)
+{
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+ RAII_VAR(struct cdr_func_payload *, payload,
+ ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+
+ if (!payload) {
+ return -1;
+ }
+ payload->chan = chan;
+ payload->cmd = cmd;
+ payload->arguments = parse;
+ payload->value = value;
+
+ message = stasis_message_create(cdr_prop_write_message_type(), payload);
+ if (!message) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL);
+ if (!subscription) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ stasis_publish(ast_channel_topic(chan), message);
+
+ subscription = stasis_unsubscribe_and_join(subscription);
+
return 0;
}
@@ -393,6 +567,9 @@ static int unload_module(void)
{
int res = 0;
+ STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type);
res |= ast_custom_function_unregister(&cdr_function);
res |= ast_custom_function_unregister(&cdr_prop_function);
@@ -403,6 +580,9 @@ static int load_module(void)
{
int res = 0;
+ res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type);
+ res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type);
res |= ast_custom_function_register(&cdr_function);
res |= ast_custom_function_register(&cdr_prop_function);