diff options
Diffstat (limited to 'tests/pjsua/run.py')
-rw-r--r-- | tests/pjsua/run.py | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/tests/pjsua/run.py b/tests/pjsua/run.py new file mode 100644 index 0000000..799b659 --- /dev/null +++ b/tests/pjsua/run.py @@ -0,0 +1,289 @@ +# $Id: run.py 4183 2012-06-28 09:16:03Z nanang $ +import sys +import imp +import re +import os +import subprocess +import random +import time +import getopt + +import inc_const as const +import inc_cfg as inc + +# Vars +G_EXE = "" # pjsua executable path +G_INUNIX = False # flags that test is running in Unix + + +# Usage string +usage = \ +""" +run.py - Automated test driver + +Usage: + run.py [options] MODULE CONFIG +Options: + --exe, -e pjsua executable path + --null-audio, -n use null audio +Sample: + run.py -n mod_run.py scripts-run/100_simple.py +""" + +# Parse arguments +try: + opts, args = getopt.getopt(sys.argv[1:], "hne:", ["help", "null-audio", "exe="]) +except getopt.GetoptError, err: + print str(err) + print usage + sys.exit(2) +for o, a in opts: + if o in ("-h", "--help"): + print usage + sys.exit() + elif o in ("-n", "--null-audio"): + inc.HAS_SND_DEV = 0 + elif o in ("-e", "--exe"): + G_EXE = a + else: + print "Unknown options" + sys.exit(2) + +if len(args) != 2: + print "Invalid arguments" + print usage + sys.exit(2) + +# Set global ARGS to be used by modules +inc.ARGS = args + +# Get the pjsua executable name +if G_EXE == "": + if sys.platform.find("win32")!=-1: + EXE_DIR = "../../pjsip-apps/bin/" + EXECUTABLES = [ "pjsua_vc6d.exe", + "pjsua_vc6.exe", + "pjsua-i386-Win32-vc8-Debug.exe", + "pjsua-i386-Win32-vc8-Debug-Dynamic.exe", + "pjsua-i386-Win32-vc8-Debug-Static.exe", + "pjsua-i386-Win32-vc8-Release.exe", + "pjsua-i386-Win32-vc8-Release-Dynamic.exe", + "pjsua-i386-Win32-vc8-Release-Static.exe" + ] + e_ts = 0 + for e in EXECUTABLES: + e = EXE_DIR + e + if os.access(e, os.F_OK): + st = os.stat(e) + if e_ts==0 or e_ts<st.st_mtime: + G_EXE = e + e_ts = st.st_mtime + + if G_EXE=="": + print "Unable to find valid pjsua. Please build pjsip first" + sys.exit(1) + + G_INUNIX = False + else: + f = open("../../build.mak", "r") + while True: + line = f.readline() + if not line: + break + if line.find("TARGET_NAME")!=-1: + print line + G_EXE="../../pjsip-apps/bin/pjsua-" + line.split(":= ")[1] + break + if G_EXE=="": + print "Unable to find ../../../build.mak. Please build pjsip first" + sys.exit(1) + G_INUNIX = True +else: + if sys.platform.lower().find("win32")!=-1 or sys.platform.lower().find("microsoft")!=-1: + G_INUNIX = False + else: + G_INUNIX = True + + +G_EXE = G_EXE.rstrip("\n\r \t") + +################################### +# Poor man's 'expect'-like class +class Expect: + proc = None + echo = False + trace_enabled = False + name = "" + inst_param = None + rh = re.compile(const.DESTROYED) + ra = re.compile(const.ASSERT, re.I) + rr = re.compile(const.STDOUT_REFRESH) + t0 = time.time() + def __init__(self, inst_param): + self.inst_param = inst_param + self.name = inst_param.name + self.echo = inst_param.echo_enabled + self.trace_enabled = inst_param.trace_enabled + fullcmd = G_EXE + " " + inst_param.arg + " --stdout-refresh=5 --stdout-refresh-text=" + const.STDOUT_REFRESH + if not inst_param.enable_buffer: + fullcmd = fullcmd + " --stdout-no-buf" + self.trace("Popen " + fullcmd) + self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False) + def send(self, cmd): + self.trace("send " + cmd) + self.proc.stdin.writelines(cmd + "\n") + self.proc.stdin.flush() + def expect(self, pattern, raise_on_error=True, title=""): + self.trace("expect " + pattern) + r = re.compile(pattern, re.I) + refresh_cnt = 0 + while True: + line = self.proc.stdout.readline() + if line == "": + raise inc.TestError(self.name + ": Premature EOF") + # Print the line if echo is ON + if self.echo: + print self.name + ": " + line.rstrip() + # Trap assertion error + if self.ra.search(line) != None: + if raise_on_error: + raise inc.TestError(self.name + ": " + line) + else: + return None + # Count stdout refresh text. + if self.rr.search(line) != None: + refresh_cnt = refresh_cnt+1 + if refresh_cnt >= 6: + self.trace("Timed-out!") + if raise_on_error: + raise inc.TestError(self.name + " " + title + ": Timeout expecting pattern: \"" + pattern + "\"") + else: + return None # timeout + # Search for expected text + if r.search(line) != None: + return line + + def sync_stdout(self): + self.trace("sync_stdout") + cmd = "echo 1" + str(random.randint(1000,9999)) + self.send(cmd) + self.expect(cmd) + + def wait(self): + self.trace("wait") + self.proc.communicate() + + def trace(self, s): + if self.trace_enabled: + now = time.time() + fmt = self.name + ": " + "================== " + s + " ==================" + " [at t=%(time)03d]" + print fmt % {'time':int(now - self.t0)} + +######################### +# Error handling +def handle_error(errmsg, t, close_processes = True): + print "====== Caught error: " + errmsg + " ======" + if (close_processes): + time.sleep(1) + for p in t.process: + # Protect against 'Broken pipe' exception + try: + p.send("q") + p.send("q") + except: + pass + is_err = False + try: + ret = p.expect(const.DESTROYED, False) + if not ret: + is_err = True + except: + is_err = True + if is_err: + if sys.hexversion >= 0x02060000: + p.proc.terminate() + else: + p.wait() + else: + p.wait() + print "Test completed with error: " + errmsg + sys.exit(1) + + +######################### +# MAIN + +# Import the test script +script = imp.load_source("script", inc.ARGS[0]) + +# Init random seed +random.seed() + +# Validate +if script.test == None: + print "Error: no test defined" + sys.exit(1) + +if script.test.skip: + print "Test " + script.test.title + " is skipped" + sys.exit(0) + +if len(script.test.inst_params) == 0: + print "Error: test doesn't contain pjsua run descriptions" + sys.exit(1) + +# Instantiate pjsuas +print "====== Running " + script.test.title + " ======" +print "Using " + G_EXE + " as pjsua executable" + +for inst_param in script.test.inst_params: + try: + # Create pjsua's Expect instance from the param + p = Expect(inst_param) + # Wait until registration completes + if inst_param.have_reg: + p.expect(inst_param.uri+".*registration success") + # Synchronize stdout + p.send("") + p.expect(const.PROMPT) + p.send("echo 1") + p.send("echo 1") + p.expect("echo 1") + # add running instance + script.test.process.append(p) + + except inc.TestError, e: + handle_error(e.desc, script.test) + +# Run the test function +if script.test.test_func != None: + try: + script.test.test_func(script.test) + except inc.TestError, e: + handle_error(e.desc, script.test) + +# Shutdown all instances +time.sleep(2) +for p in script.test.process: + # Unregister if we have_reg to make sure that next tests + # won't wail + if p.inst_param.have_reg: + p.send("ru") + p.expect(p.inst_param.uri+".*unregistration success") + p.send("q") + p.send("q") + time.sleep(0.5) + p.expect(const.DESTROYED, False) + p.wait() + +# Run the post test function +if script.test.post_func != None: + try: + script.test.post_func(script.test) + except inc.TestError, e: + handle_error(e.desc, script.test, False) + +# Done +print "Test " + script.test.title + " completed successfully" +sys.exit(0) + |