Source code for pyramid.csrf

from urllib.parse import urlparse
import uuid
from webob.cookies import CookieProfile
from zope.interface import implementer

from pyramid.exceptions import BadCSRFOrigin, BadCSRFToken
from pyramid.interfaces import ICSRFStoragePolicy
from pyramid.settings import aslist
from pyramid.util import (
    SimpleSerializer,
    bytes_,
    is_same_domain,
    strings_differ,
    text_,
)


[docs] @implementer(ICSRFStoragePolicy) class LegacySessionCSRFStoragePolicy: """A CSRF storage policy that defers control of CSRF storage to the session. This policy maintains compatibility with legacy ISession implementations that know how to manage CSRF tokens themselves via ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``. Note that using this CSRF implementation requires that a :term:`session factory` is configured. .. versionadded:: 1.9 """
[docs] def new_csrf_token(self, request): """ Sets a new CSRF token into the session and returns it. """ return request.session.new_csrf_token()
[docs] def get_csrf_token(self, request): """Returns the currently active CSRF token from the session, generating a new one if needed.""" return request.session.get_csrf_token()
[docs] def check_csrf_token(self, request, supplied_token): """ Returns ``True`` if the ``supplied_token`` is valid.""" expected_token = self.get_csrf_token(request) return not strings_differ( bytes_(expected_token), bytes_(supplied_token) )
[docs] @implementer(ICSRFStoragePolicy) class SessionCSRFStoragePolicy: """A CSRF storage policy that persists the CSRF token in the session. Note that using this CSRF implementation requires that a :term:`session factory` is configured. ``key`` The session key where the CSRF token will be stored. Default: `_csrft_`. .. versionadded:: 1.9 """ _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) def __init__(self, key='_csrft_'): self.key = key
[docs] def new_csrf_token(self, request): """ Sets a new CSRF token into the session and returns it. """ token = self._token_factory() request.session[self.key] = token return token
[docs] def get_csrf_token(self, request): """Returns the currently active CSRF token from the session, generating a new one if needed.""" token = request.session.get(self.key, None) if not token: token = self.new_csrf_token(request) return token
[docs] def check_csrf_token(self, request, supplied_token): """ Returns ``True`` if the ``supplied_token`` is valid.""" expected_token = self.get_csrf_token(request) return not strings_differ( bytes_(expected_token), bytes_(supplied_token) )
[docs] @implementer(ICSRFStoragePolicy) class CookieCSRFStoragePolicy: """An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the `OWASP CSRF guidelines <https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie>`_. This gives some additional flexibility with regards to scaling as the tokens can be generated and verified by a front-end server. .. versionadded:: 1.9 .. versionchanged: 1.10 Added the ``samesite`` option and made the default ``'Lax'``. """ _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) def __init__( self, cookie_name='csrf_token', secure=False, httponly=False, domain=None, max_age=None, path='/', samesite='Lax', ): self.cookie_profile = CookieProfile( cookie_name=cookie_name, secure=secure, max_age=max_age, httponly=httponly, path=path, domains=[domain], serializer=SimpleSerializer(), samesite=samesite, ) self.cookie_name = cookie_name
[docs] def new_csrf_token(self, request): """ Sets a new CSRF token into the request and returns it. """ token = self._token_factory() request.cookies[self.cookie_name] = token def set_cookie(request, response): self.cookie_profile.set_cookies(response, token) request.add_response_callback(set_cookie) return token
[docs] def get_csrf_token(self, request): """Returns the currently active CSRF token by checking the cookies sent with the current request.""" bound_cookies = self.cookie_profile.bind(request) token = bound_cookies.get_value() if not token: token = self.new_csrf_token(request) return token
[docs] def check_csrf_token(self, request, supplied_token): """ Returns ``True`` if the ``supplied_token`` is valid.""" expected_token = self.get_csrf_token(request) return not strings_differ( bytes_(expected_token), bytes_(supplied_token) )
[docs] def get_csrf_token(request): """Get the currently active CSRF token for the request passed, generating a new one using ``new_csrf_token(request)`` if one does not exist. This calls the equivalent method in the chosen CSRF protection implementation. .. versionadded :: 1.9 """ registry = request.registry csrf = registry.getUtility(ICSRFStoragePolicy) return csrf.get_csrf_token(request)
[docs] def new_csrf_token(request): """Generate a new CSRF token for the request passed and persist it in an implementation defined manner. This calls the equivalent method in the chosen CSRF protection implementation. .. versionadded :: 1.9 """ registry = request.registry csrf = registry.getUtility(ICSRFStoragePolicy) return csrf.new_csrf_token(request)
[docs] def check_csrf_token( request, token='csrf_token', header='X-CSRF-Token', raises=True ): """Check the CSRF token returned by the :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the value in ``request.POST.get(token)`` (if a POST request) or ``request.headers.get(header)``. If a ``token`` keyword is not supplied to this function, the string ``csrf_token`` will be used to look up the token in ``request.POST``. If a ``header`` keyword is not supplied to this function, the string ``X-CSRF-Token`` will be used to look up the token in ``request.headers``. If the value supplied by post or by header cannot be verified by the :class:`pyramid.interfaces.ICSRFStoragePolicy`, and ``raises`` is ``True``, this function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ and ``raises`` is ``False``, this function will return ``False``. If the CSRF check is successful, this function will return ``True`` unconditionally. See :ref:`auto_csrf_checking` for information about how to secure your application automatically against CSRF attacks. .. versionadded:: 1.4a2 .. versionchanged:: 1.7a1 A CSRF token passed in the query string of the request is no longer considered valid. It must be passed in either the request body or a header. .. versionchanged:: 1.9 Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to verify the CSRF token. """ supplied_token = "" # We first check the headers for a csrf token, as that is significantly # cheaper than checking the POST body if header is not None: supplied_token = request.headers.get(header, "") # If this is a POST/PUT/etc request, then we'll check the body to see if it # has a token. We explicitly use request.POST here because CSRF tokens # should never appear in an URL as doing so is a security issue. We also # explicitly check for request.POST here as we do not support sending form # encoded data over anything but a request.POST. if supplied_token == "" and token is not None: supplied_token = request.POST.get(token, "") policy = request.registry.getUtility(ICSRFStoragePolicy) if not policy.check_csrf_token(request, text_(supplied_token)): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') return False return True
[docs] def check_csrf_origin( request, *, trusted_origins=None, allow_no_origin=False, raises=True ): """ Check the ``Origin`` of the request to see if it is a cross site request or not. If the value supplied by the ``Origin`` or ``Referer`` header isn't one of the trusted origins and ``raises`` is ``True``, this function will raise a :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is ``False``, this function will return ``False`` instead. If the CSRF origin checks are successful this function will return ``True`` unconditionally. Additional trusted origins may be added by passing a list of domain (and ports if non-standard like ``['example.com', 'dev.example.com:8080']``) in with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` (the default) this list of additional domains will be pulled from the ``pyramid.csrf_trusted_origins`` setting. ``allow_no_origin`` determines whether to return ``True`` when the origin cannot be determined via either the ``Referer`` or ``Origin`` header. The default is ``False`` which will reject the check. Note that this function will do nothing if ``request.scheme`` is not ``https``. .. versionadded:: 1.7 .. versionchanged:: 1.9 Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` .. versionchanged:: 2.0 Added the ``allow_no_origin`` option. """ def _fail(reason): if raises: raise BadCSRFOrigin("Origin checking failed - " + reason) else: return False # Origin checks are only trustworthy / useful on HTTPS requests. if request.scheme != "https": return True # Suppose user visits http://example.com/ # An active network attacker (man-in-the-middle, MITM) sends a # POST form that targets https://example.com/detonate-bomb/ and # submits it via JavaScript. # # The attacker will need to provide a CSRF cookie and token, but # that's no problem for a MITM when we cannot make any assumptions # about what kind of session storage is being used. So the MITM can # circumvent the CSRF protection. This is true for any HTTP connection, # but anyone using HTTPS expects better! For this reason, for # https://example.com/ we need additional protection that treats # http://example.com/ as completely untrusted. Under HTTPS, # Barth et al. found that the Referer header is missing for # same-domain requests in only about 0.2% of cases or less, so # we can use strict Referer checking. # Determine the origin of this request origin = request.headers.get("Origin") origin_is_referrer = False if origin is None: origin = request.referrer origin_is_referrer = True else: # use the last origin in the list under the assumption that the # server generally appends values and we want the origin closest # to us origin = origin.split(' ')[-1] # If we can't find an origin, fail or pass immediately depending on # ``allow_no_origin`` if not origin: if allow_no_origin: return True else: return _fail("missing Origin or Referer.") # Determine which origins we trust, which by default will include the # current origin. if trusted_origins is None: trusted_origins = aslist( request.registry.settings.get("pyramid.csrf_trusted_origins", []) ) if request.host_port not in {"80", "443"}: trusted_origins.append("{0.domain}:{0.host_port}".format(request)) else: trusted_origins.append(request.domain) # Check "Origin: null" against trusted_origins if not origin_is_referrer and origin == 'null': if origin in trusted_origins: return True else: return _fail("null does not match any trusted origins.") # Parse our origin so we we can extract the required information from # it. originp = urlparse(origin) # Ensure that our Referer is also secure. if originp.scheme != "https": return _fail("Origin is insecure while host is secure.") # Actually check to see if the request's origin matches any of our # trusted origins. if not any( is_same_domain(originp.netloc, host) for host in trusted_origins ): return _fail("{} does not match any trusted origins.".format(origin)) return True