Source code for attest

# coding:utf-8
from __future__ import with_statement

import threading
import sys
import traceback
from functools import wraps
import inspect
from contextlib import contextmanager

try:
    from abc import ABCMeta, abstractmethod
except ImportError:
    ABCMeta = type
    abstractmethod = lambda x: x

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO


REPORTERS = {}


statistics = threading.local()
statistics.assertions = 0


[docs]class AbstractReporter(object): """Optional base for reporters, serves as documentation and improves errors for incomplete reporters. :param tests: The list of test functions we will be running. """ __metaclass__ = ABCMeta @abstractmethod
[docs] def begin(self, tests): """Called with the list of tests when a test run has begun.""" raise NotImplementedError
@abstractmethod
[docs] def success(self, test, stdout, stderr): """When a test succeeds, this method is called with the test function and the captured stdout and stderr output as lists of lines. """ raise NotImplementedError
@abstractmethod
[docs] def failure(self, test, error, traceback, stdout, stderr): """When a test fails, this method is called with the test function, the exception instance that was raised, a cleaned up traceback string and the captured stdout and stderr output as lists of lines. """ raise NotImplementedError
@abstractmethod
[docs] def finished(self): """Called when all tests have run.""" raise NotImplementedError
[docs]class PlainReporter(AbstractReporter): """Plain text ASCII output for humans.""" def begin(self, tests): self.total = len(tests) self.failures = [] def success(self, test, stdout, stderr): sys.stdout.write('.') sys.stdout.flush() def failure(self, test, error, traceback, stdout, stderr): if isinstance(error, AssertionError): sys.stdout.write('F') else: sys.stdout.write('E') sys.stdout.flush() self.failures.append((test, traceback)) def finished(self): print print for test, trace in self.failures: if test.__module__ == '__main__': print test.__name__ else: print '.'.join((test.__module__, test.__name__)) if test.__doc__: print inspect.getdoc(test) print '-' * 80 print trace print print 'Failures: %s/%s (%s assertions)' % (len(self.failures), self.total, statistics.assertions) if self.failures: raise SystemExit(1)
REPORTERS['plain'] = PlainReporter
[docs]class FancyReporter(AbstractReporter): """Heavily uses ANSI escape codes for fancy output to 256-color terminals. Progress of running the tests is indicated by a progressbar and failures are shown with syntax highlighted tracebacks. """ def __init__(self, style='bw'): self.style = style def begin(self, tests): from progressbar import ProgressBar, Percentage, \ Bar, ETA, SimpleProgress widgets = [SimpleProgress(), ' [', ETA(), Bar(), Percentage(), ']'] self.counter = 0 self.progress = ProgressBar(maxval=len(tests), widgets=widgets) self.progress.start() self.failures = [] def success(self, test, stdout, stderr): self.counter += 1 self.progress.update(self.counter) def failure(self, test, error, traceback, stdout, stderr): self.counter += 1 self.progress.update(self.counter) self.failures.append((test, traceback, stdout, stderr)) def finished(self): from pygments.console import colorize from pygments import highlight from pygments.lexers import PythonTracebackLexer from pygments.formatters import Terminal256Formatter self.progress.finish() print for test, trace, out, err in self.failures: if test.__module__ == '__main__': name = test.__name__ else: name = '.'.join((test.__module__, test.__name__)) print colorize('bold', name) if test.__doc__: print inspect.getdoc(test) print '─' * 80 if out: print colorize('faint', '\n'.join(out)) if err: print colorize('darkred', '\n'.join(err)) print highlight(trace, PythonTracebackLexer(), Terminal256Formatter(style=self.style)) if self.failures: failed = colorize('red', str(len(self.failures))) else: failed = len(self.failures) print 'Failures: %s/%s (%s assertions)' % (failed, self.counter, statistics.assertions) if self.failures: raise SystemExit(1)
REPORTERS['fancy'] = FancyReporter
[docs]def auto_reporter(style=None): """Select a reporter based on the target output. This is the default reporter. :param style: Passed to :class:`FancyReporter` if it is used. :rtype: :class:`FancyReporter` if output is a terminal otherwise a :class:`PlainReporter`. """ if sys.stdout.isatty(): if style is None: return FancyReporter() return FancyReporter(style) return PlainReporter()
REPORTERS['auto'] = auto_reporter
[docs]class XmlReporter(AbstractReporter): """Report the result of a testrun in an XML format. Not compatible with JUnit or XUnit. """ def __init__(self): self.escape = __import__('cgi').escape def begin(self, tests): print '<?xml version="1.0" encoding="UTF-8"?>' print '<testreport tests="%d">' % len(tests) def success(self, test, stdout, stderr): name = '.'.join((test.__module__, test.__name__)) print ' <pass name="%s"/>' % name def failure(self, test, error, traceback, stdout, stderr): name = '.'.join((test.__module__, test.__name__)) if isinstance(error, AssertionError): tag = 'fail' else: tag = 'error' print ' <%s name="%s" type="%s">' % (tag, name, error.__class__.__name__) print self.escape('\n'.join(' ' * 4 + line for line in traceback.splitlines()), quote=True) print ' </%s>' % tag def finished(self): print '</testreport>'
REPORTERS['xml'] = XmlReporter
[docs]def get_reporter_by_name(name, default='auto'): """Get an :class:`AbstractReporter` by name, falling back on a default. Available reporters: * ``'fancy'`` — :class:`FancyReporter` * ``'plain'`` — :class:`PlainReporter` * ``'xml'`` — :class:`XmlReporter` * ``'auto'`` — :func:`auto_reporter` :param name: One of the above strings. :param default: The fallback reporter if no reporter has the supplied name, defaulting to ``'auto'``. :raises KeyError: If neither the name or the default is a valid name of a reporter. :rtype: Callable returning an instance of an :class:`AbstractReporter`. """ return REPORTERS.get(name, REPORTERS[default])
@contextmanager
[docs]def capture_output(): """Context manager capturing standard output and error. Yields a tuple of the two streams as lists of lines. :: with capture_output() as (out, err): print 'Captured' Assert(out) == ['Captured'] """ stdout, stderr = sys.stdout, sys.stderr sys.stdout, sys.stderr = StringIO(), StringIO() out, err = [], [] try: yield out, err finally: out.extend(sys.stdout.getvalue().splitlines()) err.extend(sys.stderr.getvalue().splitlines()) sys.stdout, sys.stderr = stdout, stderr
[docs]class Tests(object): """Collection of test functions. :param tests: Iterable of other test collections to register with this one. """ def __init__(self, tests=()): self._tests = [] for collection in tests: self.register(collection) self._context = None def __iter__(self): return iter(self._tests) def __len__(self): return len(self._tests)
[docs] def test(self, func): """Decorate a function as a test belonging to this collection.""" @wraps(func) def wrapper(): if self._context is None: func() else: with self._context() as context: if len(inspect.getargspec(func)[0]) != 0: if type(context) is tuple: # type() is intentional func(*context) else: func(context) else: func() self._tests.append(wrapper) return wrapper
[docs] def context(self, func): """Decorate a function as a :func:`~contextlib.contextmanager` for running the tests in this collection in. Corresponds to setup and teardown in other testing libraries. :: db = Tests() @db.context def connect(): con = connect_db() try: yield con finally: con.disconnect() @db.test def using_connection(con): Assert(con).is_not(None) The above corresponds to:: db = Tests() def connect(): con = connect_db() try: yield con finally: con.disconnect() @db.test def using_connection(): with connect() as con: Assert(con).is_not(None) The difference is that this decorator applies the context to all tests defined in its collection, so it's less repetitive. Yielding :const:`None` or nothing passes no arguments to the test, yielding a single value other than a tuple passes that value as the sole argument to the test, yielding a tuple splats the tuple as the arguments to the test. If you want to yield a tuple as the sole argument, wrap it in a one-tuple or unsplat the args in the test. """ func = contextmanager(func) self._context = func return func
[docs] def register(self, tests): """Merge in another test collection.""" if inspect.isclass(tests): self._tests.extend(tests()) return tests self._tests.extend(tests)
[docs] def test_suite(self): """Create a :class:`unittest.TestSuite` from this collection.""" from unittest import TestSuite, FunctionTestCase suite = TestSuite() for test in self: suite.addTest(FunctionTestCase(test)) return suite
[docs] def run(self, reporter=auto_reporter): """Run all tests in this collection. :param reporter: An instance of :class:`AbstractReporter` or a callable returning something implementing that API (not enforced). """ if not isinstance(reporter, AbstractReporter): reporter = reporter() reporter.begin(self._tests) for test in self: try: with capture_output() as (out, err): assert test() is not False, 'test returned False' except BaseException, e: if isinstance(e, KeyboardInterrupt): break lines = traceback.format_exc().splitlines() clean = lines[0:1] stack = iter(lines[1:-1]) # stack traces are in the middle # loop two lines at a time for first, second in zip(stack, stack): # only keep if this file is not the source of the trace if __file__[0:-1] not in first: clean.extend((first, second)) clean.append(lines[-1]) reporter.failure(test, e, '\n'.join(clean), out, err) else: reporter.success(test, out, err) reporter.finished()
[docs]def test(meth): """Mark a :class:`TestBase` method as a test and wrap it to run in the :meth:`TestBase.__context__` of the subclass. """ @wraps(meth) def wrapper(self): with contextmanager(self.__context__)(): meth(self) wrapper.__test__ = True return wrapper
[docs]class TestBase(object): """Base for test classes. Decorate test methods with :func:`test`. Needs to be registered with a :class:`Tests` collection to be run. For setup and teardown, override :meth:`__context__` like a :func:`~contextlib.contextmanager` (without the decorator). :: class Math(TestBase): def __context__(self): self.two = 1 + 1 yield del self.two @test def arithmetics(self): Assert(self.two) == 2 suite = Tests([Math()]) suite.run() """ def __context__(self): yield def __iter__(self): for name in dir(self): attr = getattr(self, name) if getattr(attr, '__test__', False) and callable(attr): yield attr
[docs]class Loader(object): """Run tests with Attest via distribute:: setup( test_loader='attest:Loader', test_suite='tests.collection', ) Now, ``python setup.py -q test`` is equivalent to:: from tests import collection collection.run() If you want to run the tests as a normal unittest suite, try :meth:`Tests.test_suite` instead:: setup( test_suite='tests.collection.test_suite' ) """ def loadTestsFromNames(self, names, module=None): mod, collection = names[0].rsplit('.', 1) mod = __import__(mod, fromlist=[collection]) collection = getattr(mod, collection) collection.run() raise SystemExit
[docs]def assert_(expr, msg=None): """Like :keyword:`assert`, but counts the assertion.""" statistics.assertions += 1 assert expr, msg return expr
[docs]class Assert(object): """Wrap an object such that boolean operations on it fails with an :exc:`AssertionError` if the operation results in :const:`False`, with more helpful error messages on failure than :keyword:`assert`. A test failure is simply an unhandled exception, so it is completely optional to use this class. Examples:: Assert(1 + 1) == 2 2 in Assert([1, 2, 3]) Attributes are proxied to the wrapped object, returning the result wrapped as well:: hello = Assert('hello') hello == 'hello' hello.upper() == 'HELLO' hello.capitalize() == 'Hello' Used in boolean context, fails if non-true. These all fail:: bool(Assert(0)) if Assert(0): pass assert Assert(0) Identical to, except for the more helpful failure message:: Assert(bool(0)) == True """ #: The wrapped object obj = None def __init__(self, obj=None): self.obj = obj @property
[docs] def __class__(self): return Assert(self.obj.__class__)
[docs] def __str__(self): """Wrapped proxy to the wrapped object's *__str__*, can be used for testing the string adaption of the object:: Assert(1).__str__() == '1' .. warning:: :func:`str` on :class:`Assert` objects does not work. """ return Assert(self.obj.__str__())
[docs] def __getattr__(self, name): """Proxy all attributes to the wrapped object, wrapping the result. """ return Assert(getattr(self.obj, name))
[docs] def __call__(self, *args, **kwargs): """Allow calling of wrapped callables, wrapping the return value. Useful for testing methods on a wrapped object via attribute proxying:: Assert('Hello').upper() == 'HELLO' """ return Assert(self.obj(*args, **kwargs))
[docs] def __getitem__(self, key): """Access an item on the wrapped object and return the result wrapped as well. :: Assert([1, 2, 3])[1] == 2 """ return Assert(self.obj[key])
[docs] def __eq__(self, obj): """Test for equality with ``==``.""" return assert_(self.obj == obj, '%r != %r' % (self.obj, obj))
[docs] def __ne__(self, obj): """Test for inequality with ``!=``.""" return assert_(self.obj != obj, '%r == %r' % (self.obj, obj))
[docs] def is_(self, obj): """The :keyword:`is` operator is not overridable, for good reasons (that would defeat its purpose), so you can use this method for asserting identity:: Assert(True).is_(True) """ return assert_(self.obj is obj, '%r is not %r' % (self.obj, obj))
[docs] def is_not(self, obj): """The negated form of :meth:`is_`, corresponding to the ``is not`` operation:: Assert([]).is_not([]) """ return assert_(self.obj is not obj, '%r is %r' % (self.obj, obj))
[docs] def __contains__(self, obj): """Test for membership with :keyword:`in`.""" return assert_(obj in self.obj, '%r not in %r' % (obj, self.obj))
[docs] def in_(self, obj): """Assert membership. While you can use the :keyword:`in` operator, its order is inconsistent with the rest of the operators and doesn't work with the ``not in`` operation. :: 2 in Assert([1, 2, 3]) Assert(2).in_([1, 2, 3]) """ return assert_(self.obj in obj, '%r not in %r' % (self.obj, obj))
[docs] def not_in(self, obj): """The negated form of :meth:`in_`, corresponding to the ``not in`` operation:: Assert(0).not_in([1, 2, 3]) """ return assert_(self.obj not in obj, '%r in %r' % (self.obj, obj))
[docs] def __lt__(self, obj): """Test for lesserness with ``<``.""" return assert_(self.obj < obj, '%r >= %r' % (self.obj, obj))
[docs] def __le__(self, obj): """Test for lesserness or equality with ``<=``.""" return assert_(self.obj <= obj, '%r > %r' % (self.obj, obj))
[docs] def __gt__(self, obj): """Test for greaterness with ``>``.""" return assert_(self.obj > obj, '%r <= %r' % (self.obj, obj))
[docs] def __ge__(self, obj): """Test for greaterness or equality with ``>=``.""" return assert_(self.obj >= obj, '%r < %r' % (self.obj, obj))
[docs] def __nonzero__(self): """Test for truthiness in boolean context.""" return assert_(self.obj, 'not %r' % self.obj)
@staticmethod @contextmanager
[docs] def raises(*exceptions): """Context manager that fails if a particular exception is not raised. Yields the caught exception wrapped in :class:`Assert`:: with Assert.raises(IOError) as error: open('/etc/passwd', 'w') error.errno == 13 :param exceptions: Expected exception classes. """ statistics.assertions += 1 proxy = Assert() try: yield proxy except exceptions, error: proxy.obj = error else: if len(exceptions) > 1: errors = '(' + ', '.join(e.__name__ for e in exceptions) + ')' else: errors = exceptions[0].__name__ raise AssertionError("didn't raise %s" % errors)
@staticmethod @contextmanager
[docs] def not_raising(exception): """Context manager that fails if a particular exception is raised. A raised exception consitutes a failure anyway and this is mainly used for testing Attest itself. :: with Assert.not_raising(IOError): open('/etc/passwd', 'r') :param exception: An exception class. """ statistics.assertions += 1 try: yield except exception: raise AssertionError('raised %s' % exception.__name__)
[docs] def __repr__(self): """Not proxied to the wrapped object. To test that do something like:: Assert(repr(obj)) == 'expectation' """ return 'Assert(%r)' % self.obj