import logging
from collections import defaultdict
from collections import namedtuple
from difflib import unified_diff
from functools import partial
from functools import wraps
from inspect import isclass
from logging import _checkLevel
from logging import getLevelName
from logging import getLogger
from sys import _getframe
from traceback import format_stack
from aspectlib import ALL_METHODS
from aspectlib import mimic
from aspectlib import weave
from .utils import Sentinel
from .utils import camelcase_to_underscores
from .utils import container
from .utils import logf
from .utils import qualname
from .utils import repr_ex
try:
from logging import _levelNames as nameToLevel
except ImportError:
from logging import _nameToLevel as nameToLevel
try:
from dummy_thread import allocate_lock
except ImportError:
try:
from _dummy_thread import allocate_lock
except ImportError:
from _thread import allocate_lock
from collections import ChainMap
from collections import OrderedDict
__all__ = 'mock', 'record', "Story"
logger = getLogger(__name__)
logexception = logf(logger.exception)
Call = namedtuple('Call', ('self', 'args', 'kwargs'))
CallEx = namedtuple('CallEx', ('self', 'name', 'args', 'kwargs'))
Result = namedtuple('Result', ('self', 'args', 'kwargs', 'result', 'exception'))
ResultEx = namedtuple('ResultEx', ('self', 'name', 'args', 'kwargs', 'result', 'exception'))
_INIT = Sentinel("INIT")
[docs]
def mock(return_value, call=False):
"""
Factory for a decorator that makes the function return a given `return_value`.
Args:
return_value: Value to return from the wrapper.
call (bool): If ``True``, call the decorated function. (default: ``False``)
Returns:
A decorator.
"""
def mock_decorator(func):
@wraps(func)
def mock_wrapper(*args, **kwargs):
if call:
func(*args, **kwargs)
return return_value
return mock_wrapper
return mock_decorator
class LogCapture(object):
"""
Records all log messages made on the given logger. Assumes the logger has a ``_log`` method.
Example::
>>> import logging
>>> logger = logging.getLogger('mylogger')
>>> with LogCapture(logger, level='INFO') as logs:
... logger.debug("Message from debug: %s", 'somearg')
... logger.info("Message from info: %s", 'somearg')
... logger.error("Message from error: %s", 'somearg')
>>> logs.calls
[('Message from info: %s', ('somearg',), 'INFO'), ('Message from error: %s', ('somearg',), 'ERROR')]
>>> logs.messages
[('INFO', 'Message from info: somearg'), ('ERROR', 'Message from error: somearg')]
>>> logs.has('Message from info: %s')
True
>>> logs.has('Message from info: somearg')
True
>>> logs.has('Message from info: %s', 'badarg')
False
>>> logs.has('Message from debug: %s')
False
>>> logs.assertLogged('Message from error: %s')
>>> logs.assertLogged('Message from error: %s')
>>> logs.assertLogged('Message from error: %s')
.. versionchanged:: 1.3.0
Added ``messages`` property.
Changed ``calls`` to retrun the level as a string (instead of int).
"""
def __init__(self, logger, level='DEBUG'):
self._logger = logger
self._level = nameToLevel[level]
self._calls = []
self._rollback = None
def __enter__(self):
self._rollback = weave(
self._logger,
record(callback=self._callback, extended=True, iscalled=True),
methods=('debug', 'info', 'warning', 'error', 'exception', 'critical', 'log'),
)
return self
def __exit__(self, *exc):
self._rollback()
def _callback(self, _binding, qualname, args, _kwargs):
_, name = qualname.rsplit('.', 1)
if name == 'log':
level, args = _checkLevel(args[0]), args[1:]
elif name == 'exception':
level = logging.ERROR
else:
level = _checkLevel(name.upper())
if len(args) > 1:
message, args = args[0], args[1:]
else:
message, args = args[0], ()
if level >= self._level:
self._calls.append((message % args if args else message, message, args, getLevelName(level)))
@property
def calls(self):
return [i[1:] for i in self._calls]
@property
def messages(self):
return [(i[-1], i[0]) for i in self._calls]
def has(self, message, *args, **kwargs):
level = kwargs.pop('level', None)
assert not kwargs, "Unexpected arguments: %s" % kwargs
for call_final_message, call_message, call_args, call_level in self._calls:
if level is None or level == call_level:
if message == call_message and args == call_args if args else message == call_final_message or message == call_message:
return True
return False
def assertLogged(self, message, *args, **kwargs):
if not self.has(message, *args, **kwargs):
raise AssertionError(
"There's no such message %r (with args %r) logged on %s. Logged messages where: %s"
% (message, args, self._logger, self.calls)
)
class _RecordingFunctionWrapper(object):
"""
Function wrapper that records calls and can be used as an weaver context manager.
See :obj:`aspectlib.test.record` for arguments.
"""
def __init__(self, wrapped, iscalled=True, calls=None, callback=None, extended=False, results=False, recurse_lock=None, binding=None):
assert not results or iscalled, "`iscalled` must be True if `results` is True"
mimic(self, wrapped)
self.__wrapped = wrapped
self.__entanglement = None
self.__iscalled = iscalled
self.__binding = binding
self.__callback = callback
self.__extended = extended
self.__results = results
self.__recurse_lock = recurse_lock
self.calls = [] if not callback and calls is None else calls
def __call__(self, *args, **kwargs):
record = not self.__recurse_lock or self.__recurse_lock.acquire(False)
try:
if self.__results:
try:
result = self.__wrapped(*args, **kwargs)
except Exception as exc:
if record:
self.__record(args, kwargs, None, exc)
raise
else:
if record:
self.__record(args, kwargs, result, None)
return result
else:
if record:
self.__record(args, kwargs)
if self.__iscalled:
return self.__wrapped(*args, **kwargs)
finally:
if record and self.__recurse_lock:
self.__recurse_lock.release()
def __record(self, args, kwargs, *response):
if self.__callback is not None:
self.__callback(self.__binding, qualname(self), args, kwargs, *response)
if self.calls is not None:
if self.__extended:
self.calls.append((ResultEx if response else CallEx)(self.__binding, qualname(self), args, kwargs, *response))
else:
self.calls.append((Result if response else Call)(self.__binding, args, kwargs, *response))
def __get__(self, instance, owner):
return _RecordingFunctionWrapper(
self.__wrapped.__get__(instance, owner),
iscalled=self.__iscalled,
calls=self.calls,
callback=self.__callback,
extended=self.__extended,
results=self.__results,
binding=instance,
)
def __enter__(self):
self.__entanglement = weave(self.__wrapped, lambda _: self)
return self
def __exit__(self, *args):
self.__entanglement.rollback()
[docs]
def record(func=None, recurse_lock_factory=allocate_lock, **options):
"""
Factory or decorator (depending if `func` is initially given).
Args:
callback (list):
An a callable that is to be called with ``instance, function, args, kwargs``.
calls (list):
An object where the `Call` objects are appended. If not given and ``callback`` is not specified then a new list
object will be created.
iscalled (bool):
If ``True`` the `func` will be called. (default: ``False``)
extended (bool):
If ``True`` the `func`'s ``__name__`` will also be included in the call list. (default: ``False``)
results (bool):
If ``True`` the results (and exceptions) will also be included in the call list. (default: ``False``)
Returns:
A wrapper that records all calls made to `func`. The history is available as a ``call``
property. If access to the function is too hard then you need to specify the history manually.
Example:
>>> @record
... def a(x, y, a, b):
... pass
>>> a(1, 2, 3, b='c')
>>> a.calls
[Call(self=None, args=(1, 2, 3), kwargs={'b': 'c'})]
Or, with your own history list::
>>> calls = []
>>> @record(calls=calls)
... def a(x, y, a, b):
... pass
>>> a(1, 2, 3, b='c')
>>> a.calls
[Call(self=None, args=(1, 2, 3), kwargs={'b': 'c'})]
>>> calls is a.calls
True
.. versionchanged:: 0.9.0
Renamed `history` option to `calls`.
Renamed `call` option to `iscalled`.
Added `callback` option.
Added `extended` option.
"""
if func:
return _RecordingFunctionWrapper(func, recurse_lock=recurse_lock_factory(), **options)
else:
return partial(record, **options)
class StoryResultWrapper(object):
__slots__ = '__recorder__'
def __init__(self, recorder):
self.__recorder__ = recorder
def __eq__(self, result):
self.__recorder__(_Returns(result))
def __pow__(self, exception):
if not (isinstance(exception, BaseException) or isclass(exception) and issubclass(exception, BaseException)):
raise RuntimeError("Value %r must be an exception type or instance." % exception)
self.__recorder__(_Raises(exception))
def __unsupported__(self, *args):
raise TypeError("Unsupported operation. Only `==` (for results) and `**` (for exceptions) can be used.")
for mm in (
'__add__',
'__sub__',
'__mul__',
'__floordiv__',
'__mod__',
'__divmod__',
'__lshift__',
'__rshift__',
'__and__',
'__xor__',
'__or__',
'__div__',
'__truediv__',
'__radd__',
'__rsub__',
'__rmul__',
'__rdiv__',
'__rtruediv__',
'__rfloordiv__',
'__rmod__',
'__rdivmod__',
'__rpow__',
'__rlshift__',
'__rrshift__',
'__rand__',
'__rxor__',
'__ror__',
'__iadd__',
'__isub__',
'__imul__',
'__idiv__',
'__itruediv__',
'__ifloordiv__',
'__imod__',
'__ipow__',
'__ilshift__',
'__irshift__',
'__iand__',
'__ixor__',
'__ior__',
'__neg__',
'__pos__',
'__abs__',
'__invert__',
'__complex__',
'__int__',
'__long__',
'__float__',
'__oct__',
'__hex__',
'__index__',
'__coerce__',
'__getslice__',
'__setslice__',
'__delslice__',
'__len__',
'__getitem__',
'__reversed__',
'__contains__',
'__call__',
'__lt__',
'__le__',
'__ne__',
'__gt__',
'__ge__',
'__cmp__',
'__rcmp__',
'__nonzero__',
):
exec("%s = __unsupported__" % mm)
class _StoryFunctionWrapper(object):
def __init__(self, wrapped, handle, binding=None, owner=None):
self._wrapped = wrapped
self._name = wrapped.__name__
self._handle = handle
self._binding = binding
self._owner = owner
@property
def _qualname(self):
return qualname(self)
def __call__(self, *args, **kwargs):
if self._binding is None:
return StoryResultWrapper(partial(self._handle, None, self._qualname, args, kwargs))
else:
if self._name == '__init__':
self._handle(None, qualname(self._owner), args, kwargs, _Binds(self._binding))
else:
return StoryResultWrapper(partial(self._handle, self._binding, self._name, args, kwargs))
def __get__(self, binding, owner):
return mimic(
type(self)(
self._wrapped.__get__(binding, owner) if hasattr(self._wrapped, '__get__') else self._wrapped,
handle=self._handle,
binding=binding,
owner=owner,
),
self,
)
class _ReplayFunctionWrapper(_StoryFunctionWrapper):
def __call__(self, *args, **kwargs):
if self._binding is None:
return self._handle(None, self._qualname, args, kwargs, self._wrapped)
else:
if self._name == '__init__':
self._handle(None, qualname(self._owner), args, kwargs, self._wrapped, _Binds(self._binding))
else:
return self._handle(self._binding, self._name, args, kwargs, self._wrapped)
class _RecordingBase(object):
_target = None
_options = None
def __init__(self, target, **options):
self._target = target
self._options = options
self._calls = OrderedDict()
self._ids = {}
self._instances = defaultdict(int)
def _make_key(self, binding, name, args, kwargs):
if binding is not None:
binding, _ = self._ids[id(binding)]
return (binding, name, ', '.join(repr_ex(i) for i in args), ', '.join("%s=%s" % (k, repr_ex(v)) for k, v in kwargs.items()))
def _tag_result(self, name, result):
if isinstance(result, _Binds):
instance_name = camelcase_to_underscores(name.rsplit('.', 1)[-1])
self._instances[instance_name] += 1
instance_name = "%s_%s" % (instance_name, self._instances[instance_name])
self._ids[id(result.value)] = instance_name, result.value
result.value = instance_name
else:
result.value = repr_ex(result.value, self._ids)
return result
def _handle(self, binding, name, args, kwargs, result):
pk = self._make_key(binding, name, args, kwargs)
result = self._tag_result(name, result)
assert (
pk not in self._calls or self._calls[pk] == result
), "Story creation inconsistency. There is already a result cached for " "binding:%r name:%r args:%r kwargs:%r and it's: %r." % (
binding,
name,
args,
kwargs,
self._calls[pk],
)
self._calls[pk] = result
def __enter__(self):
self._options.setdefault('methods', ALL_METHODS)
self.__entanglement = weave(self._target, partial(self._FunctionWrapper, handle=self._handle), **self._options)
return self
def __exit__(self, *args):
self.__entanglement.rollback()
del self._ids
_Raises = container("Raises")
_Returns = container("Returns")
_Binds = container("Binds")
[docs]
class Story(_RecordingBase):
"""
This a simple yet flexible tool that can do "capture-replay mocking" or "test doubles" [1]_. It leverages
``aspectlib``'s powerful :obj:`weaver <aspectlib.weave>`.
Args:
target (same as for :obj:`aspectlib.weave`):
Targets to weave in the `story`/`replay` transactions.
subclasses (bool):
If ``True``, subclasses of target are weaved. *Only available for classes*
aliases (bool):
If ``True``, aliases of target are replaced.
lazy (bool):
If ``True`` only target's ``__init__`` method is patched, the rest of the methods are patched after ``__init__``
is called. *Only available for classes*.
methods (list or regex or string): Methods from target to patch. *Only available for classes*
The ``Story`` allows some testing patterns that are hard to do with other tools:
* **Proxied mocks**: partially mock `objects` and `modules` so they are called normally if the request is unknown.
* **Stubs**: completely mock `objects` and `modules`. Raise errors if the request is unknown.
The ``Story`` works in two of transactions:
* **The story**: You describe what calls you want to mocked. Initially you don't need to write this. Example:
::
>>> import mymod
>>> with Story(mymod) as story:
... mymod.func('some arg') == 'some result'
... mymod.func('bad arg') ** ValueError("can't use this")
* **The replay**: You run the code uses the interfaces mocked in the `story`. The :obj:`replay
<aspectlib.test.Story.replay>` always starts from a `story` instance.
.. versionchanged:: 0.9.0
Added in.
.. [1] http://www.martinfowler.com/bliki/TestDouble.html
"""
_FunctionWrapper = _StoryFunctionWrapper
def __init__(self, *args, **kwargs):
super(Story, self).__init__(*args, **kwargs)
frame = _getframe(1)
self._context = frame.f_globals, frame.f_locals
[docs]
def replay(self, **options):
"""
Args:
proxy (bool):
If ``True`` then unexpected uses are allowed (will use the real functions) but they are collected for later
use. Default: ``True``.
strict (bool):
If ``True`` then an ``AssertionError`` is raised when there were `unexpected calls` or there were `missing
calls` (specified in the story but not called). Default: ``True``.
dump (bool):
If ``True`` then the `unexpected`/`missing calls` will be printed (to ``sys.stdout``). Default: ``True``.
Returns:
A :obj:`aspectlib.test.Replay` object.
Example:
>>> import mymod
>>> with Story(mymod) as story:
... mymod.func('some arg') == 'some result'
... mymod.func('other arg') == 'other result'
>>> with story.replay(strict=False):
... print(mymod.func('some arg'))
... mymod.func('bogus arg')
some result
Got bogus arg in the real code!
STORY/REPLAY DIFF:
--- expected...
+++ actual...
@@ -1,2 +1,2 @@
mymod.func('some arg') == 'some result' # returns
-mymod.func('other arg') == 'other result' # returns
+mymod.func('bogus arg') == None # returns
ACTUAL:
mymod.func('some arg') == 'some result' # returns
mymod.func('bogus arg') == None # returns
<BLANKLINE>
"""
options.update(self._options)
return Replay(self, **options)
ReplayPair = namedtuple("ReplayPair", ('expected', 'actual'))
def logged_eval(value, context):
try:
return eval(value, *context)
except: # noqa
logexception("Failed to evaluate %r.\nContext:\n%s", value, ''.join(format_stack(f=_getframe(1), limit=15)))
raise
[docs]
class Replay(_RecordingBase):
"""
Object implementing the `replay transaction`.
This object should be created by :obj:`Story <aspectlib.test.Story>`'s :obj:`replay <aspectlib.test.Story.replay>`
method.
"""
_FunctionWrapper = _ReplayFunctionWrapper
def __init__(self, play, proxy=True, strict=True, dump=True, recurse_lock=False, **options):
super(Replay, self).__init__(play._target, **options)
self._calls, self._expected, self._actual = ChainMap(self._calls, play._calls), play._calls, self._calls
self._proxy = proxy
self._strict = strict
self._dump = dump
self._context = play._context
self._recurse_lock = allocate_lock() if recurse_lock is True else (recurse_lock and recurse_lock())
def _handle(self, binding, name, args, kwargs, wrapped, bind=None):
pk = self._make_key(binding, name, args, kwargs)
if pk in self._expected:
result = self._actual[pk] = self._expected[pk]
if isinstance(result, _Binds):
self._tag_result(name, bind)
elif isinstance(result, _Returns):
return logged_eval(result.value, self._context)
elif isinstance(result, _Raises):
raise logged_eval(result.value, self._context)
else:
raise RuntimeError('Internal failure - unknown result: %r' % result) # pragma: no cover
else:
if self._proxy:
shouldrecord = not self._recurse_lock or self._recurse_lock.acquire(False)
try:
try:
if bind:
bind = self._tag_result(name, bind)
result = wrapped(*args, **kwargs)
except Exception as exc:
if shouldrecord:
self._calls[pk] = self._tag_result(name, _Raises(exc))
raise
else:
if shouldrecord:
self._calls[pk] = bind or self._tag_result(name, _Returns(result))
return result
finally:
if shouldrecord and self._recurse_lock:
self._recurse_lock.release()
else:
raise AssertionError("Unexpected call to %s/%s with args:%s kwargs:%s" % pk)
def _unexpected(self, _missing=False):
if _missing:
expected, actual = self._actual, self._expected
else:
actual, expected = self._actual, self._expected
return ''.join(_format_calls(OrderedDict((pk, val) for pk, val in actual.items() if pk not in expected or val != expected.get(pk))))
@property
def unexpected(self):
"""
Returns a pretty text representation of just the unexpected calls.
The output should be usable directly in the story (just copy-paste it). Example::
>>> import mymod
>>> with Story(mymod) as story:
... pass
>>> with story.replay(strict=False, dump=False) as replay:
... mymod.func('some arg')
... try:
... mymod.badfunc()
... except ValueError as exc:
... print(exc)
Got some arg in the real code!
boom!
>>> print(replay.unexpected)
mymod.func('some arg') == None # returns
mymod.badfunc() ** ValueError('boom!',) # raises
<BLANKLINE>
We can just take the output and paste in the story::
>>> import mymod
>>> with Story(mymod) as story:
... mymod.func('some arg') == None # returns
... mymod.badfunc() ** ValueError('boom!') # raises
>>> with story.replay():
... mymod.func('some arg')
... try:
... mymod.badfunc()
... except ValueError as exc:
... print(exc)
boom!
"""
return self._unexpected()
@property
def missing(self):
"""
Returns a pretty text representation of just the missing calls.
"""
return self._unexpected(_missing=True)
@property
def diff(self):
"""
Returns a pretty text representation of the unexpected and missing calls.
Most of the time you don't need to directly use this. This is useful when you run the `replay` in
``strict=False`` mode and want to do custom assertions.
"""
actual = list(_format_calls(self._actual))
expected = list(_format_calls(self._expected))
return ''.join(unified_diff(expected, actual, fromfile='expected', tofile='actual'))
@property
def actual(self):
return ''.join(_format_calls(self._actual))
@property
def expected(self):
return ''.join(_format_calls(self._expected))
def __exit__(self, *exception):
super(Replay, self).__exit__()
if self._strict or self._dump:
diff = self.diff
if diff:
if exception or self._dump:
print('STORY/REPLAY DIFF:')
print(' ' + '\n '.join(diff.splitlines()))
print('ACTUAL:')
print(' ' + ' '.join(_format_calls(self._actual)))
if not exception and self._strict:
raise AssertionError(diff)
def _format_calls(calls):
for (binding, name, args, kwargs), result in calls.items():
sig = '%s(%s%s%s)' % (name, args, ', ' if kwargs and args else '', kwargs)
if isinstance(result, _Binds):
yield '%s = %s\n' % (result.value, sig)
elif isinstance(result, _Returns):
if binding is None:
yield '%s == %s # returns\n' % (sig, result.value)
else:
yield '%s.%s == %s # returns\n' % (binding, sig, result.value)
elif isinstance(result, _Raises):
if binding is None:
yield '%s ** %s # raises\n' % (sig, result.value)
else:
yield '%s.%s ** %s # raises\n' % (binding, sig, result.value)