From 4559cd0e284ee3cb672d9c7c935ab99b3bb16489 Mon Sep 17 00:00:00 2001 From: Nir Simionovich Date: Mon, 16 Oct 2017 17:46:02 -0400 Subject: This patch adds a beanstalk CDR backend. Beanstalkd is a simple to use job queue. It provides a means to create multiple job queues called "tubes". Each tube can store multiple jobs, with varying priorities with the queue. Queue processing is available via a simple TCP socket or via well defined libraries, avaialble at https://github.com/kr/beanstalkd/wiki/client-libraries This module is based upon the beanstalk-client library, available for download at: https://github.com/deepfryed/beanstalk-client Change-Id: I5fe4089a34ab3b39230786d9bbfddafa56715f48 --- build_tools/menuselect-deps.in | 1 + cdr/cdr_beanstalkd.c | 270 +++++++++++++++++++++++++++++ configs/samples/cdr_beanstalkd.conf.sample | 26 +++ configure | 143 +++++++++++++++ configure.ac | 3 + include/asterisk/autoconfig.h.in | 3 + makeopts.in | 3 + 7 files changed, 449 insertions(+) create mode 100644 cdr/cdr_beanstalkd.c create mode 100644 configs/samples/cdr_beanstalkd.conf.sample diff --git a/build_tools/menuselect-deps.in b/build_tools/menuselect-deps.in index ec70be0da..9629ea536 100644 --- a/build_tools/menuselect-deps.in +++ b/build_tools/menuselect-deps.in @@ -1,5 +1,6 @@ ALSA=@PBX_ALSA@ BLUETOOTH=@PBX_BLUETOOTH@ +BEANSTALK=@PBX_BEANSTALK@ COROSYNC=@PBX_COROSYNC@ CRYPTO=@PBX_CRYPTO@ BFD=@PBX_BFD@ diff --git a/cdr/cdr_beanstalkd.c b/cdr/cdr_beanstalkd.c new file mode 100644 index 000000000..4812c91aa --- /dev/null +++ b/cdr/cdr_beanstalkd.c @@ -0,0 +1,270 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2017 + * + * Nir Simionovich + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief Asterisk Beanstalkd CDR records. + * + * This module requires the beanstalk-client library, avaialble from + * https://github.com/deepfryed/beanstalk-client + * + * See also + * \arg \ref AstCDR + * \ingroup cdr_drivers + */ + +/*! \li \ref cdr_beanstalkd.c uses the configuration file \ref cdr_beanstalkd.conf + * \addtogroup configuration_file Configuration Files + */ + +/*! + * \page cdr_beanstalkd.conf cdr_beanstalkd.conf + * \verbinclude cdr_beanstalkd.conf.sample + */ + +/*** MODULEINFO + beanstalk + extended + ***/ + +#include "asterisk.h" + +#include +#include + +#include "beanstalk.h" +#include "asterisk/channel.h" +#include "asterisk/cdr.h" +#include "asterisk/module.h" +#include "asterisk/utils.h" +#include "asterisk/manager.h" +#include "asterisk/config.h" +#include "asterisk/pbx.h" +#include "asterisk/json.h" + +#define DATE_FORMAT "%Y-%m-%d %T" +#define CONF_FILE "cdr_beanstalkd.conf" +#define BEANSTALK_JOB_SIZE 4096 +#define BEANSTALK_JOB_PRIORITY 99 +#define BEANSTALK_JOB_TTR 60 +#define BEANSTALK_JOB_DELAY 0 +#define DEFAULT_BEANSTALK_HOST "127.0.0.1" +#define DEFAULT_BEANSTALK_PORT 11300 +#define DEFAULT_BEANSTALK_TUBE "asterisk-cdr" + +static const char name[] = "cdr_beanstalkd"; + +static int enablecdr = 0; +static char *bs_host; +static int bs_port; +static char *bs_tube; +static int priority; + +AST_RWLOCK_DEFINE_STATIC(config_lock); + +static int beanstalk_put(struct ast_cdr *cdr); + +static int load_config(int reload) { + char *cat = NULL; + struct ast_config *cfg; + struct ast_variable *v; + struct ast_flags config_flags = {reload ? CONFIG_FLAG_FILEUNCHANGED : 0}; + int newenablecdr = 0; + + cfg = ast_config_load(CONF_FILE, config_flags); + if (cfg == CONFIG_STATUS_FILEUNCHANGED) { + return 0; + } + + if (cfg == CONFIG_STATUS_FILEINVALID) { + ast_log(LOG_ERROR, "Config file '%s' could not be parsed\n", CONF_FILE); + return -1; + } + + if (!cfg) { + /* Standard configuration */ + ast_log(LOG_WARNING, "Failed to load configuration file. Module not activated.\n"); + if (enablecdr) { + ast_cdr_backend_suspend(name); + } + enablecdr = 0; + return -1; + } + + if (reload) { + ast_rwlock_wrlock(&config_lock); + ast_free(bs_host); + ast_free(bs_tube); + } + + /* Bootstrap the default configuration */ + bs_host = ast_strdup(DEFAULT_BEANSTALK_HOST); + bs_port = DEFAULT_BEANSTALK_PORT; + bs_tube = ast_strdup(DEFAULT_BEANSTALK_TUBE); + priority = BEANSTALK_JOB_PRIORITY; + + while ((cat = ast_category_browse(cfg, cat))) { + if (!strcasecmp(cat, "general")) { + v = ast_variable_browse(cfg, cat); + while (v) { + + if (!strcasecmp(v->name, "enabled")) { + newenablecdr = ast_true(v->value); + } else if (!strcasecmp(v->name, "host")) { + ast_free(bs_host); + bs_host = ast_strdup(v->value); + } else if (!strcasecmp(v->name, "port")) { + bs_port = atoi(v->value); + } else if (!strcasecmp(v->name, "tube")) { + ast_free(bs_tube); + bs_tube = ast_strdup(v->value); + } else if (!strcasecmp(v->name, "priority")) { + priority = atoi(v->value); + } + v = v->next; + + } + } + } + + if (reload) { + ast_rwlock_unlock(&config_lock); + } + + ast_config_destroy(cfg); + + if (!newenablecdr) { + ast_cdr_backend_suspend(name); + } else if (newenablecdr) { + ast_cdr_backend_unsuspend(name); + ast_log(LOG_NOTICE, "Added beanstalkd server %s at port %d with tube %s", bs_host, bs_port, bs_tube); + } + enablecdr = newenablecdr; + + return 0; +} + +static int beanstalk_put(struct ast_cdr *cdr) { + struct ast_tm timeresult; + char strAnswerTime[80] = ""; + char strStartTime[80]; + char strEndTime[80]; + char *cdr_buffer; + int bs_id; + int bs_socket; + struct ast_json *t_cdr_json; + + if (!enablecdr) { + return 0; + } + + ast_rwlock_rdlock(&config_lock); + bs_socket = bs_connect(bs_host, bs_port); + + if (bs_use(bs_socket, bs_tube) != BS_STATUS_OK) { + ast_log(LOG_ERROR, "Connection to Beanstalk tube %s @ %s:%d had failed", bs_tube, bs_host, bs_port); + ast_rwlock_unlock(&config_lock); + return 0; + } + + ast_localtime(&cdr->start, &timeresult, NULL); + ast_strftime(strStartTime, sizeof(strStartTime), DATE_FORMAT, &timeresult); + + if (cdr->answer.tv_sec) { + ast_localtime(&cdr->answer, &timeresult, NULL); + ast_strftime(strAnswerTime, sizeof(strAnswerTime), DATE_FORMAT, &timeresult); + } + + ast_localtime(&cdr->end, &timeresult, NULL); + ast_strftime(strEndTime, sizeof(strEndTime), DATE_FORMAT, &timeresult); + + ast_rwlock_unlock(&config_lock); + + t_cdr_json = ast_json_pack("{s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:i, s:i, s:s, s:s, s:s, s:s}", + "AccountCode", S_OR(cdr->accountcode, ""), + "Source", S_OR(cdr->src, ""), + "Destination", S_OR(cdr->dst, ""), + "DestinationContext", S_OR(cdr->dcontext, ""), + "CallerID", S_OR(cdr->clid, ""), + "Channel", S_OR(cdr->channel, ""), + "DestinationChannel", S_OR(cdr->dstchannel, ""), + "LastApplication", S_OR(cdr->lastapp, ""), + "LastData", S_OR(cdr->lastdata, ""), + "StartTime", S_OR(strStartTime, ""), + "AnswerTime", S_OR(strAnswerTime, ""), + "EndTime", S_OR(strEndTime, ""), + "Duration", cdr->duration, + "Billsec", cdr->billsec, + "Disposition", S_OR(ast_cdr_disp2str(cdr->disposition), ""), + "AMAFlags", S_OR(ast_channel_amaflags2string(cdr->amaflags), ""), + "UniqueID", S_OR(cdr->uniqueid, ""), + "UserField", S_OR(cdr->userfield, "")); + + cdr_buffer = ast_json_dump_string(t_cdr_json); + + ast_json_unref(t_cdr_json); + + bs_id = bs_put(bs_socket, priority, BEANSTALK_JOB_DELAY, BEANSTALK_JOB_TTR, cdr_buffer, strlen(cdr_buffer)); + + if (bs_id > 0) { + ast_log(LOG_DEBUG, "Successfully created job %d with %s\n", bs_id, cdr_buffer); + } else { + ast_log(LOG_ERROR, "CDR job creation failed for %s\n", cdr_buffer); + } + + bs_disconnect(bs_socket); + ast_json_free(cdr_buffer); + return 0; +} + +static int unload_module(void) { + if (ast_cdr_unregister(name)) { + return -1; + } + + ast_free(bs_host); + ast_free(bs_tube); + + return 0; +} + +static int load_module(void) { + if (ast_cdr_register(name, "Asterisk CDR Beanstalkd Backend", beanstalk_put)) { + return AST_MODULE_LOAD_DECLINE; + } + + if (load_config(0)) { + ast_cdr_unregister(name); + return AST_MODULE_LOAD_DECLINE; + } + + return AST_MODULE_LOAD_SUCCESS; +} + +static int reload(void) { + return load_config(1); +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Asterisk Beanstalkd CDR Backend", +.support_level = AST_MODULE_SUPPORT_EXTENDED, +.load = load_module, +.unload = unload_module, +.reload = reload, +.load_pri = AST_MODPRI_CDR_DRIVER, +); + diff --git a/configs/samples/cdr_beanstalkd.conf.sample b/configs/samples/cdr_beanstalkd.conf.sample new file mode 100644 index 000000000..5f45d5d5d --- /dev/null +++ b/configs/samples/cdr_beanstalkd.conf.sample @@ -0,0 +1,26 @@ +; +; Asterisk Call Management CDR via Beanstalkd job queue +; +; Beanstalkd is a simple job queue server, that is highly versatile and simple to use. +; Beanstalkd includes the capability of using multiple queues at the same time, with priorities. +; +; This module requires that your server has the beanstalk-client library installed. The library +; can be downloaded from - https://github.com/deepfryed/beanstalk-client +; + +[general] +;enabled = yes + +;host = 127.0.0.1 ; Specify the remote IP address of the Beanstalkd server +;port = 11300 ; Specify the remote PORT of the the Beanstalkd server +;tube = asterisk-cdr ; Specify the default CDR job queue to use +;priority = 99 ; Specify the default job priority for the queue. This parameter is useful when building + ; platform with multiple Asterisk servers, that are used for different functions. For example, + ; none billable CDR records can be inserted with a priority of 99, while billable ones be + ; inserted with a priority of 1 + + + + + + diff --git a/configure b/configure index 893f10738..bc4b03c15 100755 --- a/configure +++ b/configure @@ -991,6 +991,10 @@ PBX_PJSIP_DLG_CREATE_UAS_AND_INC_LOCK PJSIP_DLG_CREATE_UAS_AND_INC_LOCK_DIR PJSIP_DLG_CREATE_UAS_AND_INC_LOCK_INCLUDE PJSIP_DLG_CREATE_UAS_AND_INC_LOCK_LIB +PBX_BEANSTALK +BEANSTALK_DIR +BEANSTALK_INCLUDE +BEANSTALK_LIB PBX_PGSQL PGSQL_DIR PGSQL_INCLUDE @@ -1430,6 +1434,7 @@ with_opus with_osptk with_oss with_postgres +with_beanstalk with_pjproject with_popt with_portaudio @@ -2182,6 +2187,7 @@ Optional Packages: --with-osptk=PATH use OSP Toolkit files in PATH --with-oss=PATH use Open Sound System files in PATH --with-postgres=PATH use PostgreSQL files in PATH + --with-beanstalk=PATH use Beanstalk Job Queue files in PATH --with-pjproject=PATH use PJPROJECT files in PATH --with-popt=PATH use popt files in PATH --with-portaudio=PATH use PortAudio files in PATH @@ -11421,6 +11427,38 @@ fi + BEANSTALK_DESCRIP="Beanstalk Job Queue" + BEANSTALK_OPTION="beanstalk" + PBX_BEANSTALK=0 + +# Check whether --with-beanstalk was given. +if test "${with_beanstalk+set}" = set; then : + withval=$with_beanstalk; + case ${withval} in + n|no) + USE_BEANSTALK=no + # -1 is a magic value used by menuselect to know that the package + # was disabled, other than 'not found' + PBX_BEANSTALK=-1 + ;; + y|ye|yes) + ac_mandatory_list="${ac_mandatory_list} BEANSTALK" + ;; + *) + BEANSTALK_DIR="${withval}" + ac_mandatory_list="${ac_mandatory_list} BEANSTALK" + ;; + esac + +fi + + + + + + + + if test "x${PBX_PJPROJECT}" != "x1" ; then PJPROJECT_DESCRIP="PJPROJECT" @@ -25043,6 +25081,111 @@ fi + +if test "x${PBX_BEANSTALK}" != "x1" -a "${USE_BEANSTALK}" != "no"; then + pbxlibdir="" + # if --with-BEANSTALK=DIR has been specified, use it. + if test "x${BEANSTALK_DIR}" != "x"; then + if test -d ${BEANSTALK_DIR}/lib; then + pbxlibdir="-L${BEANSTALK_DIR}/lib" + else + pbxlibdir="-L${BEANSTALK_DIR}" + fi + fi + pbxfuncname="bs_version" + if test "x${pbxfuncname}" = "x" ; then # empty lib, assume only headers + AST_BEANSTALK_FOUND=yes + else + ast_ext_lib_check_save_CFLAGS="${CFLAGS}" + CFLAGS="${CFLAGS} " + as_ac_Lib=`$as_echo "ac_cv_lib_beanstalk_${pbxfuncname}" | $as_tr_sh` +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for ${pbxfuncname} in -lbeanstalk" >&5 +$as_echo_n "checking for ${pbxfuncname} in -lbeanstalk... " >&6; } +if eval \${$as_ac_Lib+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lbeanstalk ${pbxlibdir} $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ${pbxfuncname} (); +int +main () +{ +return ${pbxfuncname} (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + eval "$as_ac_Lib=yes" +else + eval "$as_ac_Lib=no" +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +eval ac_res=\$$as_ac_Lib + { $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_res" >&5 +$as_echo "$ac_res" >&6; } +if eval test \"x\$"$as_ac_Lib"\" = x"yes"; then : + AST_BEANSTALK_FOUND=yes +else + AST_BEANSTALK_FOUND=no +fi + + CFLAGS="${ast_ext_lib_check_save_CFLAGS}" + fi + + # now check for the header. + if test "${AST_BEANSTALK_FOUND}" = "yes"; then + BEANSTALK_LIB="${pbxlibdir} -lbeanstalk " + # if --with-BEANSTALK=DIR has been specified, use it. + if test "x${BEANSTALK_DIR}" != "x"; then + BEANSTALK_INCLUDE="-I${BEANSTALK_DIR}/include" + fi + BEANSTALK_INCLUDE="${BEANSTALK_INCLUDE} " + if test "xbeanstalk.h" = "x" ; then # no header, assume found + BEANSTALK_HEADER_FOUND="1" + else # check for the header + ast_ext_lib_check_saved_CPPFLAGS="${CPPFLAGS}" + CPPFLAGS="${CPPFLAGS} ${BEANSTALK_INCLUDE}" + ac_fn_c_check_header_mongrel "$LINENO" "beanstalk.h" "ac_cv_header_beanstalk_h" "$ac_includes_default" +if test "x$ac_cv_header_beanstalk_h" = xyes; then : + BEANSTALK_HEADER_FOUND=1 +else + BEANSTALK_HEADER_FOUND=0 +fi + + + CPPFLAGS="${ast_ext_lib_check_saved_CPPFLAGS}" + fi + if test "x${BEANSTALK_HEADER_FOUND}" = "x0" ; then + BEANSTALK_LIB="" + BEANSTALK_INCLUDE="" + else + if test "x${pbxfuncname}" = "x" ; then # only checking headers -> no library + BEANSTALK_LIB="" + fi + PBX_BEANSTALK=1 + cat >>confdefs.h <<_ACEOF +#define HAVE_BEANSTALK 1 +_ACEOF + + fi + fi +fi + + + # possible places for oss definitions if test "x${PBX_OSS}" != "x1" -a "${USE_OSS}" != "no"; then diff --git a/configure.ac b/configure.ac index e714c54b6..1ca64c85e 100644 --- a/configure.ac +++ b/configure.ac @@ -526,6 +526,7 @@ AST_EXT_LIB_SETUP([OPUS], [Opus], [opus]) AST_EXT_LIB_SETUP([OSPTK], [OSP Toolkit], [osptk]) AST_EXT_LIB_SETUP([OSS], [Open Sound System], [oss]) AST_EXT_LIB_SETUP([PGSQL], [PostgreSQL], [postgres]) +AST_EXT_LIB_SETUP([BEANSTALK], [Beanstalk Job Queue], [beanstalk]) if test "x${PBX_PJPROJECT}" != "x1" ; then AST_EXT_LIB_SETUP([PJPROJECT], [PJPROJECT], [pjproject]) @@ -2170,6 +2171,8 @@ AST_EXT_LIB_CHECK([BKTR], [c], [backtrace], [execinfo.h]) AST_EXT_LIB_CHECK([BLUETOOTH], [bluetooth], [ba2str], [bluetooth/bluetooth.h]) +AST_EXT_LIB_CHECK([BEANSTALK], [beanstalk], [bs_version], [beanstalk.h]) + # possible places for oss definitions AST_EXT_LIB_CHECK([OSS], [ossaudio], [], [linux/soundcard.h]) AST_EXT_LIB_CHECK([OSS], [ossaudio], [], [sys/soundcard.h]) diff --git a/include/asterisk/autoconfig.h.in b/include/asterisk/autoconfig.h.in index b9b4e1f81..c7a190892 100644 --- a/include/asterisk/autoconfig.h.in +++ b/include/asterisk/autoconfig.h.in @@ -115,6 +115,9 @@ attribute. */ #undef HAVE_ATTRIBUTE_warn_unused_result +/* Define to 1 if you have the Beanstalk Job Queue library. */ +#undef HAVE_BEANSTALK + /* Define to 1 if you have the Debug symbol decoding library. */ #undef HAVE_BFD diff --git a/makeopts.in b/makeopts.in index 6a1164c32..924f3e23a 100644 --- a/makeopts.in +++ b/makeopts.in @@ -383,3 +383,6 @@ TIMERFD_INCLUDE=@TIMERFD_INCLUDE@ SNDFILE_INCLUDE=@SNDFILE_INCLUDE@ SNDFILE_LIB=@SNDFILE_LIB@ + +BEANSTALK_INCLUDE=@BEANSTALK_INCLUDE@ +BEANSTALK_LIB=@BEANSTALK_LIB@ -- cgit v1.2.3