From 9c00db7c16041c1805481be84f386cc86625bc49 Mon Sep 17 00:00:00 2001 From: Tilghman Lesher Date: Thu, 3 Jan 2008 06:16:48 +0000 Subject: Add coordination between AMI and AGI applications, with an asyncagi method Feature proposed and patched by: moy (Closes issue #11282) git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@96174 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- res/res_agi.c | 394 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 391 insertions(+), 3 deletions(-) (limited to 'res/res_agi.c') diff --git a/res/res_agi.c b/res/res_agi.c index 6fa0d7d35..9a637c46a 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -52,6 +52,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/lock.h" #include "asterisk/strings.h" #include "asterisk/agi.h" +#include "asterisk/manager.h" #include "asterisk/version.h" #include "asterisk/speech.h" #include "asterisk/manager.h" @@ -101,6 +102,7 @@ static int agidebug = 0; enum agi_result { AGI_RESULT_SUCCESS, AGI_RESULT_SUCCESS_FAST, + AGI_RESULT_SUCCESS_ASYNC, AGI_RESULT_FAILURE, AGI_RESULT_NOTFOUND, AGI_RESULT_HANGUP, @@ -140,6 +142,373 @@ int ast_agi_fdprintf(struct ast_channel *chan, int fd, char *fmt, ...) return ast_carefulwrite(fd, buf->str, buf->used, 100); } +/* linked list of AGI commands ready to be executed by Async AGI */ +struct agi_cmd { + char *cmd_buffer; + char *cmd_id; + AST_LIST_ENTRY(agi_cmd) entry; +}; + +static void free_agi_cmd(struct agi_cmd *cmd) +{ + ast_free(cmd->cmd_buffer); + ast_free(cmd->cmd_id); + ast_free(cmd); +} + +/* AGI datastore destructor */ +static void agi_destroy_commands_cb(void *data) +{ + struct agi_cmd *cmd; + AST_LIST_HEAD(, agi_cmd) *chan_cmds = data; + AST_LIST_LOCK(chan_cmds); + while ( (cmd = AST_LIST_REMOVE_HEAD(chan_cmds, entry)) ) { + free_agi_cmd(cmd); + } + AST_LIST_UNLOCK(chan_cmds); + AST_LIST_HEAD_DESTROY(chan_cmds); + ast_free(chan_cmds); +} + +/* channel datastore to keep the queue of AGI commands in the channel */ +static const struct ast_datastore_info agi_commands_datastore_info = { + .type = "AsyncAGI", + .destroy = agi_destroy_commands_cb +}; + +static const char mandescr_asyncagi[] = +"Description: Add an AGI command to the execute queue of the channel in Async AGI\n" +"Variables:\n" +" *Channel: Channel that is currently in Async AGI\n" +" *Command: Application to execute\n" +" CommandID: comand id. This will be sent back in CommandID header of AsyncAGI exec event notification\n" +"\n"; + +static struct agi_cmd *get_agi_cmd(struct ast_channel *chan) +{ + struct ast_datastore *store; + struct agi_cmd *cmd; + AST_LIST_HEAD(, agi_cmd) *agi_commands; + + ast_channel_lock(chan); + store = ast_channel_datastore_find(chan, &agi_commands_datastore_info, NULL); + ast_channel_unlock(chan); + if (!store) { + ast_log(LOG_ERROR, "Hu? datastore disappeared at Async AGI on Channel %s!\n", chan->name); + return NULL; + } + agi_commands = store->data; + AST_LIST_LOCK(agi_commands); + cmd = AST_LIST_REMOVE_HEAD(agi_commands, entry); + AST_LIST_UNLOCK(agi_commands); + return cmd; +} + +/* channel is locked when calling this one either from the CLI or manager thread */ +static int add_agi_cmd(struct ast_channel *chan, const char *cmd_buff, const char *cmd_id) +{ + struct ast_datastore *store; + struct agi_cmd *cmd; + AST_LIST_HEAD(, agi_cmd) *agi_commands; + + store = ast_channel_datastore_find(chan, &agi_commands_datastore_info, NULL); + if (!store) { + ast_log(LOG_WARNING, "Channel %s is not at Async AGI.\n", chan->name); + return -1; + } + agi_commands = store->data; + cmd = ast_calloc(1, sizeof(*cmd)); + if (!cmd) { + return -1; + } + cmd->cmd_buffer = ast_strdup(cmd_buff); + if (!cmd->cmd_buffer) { + ast_free(cmd); + return -1; + } + cmd->cmd_id = ast_strdup(cmd_id); + if (!cmd->cmd_id) { + ast_free(cmd->cmd_buffer); + ast_free(cmd); + return -1; + } + AST_LIST_LOCK(agi_commands); + AST_LIST_INSERT_TAIL(agi_commands, cmd, entry); + AST_LIST_UNLOCK(agi_commands); + return 0; +} + +static int add_to_agi(struct ast_channel *chan) +{ + struct ast_datastore *datastore; + AST_LIST_HEAD(, agi_cmd) *agi_cmds_list; + + /* check if already on AGI */ + ast_channel_lock(chan); + datastore = ast_channel_datastore_find(chan, &agi_commands_datastore_info, NULL); + ast_channel_unlock(chan); + if (datastore) { + /* we already have an AGI datastore, let's just + return success */ + return 0; + } + + /* the channel has never been on Async AGI, + let's allocate it's datastore */ + datastore = ast_channel_datastore_alloc(&agi_commands_datastore_info, "AGI"); + if (!datastore) { + return -1; + } + agi_cmds_list = ast_calloc(1, sizeof(*agi_cmds_list)); + if (!agi_cmds_list) { + ast_log(LOG_ERROR, "Unable to allocate Async AGI commands list.\n"); + ast_channel_datastore_free(datastore); + return -1; + } + datastore->data = agi_cmds_list; + AST_LIST_HEAD_INIT(agi_cmds_list); + ast_channel_lock(chan); + ast_channel_datastore_add(chan, datastore); + ast_channel_unlock(chan); + return 0; +} + +/*! + * \brief CLI command to add applications to execute in Async AGI + * \param e + * \param cmd + * \param a + * + * \retval CLI_SUCCESS on success + * \retval NULL when init or tab completion is used +*/ +static char *handle_cli_agi_add_cmd(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ast_channel *chan; + switch (cmd) { + case CLI_INIT: + e->command = "agi exec"; + e->usage = "Usage: agi exec [id]\n" + " Add AGI command to the execute queue of the specified channel in Async AGI\n"; + return NULL; + case CLI_GENERATE: + if (a->pos == 2) + return ast_complete_channels(a->line, a->word, a->pos, a->n, 2); + return NULL; + } + + if (a->argc < 4) + return CLI_SHOWUSAGE; + chan = ast_get_channel_by_name_locked(a->argv[2]); + if (!chan) { + ast_log(LOG_WARNING, "Channel %s does not exists or cannot lock it\n", a->argv[2]); + return CLI_FAILURE; + } + if (add_agi_cmd(chan, a->argv[3], (a->argc > 4 ? a->argv[4] : ""))) { + ast_log(LOG_WARNING, "failed to add AGI command to queue of channel %s\n", chan->name); + ast_channel_unlock(chan); + return CLI_FAILURE; + } + ast_log(LOG_DEBUG, "Added AGI command to channel %s queue\n", chan->name); + ast_channel_unlock(chan); + return CLI_SUCCESS; +} + +/*! + * \brief Add a new command to execute by the Async AGI application + * \param s + * \param m + * + * It will append the application to the specified channel's queue + * if the channel is not inside Async AGI application it will return an error + * \retval 0 on success or incorrect use + * \retval 1 on failure to add the command ( most likely because the channel + * is not in Async AGI loop ) +*/ +static int action_add_agi_cmd(struct mansession *s, const struct message *m) +{ + const char *channel = astman_get_header(m, "Channel"); + const char *cmdbuff = astman_get_header(m, "Command"); + const char *cmdid = astman_get_header(m, "CommandID"); + struct ast_channel *chan; + char buf[256]; + if (ast_strlen_zero(channel) || ast_strlen_zero(cmdbuff)) { + astman_send_error(s, m, "Both, Channel and Command are *required*"); + return 0; + } + chan = ast_get_channel_by_name_locked(channel); + if (!chan) { + snprintf(buf, sizeof(buf), "Channel %s does not exists or cannot get its lock", channel); + astman_send_error(s, m, buf); + return 1; + } + if (add_agi_cmd(chan, cmdbuff, cmdid)) { + snprintf(buf, sizeof(buf), "Failed to add AGI command to channel %s queue", chan->name); + astman_send_error(s, m, buf); + ast_channel_unlock(chan); + return 1; + } + astman_send_ack(s, m, "Added AGI command to queue"); + ast_channel_unlock(chan); + return 0; +} + +static int agi_handle_command(struct ast_channel *chan, AGI *agi, char *buf, int dead); +static void setup_env(struct ast_channel *chan, char *request, int fd, int enhanced, int argc, char *argv[]); +static enum agi_result launch_asyncagi(struct ast_channel *chan, char *argv[], int *efd) +{ +/* This buffer sizes might cause truncation if the AGI command writes more data + than AGI_BUF_SIZE as result. But let's be serious, is there an AGI command + that writes a response larger than 1024 bytes?, I don't think so, most of + them are just result=blah stuff. However probably if GET VARIABLE is called + and the variable has large amount of data, that could be a problem. We could + make this buffers dynamic, but let's leave that as a second step. + + AMI_BUF_SIZE is twice AGI_BUF_SIZE just for the sake of choosing a safe + number. Some characters of AGI buf will be url encoded to be sent to manager + clients. An URL encoded character will take 3 bytes, but again, to cause + truncation more than about 70% of the AGI buffer should be URL encoded for + that to happen. Not likely at all. + + On the other hand. I wonder if read() could eventually return less data than + the amount already available in the pipe? If so, how to deal with that? + So far, my tests on Linux have not had any problems. + */ +#define AGI_BUF_SIZE 1024 +#define AMI_BUF_SIZE 2048 + struct ast_frame *f; + struct agi_cmd *cmd; + int res, fds[2]; + int timeout = 100; + char agi_buffer[AGI_BUF_SIZE + 1]; + char ami_buffer[AMI_BUF_SIZE]; + enum agi_result returnstatus = AGI_RESULT_SUCCESS_ASYNC; + AGI async_agi; + + if (efd) { + ast_log(LOG_WARNING, "Async AGI does not support Enhanced AGI yet\n"); + return AGI_RESULT_FAILURE; + } + + /* add AsyncAGI datastore to the channel */ + if (add_to_agi(chan)) { + ast_log(LOG_ERROR, "failed to start Async AGI on channel %s\n", chan->name); + return AGI_RESULT_FAILURE; + } + + /* this pipe allows us to create a "fake" AGI struct to use + the AGI commands */ + res = pipe(fds); + if (res) { + ast_log(LOG_ERROR, "failed to create Async AGI pipe\n"); + /* intentionally do not remove datastore, added with + add_to_agi(), from channel. It will be removed when + the channel is hung up anyways */ + return AGI_RESULT_FAILURE; + } + /* handlers will get the pipe write fd and we read the AGI responses + from the pipe read fd */ + async_agi.fd = fds[1]; + async_agi.ctrl = fds[1]; + async_agi.audio = -1; /* no audio support */ + async_agi.fast = 0; + + /* notify possible manager users of a new channel ready to + receive commands */ + setup_env(chan, "async", fds[1], 0, 0, NULL); + /* read the environment */ + res = read(fds[0], agi_buffer, AGI_BUF_SIZE); + if (!res) { + ast_log(LOG_ERROR, "failed to read from Async AGI pipe on channel %s\n", chan->name); + returnstatus = AGI_RESULT_FAILURE; + goto quit; + } + agi_buffer[res] = '\0'; + /* encode it and send it thru the manager so whoever is going to take + care of AGI commands on this channel can decide which AGI commands + to execute based on the setup info */ + ast_uri_encode(agi_buffer, ami_buffer, AMI_BUF_SIZE, 1); + manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: Start\r\nChannel: %s\r\nEnv: %s\r\n", chan->name, ami_buffer); + while (1) { + /* bail out if we need to hangup */ + if (ast_check_hangup(chan)) { + ast_log(LOG_DEBUG, "ast_check_hangup returned true on chan %s\n", chan->name); + break; + } + /* retrieve a command + (commands are added via the manager or the cli threads) */ + cmd = get_agi_cmd(chan); + if (cmd) { + /* OK, we have a command, let's call the + command handler. */ + res = agi_handle_command(chan, &async_agi, cmd->cmd_buffer, 0); + if ((res < 0) || (res == AST_PBX_KEEPALIVE)) { + free_agi_cmd(cmd); + break; + } + /* the command handler must have written to our fake + AGI struct fd (the pipe), let's read the response */ + res = read(fds[0], agi_buffer, AGI_BUF_SIZE); + if (!res) { + returnstatus = AGI_RESULT_FAILURE; + ast_log(LOG_ERROR, "failed to read from AsyncAGI pipe on channel %s\n", chan->name); + free_agi_cmd(cmd); + break; + } + /* we have a response, let's send the response thru the + manager. Include the CommandID if it was specified + when the command was added */ + agi_buffer[res] = '\0'; + ast_uri_encode(agi_buffer, ami_buffer, AMI_BUF_SIZE, 1); + if (ast_strlen_zero(cmd->cmd_id)) + manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: Exec\r\nChannel: %s\r\nResult: %s\r\n", chan->name, ami_buffer); + else + manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: Exec\r\nChannel: %s\r\nCommandID: %s\r\nResult: %s\r\n", chan->name, cmd->cmd_id, ami_buffer); + free_agi_cmd(cmd); + } else { + /* no command so far, wait a bit for a frame to read */ + res = ast_waitfor(chan, timeout); + if (res < 0) { + ast_log(LOG_DEBUG, "ast_waitfor returned <= 0 on chan %s\n", chan->name); + break; + } + if (res == 0) + continue; + f = ast_read(chan); + if (!f) { + ast_log(LOG_DEBUG, "No frame read on channel %s, going out ...\n", chan->name); + returnstatus = AGI_RESULT_HANGUP; + break; + } + /* is there any other frame we should care about + besides AST_CONTROL_HANGUP? */ + if (f->frametype == AST_FRAME_CONTROL && f->subclass == AST_CONTROL_HANGUP) { + ast_log(LOG_DEBUG, "Got HANGUP frame on channel %s, going out ...\n", chan->name); + ast_frfree(f); + break; + } + ast_frfree(f); + } + } +quit: + /* notify manager users this channel cannot be + controlled anymore by Async AGI */ + manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: End\r\nChannel: %s\r\n", chan->name); + + /* close the pipe */ + close(fds[0]); + close(fds[1]); + + /* intentionally don't get rid of the datastore. So commands can be + still in the queue in case AsyncAGI gets called again. + Datastore destructor will be called on channel destroy anyway */ + + return returnstatus; + +#undef AGI_BUF_SIZE +#undef AMI_BUF_SIZE +} + /* launch_netscript: The fastagi handler. FastAGI defaults to port 4573 */ static enum agi_result launch_netscript(char *agiurl, char *argv[], int *fds, int *efd, int *opid) @@ -230,7 +599,7 @@ static enum agi_result launch_netscript(char *agiurl, char *argv[], int *fds, in return AGI_RESULT_SUCCESS_FAST; } -static enum agi_result launch_script(char *script, char *argv[], int *fds, int *efd, int *opid) +static enum agi_result launch_script(struct ast_channel *chan, char *script, char *argv[], int *fds, int *efd, int *opid) { char tmp[256]; int pid, toast[2], fromast[2], audio[2], x, res; @@ -239,6 +608,8 @@ static enum agi_result launch_script(char *script, char *argv[], int *fds, int * if (!strncasecmp(script, "agi://", 6)) return launch_netscript(script, argv, fds, efd, opid); + if (!strncasecmp(script, "agi:async", sizeof("agi:async")-1)) + return launch_asyncagi(chan, argv, efd); if (script[0] != '/') { snprintf(tmp, sizeof(tmp), "%s/%s", ast_config_AST_AGI_DIR, script); @@ -1628,6 +1999,12 @@ static int handle_speechrecognize(struct ast_channel *chan, AGI *agi, int argc, return RESULT_SUCCESS; } +static int handle_asyncagi_break(struct ast_channel *chan, AGI *agi, int argc, char *argv[]) +{ + ast_agi_fdprintf(chan, agi->fd, "200 result=0\n"); + return AST_PBX_KEEPALIVE; +} + static char usage_setmusic[] = " Usage: SET MUSIC ON \n" " Enables/Disables the music on hold generator. If is\n" @@ -1871,6 +2248,10 @@ static char usage_autohangup[] = " future. Of course it can be hungup before then as well. Setting to 0 will\n" " cause the autohangup feature to be disabled on this channel.\n"; +static char usage_break_aagi[] = +" Usage: ASYNCAGI BREAK\n" +" Break the Async AGI loop.\n"; + static char usage_noop[] = " Usage: NoOp\n" " Does nothing.\n"; @@ -1956,6 +2337,7 @@ static struct agi_command commands[] = { { { "speech", "activate", "grammar", NULL }, handle_speechactivategrammar, "Activates a grammar", usage_speechactivategrammar, 0 }, { { "speech", "deactivate", "grammar", NULL }, handle_speechdeactivategrammar, "Deactivates a grammar", usage_speechdeactivategrammar, 0 }, { { "speech", "recognize", NULL }, handle_speechrecognize, "Recognizes speech", usage_speechrecognize, 0 }, + { { "asyncagi", "break", NULL }, handle_asyncagi_break, "Break AsyncAGI loop", usage_break_aagi, 0 }, }; static AST_RWLIST_HEAD_STATIC(agi_commands, agi_command); @@ -2500,7 +2882,9 @@ static int agi_exec_full(struct ast_channel *chan, void *data, int enhanced, int } } #endif - res = launch_script(args.argv[0], args.argv, fds, enhanced ? &efd : NULL, &pid); + res = launch_script(chan, args.argv[0], args.argv, fds, enhanced ? &efd : NULL, &pid); + /* Async AGI do not require run_agi(), so just proceed if normal AGI + or Fast AGI are setup with success. */ if (res == AGI_RESULT_SUCCESS || res == AGI_RESULT_SUCCESS_FAST) { int status = 0; agi.fd = fds[1]; @@ -2516,12 +2900,13 @@ static int agi_exec_full(struct ast_channel *chan, void *data, int enhanced, int if (efd > -1) close(efd); ast_unreplace_sigchld(); - } + } ast_module_user_remove(u); switch (res) { case AGI_RESULT_SUCCESS: case AGI_RESULT_SUCCESS_FAST: + case AGI_RESULT_SUCCESS_ASYNC: pbx_builtin_setvar_helper(chan, "AGISTATUS", "SUCCESS"); break; case AGI_RESULT_FAILURE: @@ -2575,6 +2960,7 @@ static int deadagi_exec(struct ast_channel *chan, void *data) } static struct ast_cli_entry cli_agi[] = { + AST_CLI_DEFINE(handle_cli_agi_add_cmd, "Add AGI command to a channel in Async AGI"), AST_CLI_DEFINE(handle_cli_agi_debug, "Enable/Disable AGI debugging"), AST_CLI_DEFINE(handle_cli_agi_show, "List AGI commands or specific help"), AST_CLI_DEFINE(handle_cli_agi_dumphtml, "Dumps a list of AGI commands in HTML format") @@ -2586,6 +2972,7 @@ static int unload_module(void) ast_agi_unregister_multiple(ast_module_info->self, commands, sizeof(commands) / sizeof(struct agi_command)); ast_unregister_application(eapp); ast_unregister_application(deadapp); + ast_manager_unregister("AGI"); return ast_unregister_application(app); } @@ -2595,6 +2982,7 @@ static int load_module(void) ast_agi_register_multiple(ast_module_info->self, commands, sizeof(commands) / sizeof(struct agi_command)); ast_register_application(deadapp, deadagi_exec, deadsynopsis, descrip); ast_register_application(eapp, eagi_exec, esynopsis, descrip); + ast_manager_register2("AGI", EVENT_FLAG_CALL, action_add_agi_cmd, "Add an AGI command to execute by Async AGI", mandescr_asyncagi); return ast_register_application(app, agi_exec, synopsis, descrip); } -- cgit v1.2.3