123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- """ basic collect and runtest protocol implementations """
- import bdb
- import sys
- from time import time
- import py
- import pytest
- from _pytest._code.code import TerminalRepr, ExceptionInfo
- def pytest_namespace():
- return {
- 'fail' : fail,
- 'skip' : skip,
- 'importorskip' : importorskip,
- 'exit' : exit,
- }
- #
- # pytest plugin hooks
- def pytest_addoption(parser):
- group = parser.getgroup("terminal reporting", "reporting", after="general")
- group.addoption('--durations',
- action="store", type=int, default=None, metavar="N",
- help="show N slowest setup/test durations (N=0 for all)."),
- def pytest_terminal_summary(terminalreporter):
- durations = terminalreporter.config.option.durations
- if durations is None:
- return
- tr = terminalreporter
- dlist = []
- for replist in tr.stats.values():
- for rep in replist:
- if hasattr(rep, 'duration'):
- dlist.append(rep)
- if not dlist:
- return
- dlist.sort(key=lambda x: x.duration)
- dlist.reverse()
- if not durations:
- tr.write_sep("=", "slowest test durations")
- else:
- tr.write_sep("=", "slowest %s test durations" % durations)
- dlist = dlist[:durations]
- for rep in dlist:
- nodeid = rep.nodeid.replace("::()::", "::")
- tr.write_line("%02.2fs %-8s %s" %
- (rep.duration, rep.when, nodeid))
- def pytest_sessionstart(session):
- session._setupstate = SetupState()
- def pytest_sessionfinish(session):
- session._setupstate.teardown_all()
- class NodeInfo:
- def __init__(self, location):
- self.location = location
- def pytest_runtest_protocol(item, nextitem):
- item.ihook.pytest_runtest_logstart(
- nodeid=item.nodeid, location=item.location,
- )
- runtestprotocol(item, nextitem=nextitem)
- return True
- def runtestprotocol(item, log=True, nextitem=None):
- hasrequest = hasattr(item, "_request")
- if hasrequest and not item._request:
- item._initrequest()
- rep = call_and_report(item, "setup", log)
- reports = [rep]
- if rep.passed:
- reports.append(call_and_report(item, "call", log))
- reports.append(call_and_report(item, "teardown", log,
- nextitem=nextitem))
- # after all teardown hooks have been called
- # want funcargs and request info to go away
- if hasrequest:
- item._request = False
- item.funcargs = None
- return reports
- def pytest_runtest_setup(item):
- item.session._setupstate.prepare(item)
- def pytest_runtest_call(item):
- try:
- item.runtest()
- except Exception:
- # Store trace info to allow postmortem debugging
- type, value, tb = sys.exc_info()
- tb = tb.tb_next # Skip *this* frame
- sys.last_type = type
- sys.last_value = value
- sys.last_traceback = tb
- del tb # Get rid of it in this namespace
- raise
- def pytest_runtest_teardown(item, nextitem):
- item.session._setupstate.teardown_exact(item, nextitem)
- def pytest_report_teststatus(report):
- if report.when in ("setup", "teardown"):
- if report.failed:
- # category, shortletter, verbose-word
- return "error", "E", "ERROR"
- elif report.skipped:
- return "skipped", "s", "SKIPPED"
- else:
- return "", "", ""
- #
- # Implementation
- def call_and_report(item, when, log=True, **kwds):
- call = call_runtest_hook(item, when, **kwds)
- hook = item.ihook
- report = hook.pytest_runtest_makereport(item=item, call=call)
- if log:
- hook.pytest_runtest_logreport(report=report)
- if check_interactive_exception(call, report):
- hook.pytest_exception_interact(node=item, call=call, report=report)
- return report
- def check_interactive_exception(call, report):
- return call.excinfo and not (
- hasattr(report, "wasxfail") or
- call.excinfo.errisinstance(skip.Exception) or
- call.excinfo.errisinstance(bdb.BdbQuit))
- def call_runtest_hook(item, when, **kwds):
- hookname = "pytest_runtest_" + when
- ihook = getattr(item.ihook, hookname)
- return CallInfo(lambda: ihook(item=item, **kwds), when=when)
- class CallInfo:
- """ Result/Exception info a function invocation. """
- #: None or ExceptionInfo object.
- excinfo = None
- def __init__(self, func, when):
- #: context of invocation: one of "setup", "call",
- #: "teardown", "memocollect"
- self.when = when
- self.start = time()
- try:
- self.result = func()
- except KeyboardInterrupt:
- self.stop = time()
- raise
- except:
- self.excinfo = ExceptionInfo()
- self.stop = time()
- def __repr__(self):
- if self.excinfo:
- status = "exception: %s" % str(self.excinfo.value)
- else:
- status = "result: %r" % (self.result,)
- return "<CallInfo when=%r %s>" % (self.when, status)
- def getslaveinfoline(node):
- try:
- return node._slaveinfocache
- except AttributeError:
- d = node.slaveinfo
- ver = "%s.%s.%s" % d['version_info'][:3]
- node._slaveinfocache = s = "[%s] %s -- Python %s %s" % (
- d['id'], d['sysplatform'], ver, d['executable'])
- return s
- class BaseReport(object):
- def __init__(self, **kw):
- self.__dict__.update(kw)
- def toterminal(self, out):
- if hasattr(self, 'node'):
- out.line(getslaveinfoline(self.node))
- longrepr = self.longrepr
- if longrepr is None:
- return
- if hasattr(longrepr, 'toterminal'):
- longrepr.toterminal(out)
- else:
- try:
- out.line(longrepr)
- except UnicodeEncodeError:
- out.line("<unprintable longrepr>")
- def get_sections(self, prefix):
- for name, content in self.sections:
- if name.startswith(prefix):
- yield prefix, content
- passed = property(lambda x: x.outcome == "passed")
- failed = property(lambda x: x.outcome == "failed")
- skipped = property(lambda x: x.outcome == "skipped")
- @property
- def fspath(self):
- return self.nodeid.split("::")[0]
- def pytest_runtest_makereport(item, call):
- when = call.when
- duration = call.stop-call.start
- keywords = dict([(x,1) for x in item.keywords])
- excinfo = call.excinfo
- sections = []
- if not call.excinfo:
- outcome = "passed"
- longrepr = None
- else:
- if not isinstance(excinfo, ExceptionInfo):
- outcome = "failed"
- longrepr = excinfo
- elif excinfo.errisinstance(pytest.skip.Exception):
- outcome = "skipped"
- r = excinfo._getreprcrash()
- longrepr = (str(r.path), r.lineno, r.message)
- else:
- outcome = "failed"
- if call.when == "call":
- longrepr = item.repr_failure(excinfo)
- else: # exception in setup or teardown
- longrepr = item._repr_failure_py(excinfo,
- style=item.config.option.tbstyle)
- for rwhen, key, content in item._report_sections:
- sections.append(("Captured %s %s" %(key, rwhen), content))
- return TestReport(item.nodeid, item.location,
- keywords, outcome, longrepr, when,
- sections, duration)
- class TestReport(BaseReport):
- """ Basic test report object (also used for setup and teardown calls if
- they fail).
- """
- def __init__(self, nodeid, location, keywords, outcome,
- longrepr, when, sections=(), duration=0, **extra):
- #: normalized collection node id
- self.nodeid = nodeid
- #: a (filesystempath, lineno, domaininfo) tuple indicating the
- #: actual location of a test item - it might be different from the
- #: collected one e.g. if a method is inherited from a different module.
- self.location = location
- #: a name -> value dictionary containing all keywords and
- #: markers associated with a test invocation.
- self.keywords = keywords
- #: test outcome, always one of "passed", "failed", "skipped".
- self.outcome = outcome
- #: None or a failure representation.
- self.longrepr = longrepr
- #: one of 'setup', 'call', 'teardown' to indicate runtest phase.
- self.when = when
- #: list of (secname, data) extra information which needs to
- #: marshallable
- self.sections = list(sections)
- #: time it took to run just the test
- self.duration = duration
- self.__dict__.update(extra)
- def __repr__(self):
- return "<TestReport %r when=%r outcome=%r>" % (
- self.nodeid, self.when, self.outcome)
- class TeardownErrorReport(BaseReport):
- outcome = "failed"
- when = "teardown"
- def __init__(self, longrepr, **extra):
- self.longrepr = longrepr
- self.sections = []
- self.__dict__.update(extra)
- def pytest_make_collect_report(collector):
- call = CallInfo(collector._memocollect, "memocollect")
- longrepr = None
- if not call.excinfo:
- outcome = "passed"
- else:
- from _pytest import nose
- skip_exceptions = (Skipped,) + nose.get_skip_exceptions()
- if call.excinfo.errisinstance(skip_exceptions):
- outcome = "skipped"
- r = collector._repr_failure_py(call.excinfo, "line").reprcrash
- longrepr = (str(r.path), r.lineno, r.message)
- else:
- outcome = "failed"
- errorinfo = collector.repr_failure(call.excinfo)
- if not hasattr(errorinfo, "toterminal"):
- errorinfo = CollectErrorRepr(errorinfo)
- longrepr = errorinfo
- rep = CollectReport(collector.nodeid, outcome, longrepr,
- getattr(call, 'result', None))
- rep.call = call # see collect_one_node
- return rep
- class CollectReport(BaseReport):
- def __init__(self, nodeid, outcome, longrepr, result,
- sections=(), **extra):
- self.nodeid = nodeid
- self.outcome = outcome
- self.longrepr = longrepr
- self.result = result or []
- self.sections = list(sections)
- self.__dict__.update(extra)
- @property
- def location(self):
- return (self.fspath, None, self.fspath)
- def __repr__(self):
- return "<CollectReport %r lenresult=%s outcome=%r>" % (
- self.nodeid, len(self.result), self.outcome)
- class CollectErrorRepr(TerminalRepr):
- def __init__(self, msg):
- self.longrepr = msg
- def toterminal(self, out):
- out.line(self.longrepr, red=True)
- class SetupState(object):
- """ shared state for setting up/tearing down test items or collectors. """
- def __init__(self):
- self.stack = []
- self._finalizers = {}
- def addfinalizer(self, finalizer, colitem):
- """ attach a finalizer to the given colitem.
- if colitem is None, this will add a finalizer that
- is called at the end of teardown_all().
- """
- assert colitem and not isinstance(colitem, tuple)
- assert py.builtin.callable(finalizer)
- #assert colitem in self.stack # some unit tests don't setup stack :/
- self._finalizers.setdefault(colitem, []).append(finalizer)
- def _pop_and_teardown(self):
- colitem = self.stack.pop()
- self._teardown_with_finalization(colitem)
- def _callfinalizers(self, colitem):
- finalizers = self._finalizers.pop(colitem, None)
- exc = None
- while finalizers:
- fin = finalizers.pop()
- try:
- fin()
- except Exception:
- # XXX Only first exception will be seen by user,
- # ideally all should be reported.
- if exc is None:
- exc = sys.exc_info()
- if exc:
- py.builtin._reraise(*exc)
- def _teardown_with_finalization(self, colitem):
- self._callfinalizers(colitem)
- if hasattr(colitem, "teardown"):
- colitem.teardown()
- for colitem in self._finalizers:
- assert colitem is None or colitem in self.stack \
- or isinstance(colitem, tuple)
- def teardown_all(self):
- while self.stack:
- self._pop_and_teardown()
- for key in list(self._finalizers):
- self._teardown_with_finalization(key)
- assert not self._finalizers
- def teardown_exact(self, item, nextitem):
- needed_collectors = nextitem and nextitem.listchain() or []
- self._teardown_towards(needed_collectors)
- def _teardown_towards(self, needed_collectors):
- while self.stack:
- if self.stack == needed_collectors[:len(self.stack)]:
- break
- self._pop_and_teardown()
- def prepare(self, colitem):
- """ setup objects along the collector chain to the test-method
- and teardown previously setup objects."""
- needed_collectors = colitem.listchain()
- self._teardown_towards(needed_collectors)
- # check if the last collection node has raised an error
- for col in self.stack:
- if hasattr(col, '_prepare_exc'):
- py.builtin._reraise(*col._prepare_exc)
- for col in needed_collectors[len(self.stack):]:
- self.stack.append(col)
- try:
- col.setup()
- except Exception:
- col._prepare_exc = sys.exc_info()
- raise
- def collect_one_node(collector):
- ihook = collector.ihook
- ihook.pytest_collectstart(collector=collector)
- rep = ihook.pytest_make_collect_report(collector=collector)
- call = rep.__dict__.pop("call", None)
- if call and check_interactive_exception(call, rep):
- ihook.pytest_exception_interact(node=collector, call=call, report=rep)
- return rep
- # =============================================================
- # Test OutcomeExceptions and helpers for creating them.
- class OutcomeException(Exception):
- """ OutcomeException and its subclass instances indicate and
- contain info about test and collection outcomes.
- """
- def __init__(self, msg=None, pytrace=True):
- Exception.__init__(self, msg)
- self.msg = msg
- self.pytrace = pytrace
- def __repr__(self):
- if self.msg:
- val = self.msg
- if isinstance(val, bytes):
- val = py._builtin._totext(val, errors='replace')
- return val
- return "<%s instance>" %(self.__class__.__name__,)
- __str__ = __repr__
- class Skipped(OutcomeException):
- # XXX hackish: on 3k we fake to live in the builtins
- # in order to have Skipped exception printing shorter/nicer
- __module__ = 'builtins'
- class Failed(OutcomeException):
- """ raised from an explicit call to pytest.fail() """
- __module__ = 'builtins'
- class Exit(KeyboardInterrupt):
- """ raised for immediate program exits (no tracebacks/summaries)"""
- def __init__(self, msg="unknown reason"):
- self.msg = msg
- KeyboardInterrupt.__init__(self, msg)
- # exposed helper methods
- def exit(msg):
- """ exit testing process as if KeyboardInterrupt was triggered. """
- __tracebackhide__ = True
- raise Exit(msg)
- exit.Exception = Exit
- def skip(msg=""):
- """ skip an executing test with the given message. Note: it's usually
- better to use the pytest.mark.skipif marker to declare a test to be
- skipped under certain conditions like mismatching platforms or
- dependencies. See the pytest_skipping plugin for details.
- """
- __tracebackhide__ = True
- raise Skipped(msg=msg)
- skip.Exception = Skipped
- def fail(msg="", pytrace=True):
- """ explicitly fail an currently-executing test with the given Message.
- :arg pytrace: if false the msg represents the full failure information
- and no python traceback will be reported.
- """
- __tracebackhide__ = True
- raise Failed(msg=msg, pytrace=pytrace)
- fail.Exception = Failed
- def importorskip(modname, minversion=None):
- """ return imported module if it has at least "minversion" as its
- __version__ attribute. If no minversion is specified the a skip
- is only triggered if the module can not be imported.
- """
- __tracebackhide__ = True
- compile(modname, '', 'eval') # to catch syntaxerrors
- try:
- __import__(modname)
- except ImportError:
- skip("could not import %r" %(modname,))
- mod = sys.modules[modname]
- if minversion is None:
- return mod
- verattr = getattr(mod, '__version__', None)
- if minversion is not None:
- try:
- from pkg_resources import parse_version as pv
- except ImportError:
- skip("we have a required version for %r but can not import "
- "no pkg_resources to parse version strings." %(modname,))
- if verattr is None or pv(verattr) < pv(minversion):
- skip("module %r has __version__ %r, required is: %r" %(
- modname, verattr, minversion))
- return mod
|