Source code for pyramid.testing

from contextlib import contextmanager
import copy
import os
from webob.acceptparse import create_accept_header
from zope.interface import alsoProvides, implementer

from pyramid.config import Configurator
from pyramid.decorator import reify
from pyramid.i18n import LocalizerRequestMixin
from pyramid.interfaces import IRequest, ISession
from pyramid.path import caller_package
from pyramid.registry import Registry
from pyramid.request import CallbackMethodsMixin
from pyramid.response import _get_response_factory
from pyramid.security import AuthenticationAPIMixin, SecurityAPIMixin
from pyramid.threadlocal import get_current_registry, manager
from pyramid.url import URLMethodsMixin
from pyramid.util import PYPY, InstancePropertyMixin
from pyramid.view import ViewMethodsMixin

_marker = object()


class DummyRootFactory:
    __parent__ = None
    __name__ = None

    def __init__(self, request):
        if 'bfg.routes.matchdict' in request:
            self.__dict__.update(request['bfg.routes.matchdict'])


class DummySecurityPolicy:
    """ A standin for a :term:`security policy`."""

    def __init__(
        self,
        userid=None,
        identity=None,
        permissive=True,
        remember_result=None,
        forget_result=None,
    ):
        self.userid = userid
        self._identity = identity
        self.permissive = permissive
        if remember_result is None:
            remember_result = []
        if forget_result is None:
            forget_result = []
        self.remember_result = remember_result
        self.forget_result = forget_result

    def identity(self, request):
        return self._identity

    def authenticated_userid(self, request):
        return self.userid

    def permits(self, request, context, permission):
        return self.permissive

    def remember(self, request, userid, **kw):
        self.remembered = userid
        return self.remember_result

    def forget(self, request, **kw):
        self.forgotten = True
        return self.forget_result


