123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- """
- merged implementation of the cache provider
- the name cache was not choosen to ensure pluggy automatically
- ignores the external pytest-cache
- """
- import py
- import pytest
- import json
- from os.path import sep as _sep, altsep as _altsep
- class Cache(object):
- def __init__(self, config):
- self.config = config
- self._cachedir = config.rootdir.join(".cache")
- self.trace = config.trace.root.get("cache")
- if config.getvalue("cacheclear"):
- self.trace("clearing cachedir")
- if self._cachedir.check():
- self._cachedir.remove()
- self._cachedir.mkdir()
- def makedir(self, name):
- """ return a directory path object with the given name. If the
- directory does not yet exist, it will be created. You can use it
- to manage files likes e. g. store/retrieve database
- dumps across test sessions.
- :param name: must be a string not containing a ``/`` separator.
- Make sure the name contains your plugin or application
- identifiers to prevent clashes with other cache users.
- """
- if _sep in name or _altsep is not None and _altsep in name:
- raise ValueError("name is not allowed to contain path separators")
- return self._cachedir.ensure_dir("d", name)
- def _getvaluepath(self, key):
- return self._cachedir.join('v', *key.split('/'))
- def get(self, key, default):
- """ return cached value for the given key. If no value
- was yet cached or the value cannot be read, the specified
- default is returned.
- :param key: must be a ``/`` separated value. Usually the first
- name is the name of your plugin or your application.
- :param default: must be provided in case of a cache-miss or
- invalid cache values.
- """
- path = self._getvaluepath(key)
- if path.check():
- try:
- with path.open("r") as f:
- return json.load(f)
- except ValueError:
- self.trace("cache-invalid at %s" % (path,))
- return default
- def set(self, key, value):
- """ save value for the given key.
- :param key: must be a ``/`` separated value. Usually the first
- name is the name of your plugin or your application.
- :param value: must be of any combination of basic
- python types, including nested types
- like e. g. lists of dictionaries.
- """
- path = self._getvaluepath(key)
- try:
- path.dirpath().ensure_dir()
- except (py.error.EEXIST, py.error.EACCES):
- self.config.warn(
- code='I9', message='could not create cache path %s' % (path,)
- )
- return
- try:
- f = path.open('w')
- except py.error.ENOTDIR:
- self.config.warn(
- code='I9', message='cache could not write path %s' % (path,))
- else:
- with f:
- self.trace("cache-write %s: %r" % (key, value,))
- json.dump(value, f, indent=2, sort_keys=True)
- class LFPlugin:
- """ Plugin which implements the --lf (run last-failing) option """
- def __init__(self, config):
- self.config = config
- active_keys = 'lf', 'failedfirst'
- self.active = any(config.getvalue(key) for key in active_keys)
- if self.active:
- self.lastfailed = config.cache.get("cache/lastfailed", {})
- else:
- self.lastfailed = {}
- def pytest_report_header(self):
- if self.active:
- if not self.lastfailed:
- mode = "run all (no recorded failures)"
- else:
- mode = "rerun last %d failures%s" % (
- len(self.lastfailed),
- " first" if self.config.getvalue("failedfirst") else "")
- return "run-last-failure: %s" % mode
- def pytest_runtest_logreport(self, report):
- if report.failed and "xfail" not in report.keywords:
- self.lastfailed[report.nodeid] = True
- elif not report.failed:
- if report.when == "call":
- self.lastfailed.pop(report.nodeid, None)
- def pytest_collectreport(self, report):
- passed = report.outcome in ('passed', 'skipped')
- if passed:
- if report.nodeid in self.lastfailed:
- self.lastfailed.pop(report.nodeid)
- self.lastfailed.update(
- (item.nodeid, True)
- for item in report.result)
- else:
- self.lastfailed[report.nodeid] = True
- def pytest_collection_modifyitems(self, session, config, items):
- if self.active and self.lastfailed:
- previously_failed = []
- previously_passed = []
- for item in items:
- if item.nodeid in self.lastfailed:
- previously_failed.append(item)
- else:
- previously_passed.append(item)
- if not previously_failed and previously_passed:
- # running a subset of all tests with recorded failures outside
- # of the set of tests currently executing
- pass
- elif self.config.getvalue("failedfirst"):
- items[:] = previously_failed + previously_passed
- else:
- items[:] = previously_failed
- config.hook.pytest_deselected(items=previously_passed)
- def pytest_sessionfinish(self, session):
- config = self.config
- if config.getvalue("cacheshow") or hasattr(config, "slaveinput"):
- return
- prev_failed = config.cache.get("cache/lastfailed", None) is not None
- if (session.testscollected and prev_failed) or self.lastfailed:
- config.cache.set("cache/lastfailed", self.lastfailed)
- def pytest_addoption(parser):
- group = parser.getgroup("general")
- group.addoption(
- '--lf', '--last-failed', action='store_true', dest="lf",
- help="rerun only the tests that failed "
- "at the last run (or all if none failed)")
- group.addoption(
- '--ff', '--failed-first', action='store_true', dest="failedfirst",
- help="run all tests but run the last failures first. "
- "This may re-order tests and thus lead to "
- "repeated fixture setup/teardown")
- group.addoption(
- '--cache-show', action='store_true', dest="cacheshow",
- help="show cache contents, don't perform collection or tests")
- group.addoption(
- '--cache-clear', action='store_true', dest="cacheclear",
- help="remove all cache contents at start of test run.")
- def pytest_cmdline_main(config):
- if config.option.cacheshow:
- from _pytest.main import wrap_session
- return wrap_session(config, cacheshow)
- @pytest.hookimpl(tryfirst=True)
- def pytest_configure(config):
- config.cache = Cache(config)
- config.pluginmanager.register(LFPlugin(config), "lfplugin")
- @pytest.fixture
- def cache(request):
- """
- Return a cache object that can persist state between testing sessions.
- cache.get(key, default)
- cache.set(key, value)
- Keys must be a ``/`` separated value, where the first part is usually the
- name of your plugin or application to avoid clashes with other cache users.
- Values can be any object handled by the json stdlib module.
- """
- return request.config.cache
- def pytest_report_header(config):
- if config.option.verbose:
- relpath = py.path.local().bestrelpath(config.cache._cachedir)
- return "cachedir: %s" % relpath
- def cacheshow(config, session):
- from pprint import pprint
- tw = py.io.TerminalWriter()
- tw.line("cachedir: " + str(config.cache._cachedir))
- if not config.cache._cachedir.check():
- tw.line("cache is empty")
- return 0
- dummy = object()
- basedir = config.cache._cachedir
- vdir = basedir.join("v")
- tw.sep("-", "cache values")
- for valpath in vdir.visit(lambda x: x.isfile()):
- key = valpath.relto(vdir).replace(valpath.sep, "/")
- val = config.cache.get(key, dummy)
- if val is dummy:
- tw.line("%s contains unreadable content, "
- "will be ignored" % key)
- else:
- tw.line("%s contains:" % key)
- stream = py.io.TextIO()
- pprint(val, stream=stream)
- for line in stream.getvalue().splitlines():
- tw.line(" " + line)
- ddir = basedir.join("d")
- if ddir.isdir() and ddir.listdir():
- tw.sep("-", "cache directories")
- for p in basedir.join("d").visit():
- #if p.check(dir=1):
- # print("%s/" % p.relto(basedir))
- if p.isfile():
- key = p.relto(basedir)
- tw.line("%s is a file of length %d" % (
- key, p.size()))
- return 0
|