runner.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. """ basic collect and runtest protocol implementations """
  2. import bdb
  3. import sys
  4. from time import time
  5. import py
  6. import pytest
  7. from _pytest._code.code import TerminalRepr, ExceptionInfo
  8. def pytest_namespace():
  9. return {
  10. 'fail' : fail,
  11. 'skip' : skip,
  12. 'importorskip' : importorskip,
  13. 'exit' : exit,
  14. }
  15. #
  16. # pytest plugin hooks
  17. def pytest_addoption(parser):
  18. group = parser.getgroup("terminal reporting", "reporting", after="general")
  19. group.addoption('--durations',
  20. action="store", type=int, default=None, metavar="N",
  21. help="show N slowest setup/test durations (N=0 for all)."),
  22. def pytest_terminal_summary(terminalreporter):
  23. durations = terminalreporter.config.option.durations
  24. if durations is None:
  25. return
  26. tr = terminalreporter
  27. dlist = []
  28. for replist in tr.stats.values():
  29. for rep in replist:
  30. if hasattr(rep, 'duration'):
  31. dlist.append(rep)
  32. if not dlist:
  33. return
  34. dlist.sort(key=lambda x: x.duration)
  35. dlist.reverse()
  36. if not durations:
  37. tr.write_sep("=", "slowest test durations")
  38. else:
  39. tr.write_sep("=", "slowest %s test durations" % durations)
  40. dlist = dlist[:durations]
  41. for rep in dlist:
  42. nodeid = rep.nodeid.replace("::()::", "::")
  43. tr.write_line("%02.2fs %-8s %s" %
  44. (rep.duration, rep.when, nodeid))
  45. def pytest_sessionstart(session):
  46. session._setupstate = SetupState()
  47. def pytest_sessionfinish(session):
  48. session._setupstate.teardown_all()
  49. class NodeInfo:
  50. def __init__(self, location):
  51. self.location = location
  52. def pytest_runtest_protocol(item, nextitem):
  53. item.ihook.pytest_runtest_logstart(
  54. nodeid=item.nodeid, location=item.location,
  55. )
  56. runtestprotocol(item, nextitem=nextitem)
  57. return True
  58. def runtestprotocol(item, log=True, nextitem=None):
  59. hasrequest = hasattr(item, "_request")
  60. if hasrequest and not item._request:
  61. item._initrequest()
  62. rep = call_and_report(item, "setup", log)
  63. reports = [rep]
  64. if rep.passed:
  65. reports.append(call_and_report(item, "call", log))
  66. reports.append(call_and_report(item, "teardown", log,
  67. nextitem=nextitem))
  68. # after all teardown hooks have been called
  69. # want funcargs and request info to go away
  70. if hasrequest:
  71. item._request = False
  72. item.funcargs = None
  73. return reports
  74. def pytest_runtest_setup(item):
  75. item.session._setupstate.prepare(item)
  76. def pytest_runtest_call(item):
  77. try:
  78. item.runtest()
  79. except Exception:
  80. # Store trace info to allow postmortem debugging
  81. type, value, tb = sys.exc_info()
  82. tb = tb.tb_next # Skip *this* frame
  83. sys.last_type = type
  84. sys.last_value = value
  85. sys.last_traceback = tb
  86. del tb # Get rid of it in this namespace
  87. raise
  88. def pytest_runtest_teardown(item, nextitem):
  89. item.session._setupstate.teardown_exact(item, nextitem)
  90. def pytest_report_teststatus(report):
  91. if report.when in ("setup", "teardown"):
  92. if report.failed:
  93. # category, shortletter, verbose-word
  94. return "error", "E", "ERROR"
  95. elif report.skipped:
  96. return "skipped", "s", "SKIPPED"
  97. else:
  98. return "", "", ""
  99. #
  100. # Implementation
  101. def call_and_report(item, when, log=True, **kwds):
  102. call = call_runtest_hook(item, when, **kwds)
  103. hook = item.ihook
  104. report = hook.pytest_runtest_makereport(item=item, call=call)
  105. if log:
  106. hook.pytest_runtest_logreport(report=report)
  107. if check_interactive_exception(call, report):
  108. hook.pytest_exception_interact(node=item, call=call, report=report)
  109. return report
  110. def check_interactive_exception(call, report):
  111. return call.excinfo and not (
  112. hasattr(report, "wasxfail") or
  113. call.excinfo.errisinstance(skip.Exception) or
  114. call.excinfo.errisinstance(bdb.BdbQuit))
  115. def call_runtest_hook(item, when, **kwds):
  116. hookname = "pytest_runtest_" + when
  117. ihook = getattr(item.ihook, hookname)
  118. return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  119. class CallInfo:
  120. """ Result/Exception info a function invocation. """
  121. #: None or ExceptionInfo object.
  122. excinfo = None
  123. def __init__(self, func, when):
  124. #: context of invocation: one of "setup", "call",
  125. #: "teardown", "memocollect"
  126. self.when = when
  127. self.start = time()
  128. try:
  129. self.result = func()
  130. except KeyboardInterrupt:
  131. self.stop = time()
  132. raise
  133. except:
  134. self.excinfo = ExceptionInfo()
  135. self.stop = time()
  136. def __repr__(self):
  137. if self.excinfo:
  138. status = "exception: %s" % str(self.excinfo.value)
  139. else:
  140. status = "result: %r" % (self.result,)
  141. return "<CallInfo when=%r %s>" % (self.when, status)
  142. def getslaveinfoline(node):
  143. try:
  144. return node._slaveinfocache
  145. except AttributeError:
  146. d = node.slaveinfo
  147. ver = "%s.%s.%s" % d['version_info'][:3]
  148. node._slaveinfocache = s = "[%s] %s -- Python %s %s" % (
  149. d['id'], d['sysplatform'], ver, d['executable'])
  150. return s
  151. class BaseReport(object):
  152. def __init__(self, **kw):
  153. self.__dict__.update(kw)
  154. def toterminal(self, out):
  155. if hasattr(self, 'node'):
  156. out.line(getslaveinfoline(self.node))
  157. longrepr = self.longrepr
  158. if longrepr is None:
  159. return
  160. if hasattr(longrepr, 'toterminal'):
  161. longrepr.toterminal(out)
  162. else:
  163. try:
  164. out.line(longrepr)
  165. except UnicodeEncodeError:
  166. out.line("<unprintable longrepr>")
  167. def get_sections(self, prefix):
  168. for name, content in self.sections:
  169. if name.startswith(prefix):
  170. yield prefix, content
  171. passed = property(lambda x: x.outcome == "passed")
  172. failed = property(lambda x: x.outcome == "failed")
  173. skipped = property(lambda x: x.outcome == "skipped")
  174. @property
  175. def fspath(self):
  176. return self.nodeid.split("::")[0]
  177. def pytest_runtest_makereport(item, call):
  178. when = call.when
  179. duration = call.stop-call.start
  180. keywords = dict([(x,1) for x in item.keywords])
  181. excinfo = call.excinfo
  182. sections = []
  183. if not call.excinfo:
  184. outcome = "passed"
  185. longrepr = None
  186. else:
  187. if not isinstance(excinfo, ExceptionInfo):
  188. outcome = "failed"
  189. longrepr = excinfo
  190. elif excinfo.errisinstance(pytest.skip.Exception):
  191. outcome = "skipped"
  192. r = excinfo._getreprcrash()
  193. longrepr = (str(r.path), r.lineno, r.message)
  194. else:
  195. outcome = "failed"
  196. if call.when == "call":
  197. longrepr = item.repr_failure(excinfo)
  198. else: # exception in setup or teardown
  199. longrepr = item._repr_failure_py(excinfo,
  200. style=item.config.option.tbstyle)
  201. for rwhen, key, content in item._report_sections:
  202. sections.append(("Captured %s %s" %(key, rwhen), content))
  203. return TestReport(item.nodeid, item.location,
  204. keywords, outcome, longrepr, when,
  205. sections, duration)
  206. class TestReport(BaseReport):
  207. """ Basic test report object (also used for setup and teardown calls if
  208. they fail).
  209. """
  210. def __init__(self, nodeid, location, keywords, outcome,
  211. longrepr, when, sections=(), duration=0, **extra):
  212. #: normalized collection node id
  213. self.nodeid = nodeid
  214. #: a (filesystempath, lineno, domaininfo) tuple indicating the
  215. #: actual location of a test item - it might be different from the
  216. #: collected one e.g. if a method is inherited from a different module.
  217. self.location = location
  218. #: a name -> value dictionary containing all keywords and
  219. #: markers associated with a test invocation.
  220. self.keywords = keywords
  221. #: test outcome, always one of "passed", "failed", "skipped".
  222. self.outcome = outcome
  223. #: None or a failure representation.
  224. self.longrepr = longrepr
  225. #: one of 'setup', 'call', 'teardown' to indicate runtest phase.
  226. self.when = when
  227. #: list of (secname, data) extra information which needs to
  228. #: marshallable
  229. self.sections = list(sections)
  230. #: time it took to run just the test
  231. self.duration = duration
  232. self.__dict__.update(extra)
  233. def __repr__(self):
  234. return "<TestReport %r when=%r outcome=%r>" % (
  235. self.nodeid, self.when, self.outcome)
  236. class TeardownErrorReport(BaseReport):
  237. outcome = "failed"
  238. when = "teardown"
  239. def __init__(self, longrepr, **extra):
  240. self.longrepr = longrepr
  241. self.sections = []
  242. self.__dict__.update(extra)
  243. def pytest_make_collect_report(collector):
  244. call = CallInfo(collector._memocollect, "memocollect")
  245. longrepr = None
  246. if not call.excinfo:
  247. outcome = "passed"
  248. else:
  249. from _pytest import nose
  250. skip_exceptions = (Skipped,) + nose.get_skip_exceptions()
  251. if call.excinfo.errisinstance(skip_exceptions):
  252. outcome = "skipped"
  253. r = collector._repr_failure_py(call.excinfo, "line").reprcrash
  254. longrepr = (str(r.path), r.lineno, r.message)
  255. else:
  256. outcome = "failed"
  257. errorinfo = collector.repr_failure(call.excinfo)
  258. if not hasattr(errorinfo, "toterminal"):
  259. errorinfo = CollectErrorRepr(errorinfo)
  260. longrepr = errorinfo
  261. rep = CollectReport(collector.nodeid, outcome, longrepr,
  262. getattr(call, 'result', None))
  263. rep.call = call # see collect_one_node
  264. return rep
  265. class CollectReport(BaseReport):
  266. def __init__(self, nodeid, outcome, longrepr, result,
  267. sections=(), **extra):
  268. self.nodeid = nodeid
  269. self.outcome = outcome
  270. self.longrepr = longrepr
  271. self.result = result or []
  272. self.sections = list(sections)
  273. self.__dict__.update(extra)
  274. @property
  275. def location(self):
  276. return (self.fspath, None, self.fspath)
  277. def __repr__(self):
  278. return "<CollectReport %r lenresult=%s outcome=%r>" % (
  279. self.nodeid, len(self.result), self.outcome)
  280. class CollectErrorRepr(TerminalRepr):
  281. def __init__(self, msg):
  282. self.longrepr = msg
  283. def toterminal(self, out):
  284. out.line(self.longrepr, red=True)
  285. class SetupState(object):
  286. """ shared state for setting up/tearing down test items or collectors. """
  287. def __init__(self):
  288. self.stack = []
  289. self._finalizers = {}
  290. def addfinalizer(self, finalizer, colitem):
  291. """ attach a finalizer to the given colitem.
  292. if colitem is None, this will add a finalizer that
  293. is called at the end of teardown_all().
  294. """
  295. assert colitem and not isinstance(colitem, tuple)
  296. assert py.builtin.callable(finalizer)
  297. #assert colitem in self.stack # some unit tests don't setup stack :/
  298. self._finalizers.setdefault(colitem, []).append(finalizer)
  299. def _pop_and_teardown(self):
  300. colitem = self.stack.pop()
  301. self._teardown_with_finalization(colitem)
  302. def _callfinalizers(self, colitem):
  303. finalizers = self._finalizers.pop(colitem, None)
  304. exc = None
  305. while finalizers:
  306. fin = finalizers.pop()
  307. try:
  308. fin()
  309. except Exception:
  310. # XXX Only first exception will be seen by user,
  311. # ideally all should be reported.
  312. if exc is None:
  313. exc = sys.exc_info()
  314. if exc:
  315. py.builtin._reraise(*exc)
  316. def _teardown_with_finalization(self, colitem):
  317. self._callfinalizers(colitem)
  318. if hasattr(colitem, "teardown"):
  319. colitem.teardown()
  320. for colitem in self._finalizers:
  321. assert colitem is None or colitem in self.stack \
  322. or isinstance(colitem, tuple)
  323. def teardown_all(self):
  324. while self.stack:
  325. self._pop_and_teardown()
  326. for key in list(self._finalizers):
  327. self._teardown_with_finalization(key)
  328. assert not self._finalizers
  329. def teardown_exact(self, item, nextitem):
  330. needed_collectors = nextitem and nextitem.listchain() or []
  331. self._teardown_towards(needed_collectors)
  332. def _teardown_towards(self, needed_collectors):
  333. while self.stack:
  334. if self.stack == needed_collectors[:len(self.stack)]:
  335. break
  336. self._pop_and_teardown()
  337. def prepare(self, colitem):
  338. """ setup objects along the collector chain to the test-method
  339. and teardown previously setup objects."""
  340. needed_collectors = colitem.listchain()
  341. self._teardown_towards(needed_collectors)
  342. # check if the last collection node has raised an error
  343. for col in self.stack:
  344. if hasattr(col, '_prepare_exc'):
  345. py.builtin._reraise(*col._prepare_exc)
  346. for col in needed_collectors[len(self.stack):]:
  347. self.stack.append(col)
  348. try:
  349. col.setup()
  350. except Exception:
  351. col._prepare_exc = sys.exc_info()
  352. raise
  353. def collect_one_node(collector):
  354. ihook = collector.ihook
  355. ihook.pytest_collectstart(collector=collector)
  356. rep = ihook.pytest_make_collect_report(collector=collector)
  357. call = rep.__dict__.pop("call", None)
  358. if call and check_interactive_exception(call, rep):
  359. ihook.pytest_exception_interact(node=collector, call=call, report=rep)
  360. return rep
  361. # =============================================================
  362. # Test OutcomeExceptions and helpers for creating them.
  363. class OutcomeException(Exception):
  364. """ OutcomeException and its subclass instances indicate and
  365. contain info about test and collection outcomes.
  366. """
  367. def __init__(self, msg=None, pytrace=True):
  368. Exception.__init__(self, msg)
  369. self.msg = msg
  370. self.pytrace = pytrace
  371. def __repr__(self):
  372. if self.msg:
  373. val = self.msg
  374. if isinstance(val, bytes):
  375. val = py._builtin._totext(val, errors='replace')
  376. return val
  377. return "<%s instance>" %(self.__class__.__name__,)
  378. __str__ = __repr__
  379. class Skipped(OutcomeException):
  380. # XXX hackish: on 3k we fake to live in the builtins
  381. # in order to have Skipped exception printing shorter/nicer
  382. __module__ = 'builtins'
  383. class Failed(OutcomeException):
  384. """ raised from an explicit call to pytest.fail() """
  385. __module__ = 'builtins'
  386. class Exit(KeyboardInterrupt):
  387. """ raised for immediate program exits (no tracebacks/summaries)"""
  388. def __init__(self, msg="unknown reason"):
  389. self.msg = msg
  390. KeyboardInterrupt.__init__(self, msg)
  391. # exposed helper methods
  392. def exit(msg):
  393. """ exit testing process as if KeyboardInterrupt was triggered. """
  394. __tracebackhide__ = True
  395. raise Exit(msg)
  396. exit.Exception = Exit
  397. def skip(msg=""):
  398. """ skip an executing test with the given message. Note: it's usually
  399. better to use the pytest.mark.skipif marker to declare a test to be
  400. skipped under certain conditions like mismatching platforms or
  401. dependencies. See the pytest_skipping plugin for details.
  402. """
  403. __tracebackhide__ = True
  404. raise Skipped(msg=msg)
  405. skip.Exception = Skipped
  406. def fail(msg="", pytrace=True):
  407. """ explicitly fail an currently-executing test with the given Message.
  408. :arg pytrace: if false the msg represents the full failure information
  409. and no python traceback will be reported.
  410. """
  411. __tracebackhide__ = True
  412. raise Failed(msg=msg, pytrace=pytrace)
  413. fail.Exception = Failed
  414. def importorskip(modname, minversion=None):
  415. """ return imported module if it has at least "minversion" as its
  416. __version__ attribute. If no minversion is specified the a skip
  417. is only triggered if the module can not be imported.
  418. """
  419. __tracebackhide__ = True
  420. compile(modname, '', 'eval') # to catch syntaxerrors
  421. try:
  422. __import__(modname)
  423. except ImportError:
  424. skip("could not import %r" %(modname,))
  425. mod = sys.modules[modname]
  426. if minversion is None:
  427. return mod
  428. verattr = getattr(mod, '__version__', None)
  429. if minversion is not None:
  430. try:
  431. from pkg_resources import parse_version as pv
  432. except ImportError:
  433. skip("we have a required version for %r but can not import "
  434. "no pkg_resources to parse version strings." %(modname,))
  435. if verattr is None or pv(verattr) < pv(minversion):
  436. skip("module %r has __version__ %r, required is: %r" %(
  437. modname, verattr, minversion))
  438. return mod