[docs] class DummyTemplateRenderer: """ An instance of this class is returned from :meth:`pyramid.config.Configurator.testing_add_renderer`. It has a helper function (``assert_``) that makes it possible to make an assertion which compares data passed to the renderer by the view function against expected key/value pairs. """ def __init__(self, string_response=''): self._received = {} self._string_response = string_response self._implementation = MockTemplate(string_response) # For in-the-wild test code that doesn't create its own renderer, # but mutates our internals instead. When all you read is the # source code, *everything* is an API! def _get_string_response(self): return self._string_response def _set_string_response(self, response): self._string_response = response self._implementation.response = response string_response = property(_get_string_response, _set_string_response) def implementation(self): return self._implementation def __call__(self, kw, system=None): if system: self._received.update(system) self._received.update(kw) return self.string_response def __getattr__(self, k): """ Backwards compatibility """ val = self._received.get(k, _marker) if val is _marker: val = self._implementation._received.get(k, _marker) if val is _marker: raise AttributeError(k) return val
[docs] def assert_(self, **kw): """Accept an arbitrary set of assertion key/value pairs. For each assertion key/value pair assert that the renderer (eg. :func:`pyramid.renderers.render_to_response`) received the key with a value that equals the asserted value. If the renderer did not receive the key at all, or the value received by the renderer doesn't match the assertion value, raise an :exc:`AssertionError`.""" for k, v in kw.items(): myval = self._received.get(k, _marker) if myval is _marker: myval = self._implementation._received.get(k, _marker) if myval is _marker: raise AssertionError( 'A value for key "%s" was not passed to the renderer' % k ) if myval != v: raise AssertionError( '\nasserted value for %s: %r\nactual value: %r' % (k, v, myval) ) return True
[docs] class DummyResource: """ A dummy :app:`Pyramid` :term:`resource` object.""" def __init__( self, __name__=None, __parent__=None, __provides__=None, **kw ): """The resource's ``__name__`` attribute will be set to the value of the ``__name__`` argument, and the resource's ``__parent__`` attribute will be set to the value of the ``__parent__`` argument. If ``__provides__`` is specified, it should be an interface object or tuple of interface objects that will be attached to the resulting resource via :func:`zope.interface.alsoProvides`. Any extra keywords passed in the ``kw`` argument will be set as direct attributes of the resource object. .. note:: For backwards compatibility purposes, this class can also be imported as :class:`pyramid.testing.DummyModel`. """ self.__name__ = __name__ self.__parent__ = __parent__ if __provides__ is not None: alsoProvides(self, __provides__) self.kw = kw self.__dict__.update(**kw) self.subs = {} def __setitem__(self, name, val): """When the ``__setitem__`` method is called, the object passed in as ``val`` will be decorated with a ``__parent__`` attribute pointing at the dummy resource and a ``__name__`` attribute that is the value of ``name``. The value will then be returned when dummy resource's ``__getitem__`` is called with the name ``name```.""" val.__name__ = name val.__parent__ = self self.subs[name] = val def __getitem__(self, name): """ Return a named subobject (see ``__setitem__``)""" ob = self.subs[name] return ob def __delitem__(self, name): del self.subs[name] def get(self, name, default=None): return self.subs.get(name, default)
[docs] def values(self): """ Return the values set by __setitem__ """ return self.subs.values()
[docs] def items(self): """ Return the items set by __setitem__ """ return self.subs.items()
[docs] def keys(self): """ Return the keys set by __setitem__ """ return self.subs.keys()
__iter__ = keys def __bool__(self): return True def __len__(self): return len(self.subs) def __contains__(self, name): return name in self.subs
[docs] def clone(self, __name__=_marker, __parent__=_marker, **kw): """Create a clone of the resource object. If ``__name__`` or ``__parent__`` arguments are passed, use these values to override the existing ``__name__`` or ``__parent__`` of the resource. If any extra keyword args are passed in via the ``kw`` argument, use these keywords to add to or override existing resource keywords (attributes).""" oldkw = self.kw.copy() oldkw.update(kw) inst = self.__class__(self.__name__, self.__parent__, **oldkw) inst.subs = copy.deepcopy(self.subs) if __name__ is not _marker: inst.__name__ = __name__ if __parent__ is not _marker: inst.__parent__ = __parent__ return inst
DummyModel = DummyResource # b/w compat (forever) @implementer(ISession) class DummySession(dict): created = None new = True def changed(self): pass def invalidate(self): self.clear() def flash(self, msg, queue='', allow_duplicate=True): storage = self.setdefault('_f_' + queue, []) if allow_duplicate or (msg not in storage): storage.append(msg) def pop_flash(self, queue=''): storage = self.pop('_f_' + queue, []) return storage def peek_flash(self, queue=''): storage = self.get('_f_' + queue, []) return storage def new_csrf_token(self): token = '0123456789012345678901234567890123456789' self['_csrft_'] = token return token def get_csrf_token(self): token = self.get('_csrft_', None) if token is None: token = self.new_csrf_token() return token
[docs] @implementer(IRequest) class DummyRequest( URLMethodsMixin, CallbackMethodsMixin, InstancePropertyMixin, LocalizerRequestMixin, SecurityAPIMixin, AuthenticationAPIMixin, ViewMethodsMixin, ): """A DummyRequest object (incompletely) imitates a :term:`request` object. The ``params``, ``environ``, ``headers``, ``path``, and ``cookies`` arguments correspond to their :term:`WebOb` equivalents. The ``post`` argument, if passed, populates the request's ``POST`` attribute, but *not* ``params``, in order to allow testing that the app accepts data for a given view only from POST requests. This argument also sets ``self.method`` to "POST". Extra keyword arguments are assigned as attributes of the request itself. Note that DummyRequest does not have complete fidelity with a "real" request. For example, by default, the DummyRequest ``GET`` and ``POST`` attributes are of type ``dict``, unlike a normal Request's GET and POST, which are of type ``MultiDict``. If your code uses the features of MultiDict, you should either use a real :class:`pyramid.request.Request` or adapt your DummyRequest by replacing the attributes with ``MultiDict`` instances. Other similar incompatibilities exist. If you need all the features of a Request, use the :class:`pyramid.request.Request` class itself rather than this class while writing tests. """ method = 'GET' application_url = 'http://example.com' host = 'example.com:80' domain = 'example.com' content_length = 0 query_string = '' charset = 'UTF-8' script_name = '' _registry = None _accept = None request_iface = IRequest def __init__( self, params=None, environ=None, headers=None, path='/', cookies=None, post=None, accept=None, **kw ): if environ is None: environ = {} if params is None: params = {} if headers is None: headers = {} if cookies is None: cookies = {} self.environ = environ self.headers = headers self.params = params self.cookies = cookies self.matchdict = {} self.GET = params if post is not None: self.method = 'POST' self.POST = post else: self.POST = params self.host_url = self.application_url self.path_url = self.application_url self.url = self.application_url self.path = path self.path_info = path self.script_name = '' self.path_qs = '' self.body = '' self.view_name = '' self.subpath = () self.traversed = () self.virtual_root_path = () self.context = None self.root = None self.virtual_root = None self.marshalled = params # repoze.monty self.session = DummySession() self.accept = accept self.__dict__.update(kw) def _get_registry(self): if self._registry is None: return get_current_registry() return self._registry def _set_registry(self, registry): self._registry = registry def _del_registry(self): self._registry = None registry = property(_get_registry, _set_registry, _del_registry) def _set_accept(self, value): self._accept = create_accept_header(value) def _get_accept(self): if self._accept is None: self._accept = create_accept_header(None) return self._accept def _del_accept(self): self._accept = None accept = property(_get_accept, _set_accept, _del_accept) @reify def response(self): f = _get_response_factory(self.registry) return f(self)
have_zca = True
[docs] def setUp( registry=None, request=None, hook_zca=True, autocommit=True, settings=None, package=None, ): """ Set :app:`Pyramid` registry and request thread locals for the duration of a single unit test. Use this function in the ``setUp`` method of a unittest test case which directly or indirectly uses: - any method of the :class:`pyramid.config.Configurator` object returned by this function. - the :func:`pyramid.threadlocal.get_current_registry` or :func:`pyramid.threadlocal.get_current_request` functions. If you use the ``get_current_*`` functions (or call :app:`Pyramid` code that uses these functions) without calling ``setUp``, :func:`pyramid.threadlocal.get_current_registry` will return a *global* :term:`application registry`, which may cause unit tests to not be isolated with respect to registrations they perform. If the ``registry`` argument is ``None``, a new empty :term:`application registry` will be created (an instance of the :class:`pyramid.registry.Registry` class). If the ``registry`` argument is not ``None``, the value passed in should be an instance of the :class:`pyramid.registry.Registry` class or a suitable testing analogue. After ``setUp`` is finished, the registry returned by the :func:`pyramid.threadlocal.get_current_registry` function will be the passed (or constructed) registry until :func:`pyramid.testing.tearDown` is called (or :func:`pyramid.testing.setUp` is called again) . If the ``hook_zca`` argument is ``True``, ``setUp`` will attempt to perform the operation ``zope.component.getSiteManager.sethook( pyramid.threadlocal.get_current_registry)``, which will cause the :term:`Zope Component Architecture` global API (e.g. :func:`zope.component.getSiteManager`, :func:`zope.component.getAdapter`, and so on) to use the registry constructed by ``setUp`` as the value it returns from :func:`zope.component.getSiteManager`. If the :mod:`zope.component` package cannot be imported, or if ``hook_zca`` is ``False``, the hook will not be set. If ``settings`` is not ``None``, it must be a dictionary representing the values passed to a Configurator as its ``settings=`` argument. If ``package`` is ``None`` it will be set to the caller's package. The ``package`` setting in the :class:`pyramid.config.Configurator` will affect any relative imports made via :meth:`pyramid.config.Configurator.include` or :meth:`pyramid.config.Configurator.maybe_dotted`. This function returns an instance of the :class:`pyramid.config.Configurator` class, which can be used for further configuration to set up an environment suitable for a unit or integration test. The ``registry`` attribute attached to the Configurator instance represents the 'current' :term:`application registry`; the same registry will be returned by :func:`pyramid.threadlocal.get_current_registry` during the execution of the test. """ manager.clear() if registry is None: registry = Registry('testing') if package is None: package = caller_package() config = Configurator( registry=registry, autocommit=autocommit, package=package ) if settings is None: settings = {} config._fix_registry() if getattr(registry, 'settings', None) is None: config._set_settings(settings) if hasattr(registry, 'registerUtility'): # Sometimes nose calls us with a non-registry object because # it thinks this function is module test setup. Likewise, # someone may be passing us an esoteric "dummy" registry, and # the below won't succeed if it doesn't have a registerUtility # method. config.add_default_response_adapters() config.add_default_renderers() config.add_default_accept_view_order() config.add_default_view_predicates() config.add_default_view_derivers() config.add_default_route_predicates() config.add_default_tweens() config.add_default_security() config.commit() global have_zca try: have_zca and hook_zca and config.hook_zca() except ImportError: # pragma: no cover # (dont choke on not being able to import z.component) have_zca = False config.begin(request=request) return config
[docs] def tearDown(unhook_zca=True): """Undo the effects of :func:`pyramid.testing.setUp`. Use this function in the ``tearDown`` method of a unit test that uses :func:`pyramid.testing.setUp` in its ``setUp`` method. If the ``unhook_zca`` argument is ``True`` (the default), call :func:`zope.component.getSiteManager.reset`. This undoes the action of :func:`pyramid.testing.setUp` when called with the argument ``hook_zca=True``. If :mod:`zope.component` cannot be imported, ``unhook_zca`` is set to ``False``. """ global have_zca if unhook_zca and have_zca: try: from zope.component import getSiteManager getSiteManager.reset() except ImportError: # pragma: no cover have_zca = False info = manager.pop() manager.clear() if info is not None: registry = info['registry'] if hasattr(registry, '__init__') and hasattr(registry, '__name__'): try: registry.__init__(registry.__name__) except TypeError: # calling __init__ is largely for the benefit of # people who want to use the global ZCA registry; # however maybe somebody's using a registry we don't # understand, let's not blow up pass
[docs] def cleanUp(*arg, **kw): """ An alias for :func:`pyramid.testing.setUp`. """ package = kw.get('package', None) if package is None: package = caller_package() kw['package'] = package return setUp(*arg, **kw)
class DummyRendererFactory: """Registered by :meth:`pyramid.config.Configurator.testing_add_renderer` as a dummy renderer factory. The indecision about what to use as a key (a spec vs. a relative name) is caused by test suites in the wild believing they can register either. The ``factory`` argument passed to this constructor is usually the *real* template renderer factory, found when ``testing_add_renderer`` is called.""" def __init__(self, name, factory): self.name = name self.factory = factory # the "real" renderer factory reg'd previously self.renderers = {} def add(self, spec, renderer): self.renderers[spec] = renderer if ':' in spec: package, relative = spec.split(':', 1) self.renderers[relative] = renderer def __call__(self, info): spec = info.name renderer = self.renderers.get(spec) if renderer is None: if ':' in spec: package, relative = spec.split(':', 1) renderer = self.renderers.get(relative) if renderer is None: if self.factory: renderer = self.factory(info) else: raise KeyError( 'No testing renderer registered for %r' % spec ) return renderer class MockTemplate: def __init__(self, response): self._received = {} self.response = response def __getattr__(self, attrname): return self def __getitem__(self, attrname): return self def __call__(self, *arg, **kw): self._received.update(kw) return self.response def skip_on(*platforms): # pragma: no cover skip = False for platform in platforms: if skip_on.os_name.startswith(platform): skip = True if platform == 'pypy' and PYPY: skip = True def decorator(func): if isinstance(func, type): if skip: return None else: return func else: def wrapper(*args, **kw): if skip: return return func(*args, **kw) wrapper.__name__ = func.__name__ wrapper.__doc__ = func.__doc__ return wrapper return decorator skip_on.os_name = os.name # for testing
[docs] @contextmanager def testConfig( registry=None, request=None, hook_zca=True, autocommit=True, settings=None ): """Returns a context manager for test set up. This context manager calls :func:`pyramid.testing.setUp` when entering and :func:`pyramid.testing.tearDown` when exiting. All arguments are passed directly to :func:`pyramid.testing.setUp`. If the ZCA is hooked, it will always be un-hooked in tearDown. This context manager allows you to write test code like this: .. code-block:: python :linenos: with testConfig() as config: config.add_route('bar', '/bar/{id}') req = DummyRequest() resp = myview(req) """ config = setUp( registry=registry, request=request, hook_zca=hook_zca, autocommit=autocommit, settings=settings, ) try: yield config finally: tearDown(unhook_zca=hook_zca)