123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- #!/usr/bin/python
- #
- # rt-mutex tester
- #
- # (C) 2006 Thomas Gleixner <tglx@linutronix.de>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License version 2 as
- # published by the Free Software Foundation.
- #
- import os
- import sys
- import getopt
- import shutil
- import string
- # Globals
- quiet = 0
- test = 0
- comments = 0
- sysfsprefix = "/sys/devices/system/rttest/rttest"
- statusfile = "/status"
- commandfile = "/command"
- # Command opcodes
- cmd_opcodes = {
- "schedother" : "1",
- "schedfifo" : "2",
- "lock" : "3",
- "locknowait" : "4",
- "lockint" : "5",
- "lockintnowait" : "6",
- "lockcont" : "7",
- "unlock" : "8",
- "signal" : "11",
- "resetevent" : "98",
- "reset" : "99",
- }
- test_opcodes = {
- "prioeq" : ["P" , "eq" , None],
- "priolt" : ["P" , "lt" , None],
- "priogt" : ["P" , "gt" , None],
- "nprioeq" : ["N" , "eq" , None],
- "npriolt" : ["N" , "lt" , None],
- "npriogt" : ["N" , "gt" , None],
- "unlocked" : ["M" , "eq" , 0],
- "trylock" : ["M" , "eq" , 1],
- "blocked" : ["M" , "eq" , 2],
- "blockedwake" : ["M" , "eq" , 3],
- "locked" : ["M" , "eq" , 4],
- "opcodeeq" : ["O" , "eq" , None],
- "opcodelt" : ["O" , "lt" , None],
- "opcodegt" : ["O" , "gt" , None],
- "eventeq" : ["E" , "eq" , None],
- "eventlt" : ["E" , "lt" , None],
- "eventgt" : ["E" , "gt" , None],
- }
- # Print usage information
- def usage():
- print "rt-tester.py <-c -h -q -t> <testfile>"
- print " -c display comments after first command"
- print " -h help"
- print " -q quiet mode"
- print " -t test mode (syntax check)"
- print " testfile: read test specification from testfile"
- print " otherwise from stdin"
- return
- # Print progress when not in quiet mode
- def progress(str):
- if not quiet:
- print str
- # Analyse a status value
- def analyse(val, top, arg):
- intval = int(val)
- if top[0] == "M":
- intval = intval / (10 ** int(arg))
- intval = intval % 10
- argval = top[2]
- elif top[0] == "O":
- argval = int(cmd_opcodes.get(arg, arg))
- else:
- argval = int(arg)
- # progress("%d %s %d" %(intval, top[1], argval))
- if top[1] == "eq" and intval == argval:
- return 1
- if top[1] == "lt" and intval < argval:
- return 1
- if top[1] == "gt" and intval > argval:
- return 1
- return 0
- # Parse the commandline
- try:
- (options, arguments) = getopt.getopt(sys.argv[1:],'chqt')
- except getopt.GetoptError, ex:
- usage()
- sys.exit(1)
- # Parse commandline options
- for option, value in options:
- if option == "-c":
- comments = 1
- elif option == "-q":
- quiet = 1
- elif option == "-t":
- test = 1
- elif option == '-h':
- usage()
- sys.exit(0)
- # Select the input source
- if arguments:
- try:
- fd = open(arguments[0])
- except Exception,ex:
- sys.stderr.write("File not found %s\n" %(arguments[0]))
- sys.exit(1)
- else:
- fd = sys.stdin
- linenr = 0
- # Read the test patterns
- while 1:
- linenr = linenr + 1
- line = fd.readline()
- if not len(line):
- break
- line = line.strip()
- parts = line.split(":")
- if not parts or len(parts) < 1:
- continue
- if len(parts[0]) == 0:
- continue
- if parts[0].startswith("#"):
- if comments > 1:
- progress(line)
- continue
- if comments == 1:
- comments = 2
- progress(line)
- cmd = parts[0].strip().lower()
- opc = parts[1].strip().lower()
- tid = parts[2].strip()
- dat = parts[3].strip()
- try:
- # Test or wait for a status value
- if cmd == "t" or cmd == "w":
- testop = test_opcodes[opc]
- fname = "%s%s%s" %(sysfsprefix, tid, statusfile)
- if test:
- print fname
- continue
- while 1:
- query = 1
- fsta = open(fname, 'r')
- status = fsta.readline().strip()
- fsta.close()
- stat = status.split(",")
- for s in stat:
- s = s.strip()
- if s.startswith(testop[0]):
- # Separate status value
- val = s[2:].strip()
- query = analyse(val, testop, dat)
- break
- if query or cmd == "t":
- break
- progress(" " + status)
- if not query:
- sys.stderr.write("Test failed in line %d\n" %(linenr))
- sys.exit(1)
- # Issue a command to the tester
- elif cmd == "c":
- cmdnr = cmd_opcodes[opc]
- # Build command string and sys filename
- cmdstr = "%s:%s" %(cmdnr, dat)
- fname = "%s%s%s" %(sysfsprefix, tid, commandfile)
- if test:
- print fname
- continue
- fcmd = open(fname, 'w')
- fcmd.write(cmdstr)
- fcmd.close()
- except Exception,ex:
- sys.stderr.write(str(ex))
- sys.stderr.write("\nSyntax error in line %d\n" %(linenr))
- if not test:
- fd.close()
- sys.exit(1)
- # Normal exit pass
- print "Pass"
- sys.exit(0)
|