import base64
import binascii
from codecs import utf_8_decode, utf_8_encode
from collections import namedtuple
import hashlib
import re
import time as time_mod
from urllib.parse import quote, unquote
import warnings
from webob.cookies import CookieProfile
from zope.interface import implementer
from pyramid.authorization import Authenticated, Everyone
from pyramid.interfaces import IAuthenticationPolicy, IDebugLogger
from pyramid.util import (
    SimpleSerializer,
    ascii_,
    bytes_,
    strings_differ,
    text_,
)
VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
class CallbackAuthenticationPolicy:
    """Abstract class"""
    debug = False
    callback = None
    def _log(self, msg, methodname, request):
        logger = request.registry.queryUtility(IDebugLogger)
        if logger:
            cls = self.__class__
            classname = cls.__module__ + '.' + cls.__name__
            methodname = classname + '.' + methodname
            logger.debug(methodname + ': ' + msg)
    def _clean_principal(self, princid):
        if princid in (Authenticated, Everyone):
            princid = None
        return princid
    def authenticated_userid(self, request):
        """Return the authenticated userid or ``None``.
        If no callback is registered, this will be the same as
        ``unauthenticated_userid``.
        If a ``callback`` is registered, this will return the userid if
        and only if the callback returns a value that is not ``None``.
        """
        debug = self.debug
        userid = self.unauthenticated_userid(request)
        if userid is None:
            debug and self._log(
                'call to unauthenticated_userid returned None; returning None',
                'authenticated_userid',
                request,
            )
            return None
        if self._clean_principal(userid) is None:
            debug and self._log(
                (
                    'use of userid %r is disallowed by any built-in Pyramid '
                    'security policy, returning None' % userid
                ),
                'authenticated_userid',
                request,
            )
            return None
        if self.callback is None:
            debug and self._log(
                'there was no groupfinder callback; returning %r' % (userid,),
                'authenticated_userid',
                request,
            )
            return userid
        callback_ok = self.callback(userid, request)
        if callback_ok is not None:  # is not None!
            debug and self._log(
                'groupfinder callback returned %r; returning %r'
                % (callback_ok, userid),
                'authenticated_userid',
                request,
            )
            return userid
        debug and self._log(
            'groupfinder callback returned None; returning None',
            'authenticated_userid',
            request,
        )
    def effective_principals(self, request):
        """A list of effective principals derived from request.
        This will return a list of principals including, at least,
        :data:`pyramid.authorization.Everyone`. If there is no authenticated
        userid, or the ``callback`` returns ``None``, this will be the
        only principal:
        .. code-block:: python
            return [Everyone]
        If the ``callback`` does not return ``None`` and an authenticated
        userid is found, then the principals will include
        :data:`pyramid.authorization.Authenticated`, the
        ``authenticated_userid`` and the list of principals returned by the
        ``callback``:
        .. code-block:: python
            extra_principals = callback(userid, request)
            return [Everyone, Authenticated, userid] + extra_principals
        """
        debug = self.debug
        effective_principals = [Everyone]
        userid = self.unauthenticated_userid(request)
        if userid is None:
            debug and self._log(
                'unauthenticated_userid returned %r; returning %r'
                % (userid, effective_principals),
                'effective_principals',
                request,
            )
            return effective_principals
        if self._clean_principal(userid) is None:
            debug and self._log(
                (
                    'unauthenticated_userid returned disallowed %r; returning '
                    '%r as if it was None' % (userid, effective_principals)
                ),
                'effective_principals',
                request,
            )
            return effective_principals
        if self.callback is None:
            debug and self._log(
                'groupfinder callback is None, so groups is []',
                'effective_principals',
                request,
            )
            groups = []
        else:
            groups = self.callback(userid, request)
            debug and self._log(
                'groupfinder callback returned %r as groups' % (groups,),
                'effective_principals',
                request,
            )
        if groups is None:  # is None!
            debug and self._log(
                'returning effective principals: %r' % (effective_principals,),
                'effective_principals',
                request,
            )
            return effective_principals
        effective_principals.append(Authenticated)
        effective_principals.append(userid)
        effective_principals.extend(groups)
        debug and self._log(
            'returning effective principals: %r' % (effective_principals,),
            'effective_principals',
            request,
        )
        return effective_principals
[docs]
@implementer(IAuthenticationPolicy)
class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy):
    """A :app:`Pyramid` :term:`authentication policy` which
    obtains data from the :mod:`repoze.who` 1.X WSGI 'API' (the
    ``repoze.who.identity`` key in the WSGI environment).
    Constructor Arguments
    ``identifier_name``
       Default: ``auth_tkt``.  The :mod:`repoze.who` plugin name that
       performs remember/forget.  Optional.
    ``callback``
        Default: ``None``.  A callback passed the :mod:`repoze.who` identity
        and the :term:`request`, expected to return ``None`` if the user
        represented by the identity doesn't exist or a sequence of principal
        identifiers (possibly empty) representing groups if the user does
        exist.  If ``callback`` is None, the userid will be assumed to exist
        with no group principals.
    Objects of this class implement the interface described by
    :class:`pyramid.interfaces.IAuthenticationPolicy`.
    """
    def __init__(self, identifier_name='auth_tkt', callback=None):
        self.identifier_name = identifier_name
        self.callback = callback
    def _get_identity(self, request):
        return request.environ.get('repoze.who.identity')
    def _get_identifier(self, request):
        plugins = request.environ.get('repoze.who.plugins')
        if plugins is None:
            return None
        identifier = plugins[self.identifier_name]
        return identifier
[docs]
    def authenticated_userid(self, request):
        """Return the authenticated userid or ``None``.
        If no callback is registered, this will be the same as
        ``unauthenticated_userid``.
        If a ``callback`` is registered, this will return the userid if
        and only if the callback returns a value that is not ``None``.
        """
        identity = self._get_identity(request)
        if identity is None:
            self.debug and self._log(
                'repoze.who identity is None, returning None',
                'authenticated_userid',
                request,
            )
            return None
        userid = identity['repoze.who.userid']
        if userid is None:
            self.debug and self._log(
                'repoze.who.userid is None, returning None' % userid,
                'authenticated_userid',
                request,
            )
            return None
        if self._clean_principal(userid) is None:
            self.debug and self._log(
                (
                    'use of userid %r is disallowed by any built-in Pyramid '
                    'security policy, returning None' % userid
                ),
                'authenticated_userid',
                request,
            )
            return None
        if self.callback is None:
            return userid
        if self.callback(identity, request) is not None:  # is not None!
            return userid 
[docs]
    def unauthenticated_userid(self, request):
        """Return the ``repoze.who.userid`` key from the detected identity."""
        identity = self._get_identity(request)
        if identity is None:
            return None
        return identity['repoze.who.userid'] 
[docs]
    def effective_principals(self, request):
        """A list of effective principals derived from the identity.
        This will return a list of principals including, at least,
        :data:`pyramid.authorization.Everyone`. If there is no identity, or
        the ``callback`` returns ``None``, this will be the only principal.
        If the ``callback`` does not return ``None`` and an identity is
        found, then the principals will include
        :data:`pyramid.authorization.Authenticated`, the
        ``authenticated_userid`` and the list of principals returned by the
        ``callback``.
        """
        effective_principals = [Everyone]
        identity = self._get_identity(request)
        if identity is None:
            self.debug and self._log(
                (
                    'repoze.who identity was None; returning %r'
                    % effective_principals
                ),
                'effective_principals',
                request,
            )
            return effective_principals
        if self.callback is None:
            groups = []
        else:
            groups = self.callback(identity, request)
        if groups is None:  # is None!
            self.debug and self._log(
                (
                    'security policy groups callback returned None; returning '
                    '%r' % effective_principals
                ),
                'effective_principals',
                request,
            )
            return effective_principals
        userid = identity['repoze.who.userid']
        if userid is None:
            self.debug and self._log(
                (
                    'repoze.who.userid was None; returning %r'
                    % effective_principals
                ),
                'effective_principals',
                request,
            )
            return effective_principals
        if self._clean_principal(userid) is None:
            self.debug and self._log(
                (
                    'unauthenticated_userid returned disallowed %r; returning '
                    '%r as if it was None' % (userid, effective_principals)
                ),
                'effective_principals',
                request,
            )
            return effective_principals
        effective_principals.append(Authenticated)
        effective_principals.append(userid)
        effective_principals.extend(groups)
        return effective_principals 
[docs]
    def remember(self, request, userid, **kw):
        """Store the ``userid`` as ``repoze.who.userid``.
        The identity to authenticated to :mod:`repoze.who`
        will contain the given userid as ``userid``, and
        provide all keyword arguments as additional identity
        keys. Useful keys could be ``max_age`` or ``userdata``.
        """
        identifier = self._get_identifier(request)
        if identifier is None:
            return []
        environ = request.environ
        identity = kw
        identity['repoze.who.userid'] = userid
        return identifier.remember(environ, identity) 
[docs]
    def forget(self, request):
        """Forget the current authenticated user.
        Return headers that, if included in a response, will delete the
        cookie responsible for tracking the current user.
        """
        identifier = self._get_identifier(request)
        if identifier is None:
            return []
        identity = self._get_identity(request)
        return identifier.forget(request.environ, identity) 
 
[docs]
@implementer(IAuthenticationPolicy)
class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy):
    """A :app:`Pyramid` :term:`authentication policy` which
    obtains data from the ``REMOTE_USER`` WSGI environment variable.
    Constructor Arguments
    ``environ_key``
        Default: ``REMOTE_USER``.  The key in the WSGI environ which
        provides the userid.
    ``callback``
        Default: ``None``.  A callback passed the userid and the request,
        expected to return None if the userid doesn't exist or a sequence of
        principal identifiers (possibly empty) representing groups if the
        user does exist.  If ``callback`` is None, the userid will be assumed
        to exist with no group principals.
    ``debug``
        Default: ``False``.  If ``debug`` is ``True``, log messages to the
        Pyramid debug logger about the results of various authentication
        steps.  The output from debugging is useful for reporting to maillist
        or IRC channels when asking for support.
    Objects of this class implement the interface described by
    :class:`pyramid.interfaces.IAuthenticationPolicy`.
    """
    def __init__(self, environ_key='REMOTE_USER', callback=None, debug=False):
        self.environ_key = environ_key
        self.callback = callback
        self.debug = debug
[docs]
    def unauthenticated_userid(self, request):
        """The ``REMOTE_USER`` value found within the ``environ``."""
        return request.environ.get(self.environ_key) 
[docs]
    def remember(self, request, userid, **kw):
        """A no-op. The ``REMOTE_USER`` does not provide a protocol for
        remembering the user. This will be application-specific and can
        be done somewhere else or in a subclass."""
        return [] 
[docs]
    def forget(self, request):
        """A no-op. The ``REMOTE_USER`` does not provide a protocol for
        forgetting the user. This will be application-specific and can
        be done somewhere else or in a subclass."""
        return [] 
 
[docs]
@implementer(IAuthenticationPolicy)
class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
    """A :app:`Pyramid` :term:`authentication policy` which
    obtains data from a Pyramid "auth ticket" cookie.
    Constructor Arguments
    ``secret``
       The secret (a string) used for auth_tkt cookie signing.  This value
       should be unique across all values provided to Pyramid for various
       subsystem secrets (see :ref:`admonishment_against_secret_sharing`).
       Required.
    ``callback``
       Default: ``None``.  A callback passed the userid and the
       request, expected to return ``None`` if the userid doesn't
       exist or a sequence of principal identifiers (possibly empty) if
       the user does exist.  If ``callback`` is ``None``, the userid
       will be assumed to exist with no principals.  Optional.
    ``cookie_name``
       Default: ``auth_tkt``.  The cookie name used
       (string).  Optional.
    ``secure``
       Default: ``False``.  Only send the cookie back over a secure
       conn.  Optional.
    ``include_ip``
       Default: ``False``.  Make the requesting IP address part of
       the authentication data in the cookie.  Optional.
       For IPv6 this option is not recommended. The ``mod_auth_tkt``
       specification does not specify how to handle IPv6 addresses, so using
       this option in combination with IPv6 addresses may cause an
       incompatible cookie. It ties the authentication ticket to that
       individual's IPv6 address.
    ``timeout``
       Default: ``None``.  Maximum number of seconds which a newly
       issued ticket will be considered valid.  After this amount of
       time, the ticket will expire (effectively logging the user
       out).  If this value is ``None``, the ticket never expires.
       Optional.
    ``reissue_time``
       Default: ``None``.  If this parameter is set, it represents the number
       of seconds that must pass before an authentication token cookie is
       automatically reissued as the result of a request which requires
       authentication.  The duration is measured as the number of seconds
       since the last auth_tkt cookie was issued and 'now'.  If this value is
       ``0``, a new ticket cookie will be reissued on every request which
       requires authentication.
       A good rule of thumb: if you want auto-expired cookies based on
       inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
       ``reissue_time`` value to perhaps a tenth of the ``timeout`` value
       (120 or 2 mins).  It's nonsensical to set the ``timeout`` value lower
       than the ``reissue_time`` value, as the ticket will never be reissued
       if so.  However, such a configuration is not explicitly prevented.
       Optional.
    ``max_age``
       Default: ``None``.  The max age of the auth_tkt cookie, in
       seconds.  This differs from ``timeout`` inasmuch as ``timeout``
       represents the lifetime of the ticket contained in the cookie,
       while this value represents the lifetime of the cookie itself.
       When this value is set, the cookie's ``Max-Age`` and
       ``Expires`` settings will be set, allowing the auth_tkt cookie
       to last between browser sessions.  It is typically nonsensical
       to set this to a value that is lower than ``timeout`` or
       ``reissue_time``, although it is not explicitly prevented.
       Optional.
    ``path``
       Default: ``/``. The path for which the auth_tkt cookie is valid.
       May be desirable if the application only serves part of a domain.
       Optional.
    ``http_only``
       Default: ``False``. Hide cookie from JavaScript by setting the
       HttpOnly flag. Not honored by all browsers.
       Optional.
    ``wild_domain``
       Default: ``True``. An auth_tkt cookie will be generated for the
       wildcard domain. If your site is hosted as ``example.com`` this
       will make the cookie available for sites underneath ``example.com``
       such as ``www.example.com``.
       Optional.
    ``parent_domain``
       Default: ``False``. An auth_tkt cookie will be generated for the
       parent domain of the current site. For example if your site is
       hosted under ``www.example.com`` a cookie will be generated for
       ``.example.com``. This can be useful if you have multiple sites
       sharing the same domain. This option supercedes the ``wild_domain``
       option.
       Optional.
    ``domain``
       Default: ``None``. If provided the auth_tkt cookie will only be
       set for this domain. This option is not compatible with ``wild_domain``
       and ``parent_domain``.
       Optional.
    ``hashalg``
       Default: ``sha512`` (the literal string).
       Any hash algorithm supported by Python's ``hashlib.new()`` function
       can be used as the ``hashalg``.
       Cookies generated by different instances of AuthTktAuthenticationPolicy
       using different ``hashalg`` options are not compatible. Switching the
       ``hashalg`` will imply that all existing users with a valid cookie will
       be required to re-login.
       Optional.
    ``debug``
        Default: ``False``.  If ``debug`` is ``True``, log messages to the
        Pyramid debug logger about the results of various authentication
        steps.  The output from debugging is useful for reporting to maillist
        or IRC channels when asking for support.
    ``samesite``
        Default: ``'Lax'``.  The 'samesite' option of the session cookie. Set
        the value to the string ``'None'`` to turn off the samesite option.
    .. versionchanged:: 1.4
       Added the ``hashalg`` option, defaulting to ``sha512``.
    .. versionchanged:: 1.5
       Added the ``domain`` option.
       Added the ``parent_domain`` option.
    .. versionchanged:: 1.10
       Added the ``samesite`` option and made the default ``'Lax'``.
    Objects of this class implement the interface described by
    :class:`pyramid.interfaces.IAuthenticationPolicy`.
    """
    def __init__(
        self,
        secret,
        callback=None,
        cookie_name='auth_tkt',
        secure=False,
        include_ip=False,
        timeout=None,
        reissue_time=None,
        max_age=None,
        path="/",
        http_only=False,
        wild_domain=True,
        debug=False,
        hashalg='sha512',
        parent_domain=False,
        domain=None,
        samesite='Lax',
    ):
        self.cookie = AuthTktCookieHelper(
            secret,
            cookie_name=cookie_name,
            secure=secure,
            include_ip=include_ip,
            timeout=timeout,
            reissue_time=reissue_time,
            max_age=max_age,
            http_only=http_only,
            path=path,
            wild_domain=wild_domain,
            hashalg=hashalg,
            parent_domain=parent_domain,
            domain=domain,
            samesite=samesite,
        )
        self.callback = callback
        self.debug = debug
[docs]
    def unauthenticated_userid(self, request):
        """The userid key within the auth_tkt cookie."""
        result = self.cookie.identify(request)
        if result:
            return result['userid'] 
[docs]
    def remember(self, request, userid, **kw):
        """Accepts the following kw args: ``max_age=<int-seconds>,
        ``tokens=<sequence-of-ascii-strings>``.
        Return a list of headers which will set appropriate cookies on
        the response.
        """
        return self.cookie.remember(request, userid, **kw) 
[docs]
    def forget(self, request):
        """A list of headers which will delete appropriate cookies."""
        return self.cookie.forget(request) 
 
def b64encode(v):
    return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'')
def b64decode(v):
    return base64.b64decode(bytes_(v))
# this class licensed under the MIT license (stolen from Paste)
class AuthTicket:
    """
    This class represents an authentication token.  You must pass in
    the shared secret, the userid, and the IP address.  Optionally you
    can include tokens (a list of strings, representing role names),
    'user_data', which is arbitrary data available for your own use in
    later scripts.  Lastly, you can override the cookie name and
    timestamp.
    Once you provide all the arguments, use .cookie_value() to
    generate the appropriate authentication ticket.
    Usage::
        token = AuthTicket('sharedsecret', 'username',
            os.environ['REMOTE_ADDR'], tokens=['admin'])
        val = token.cookie_value()
    """
    def __init__(
        self,
        secret,
        userid,
        ip,
        tokens=(),
        user_data='',
        time=None,
        cookie_name='auth_tkt',
        secure=False,
        hashalg='md5',
    ):
        self.secret = secret
        self.userid = userid
        self.ip = ip
        self.tokens = ','.join(tokens)
        self.user_data = user_data
        if time is None:
            self.time = time_mod.time()
        else:
            self.time = time
        self.cookie_name = cookie_name
        self.secure = secure
        self.hashalg = hashalg
    def digest(self):
        return calculate_digest(
            self.ip,
            self.time,
            self.secret,
            self.userid,
            self.tokens,
            self.user_data,
            self.hashalg,
        )
    def cookie_value(self):
        v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid))
        if self.tokens:
            v += self.tokens + '!'
        v += self.user_data
        return v
# this class licensed under the MIT license (stolen from Paste)
class BadTicket(Exception):
    """
    Exception raised when a ticket can't be parsed.  If we get far enough to
    determine what the expected digest should have been, expected is set.
    This should not be shown by default, but can be useful for debugging.
    """
    def __init__(self, msg, expected=None):
        self.expected = expected
        Exception.__init__(self, msg)
# this function licensed under the MIT license (stolen from Paste)
def parse_ticket(secret, ticket, ip, hashalg='md5'):
    """
    Parse the ticket, returning (timestamp, userid, tokens, user_data).
    If the ticket cannot be parsed, a ``BadTicket`` exception will be raised
    with an explanation.
    """
    ticket = text_(ticket).strip('"')
    digest_size = hashlib.new(hashalg).digest_size * 2
    digest = ticket[:digest_size]
    try:
        timestamp = int(ticket[digest_size : digest_size + 8], 16)
    except ValueError as e:
        raise BadTicket('Timestamp is not a hex integer: %s' % e)
    try:
        userid, data = ticket[digest_size + 8 :].split('!', 1)
    except ValueError:
        raise BadTicket('userid is not followed by !')
    userid = unquote(userid)
    if '!' in data:
        tokens, user_data = data.split('!', 1)
    else:  # pragma: no cover (never generated)
        # @@: Is this the right order?
        tokens = ''
        user_data = data
    expected = calculate_digest(
        ip, timestamp, secret, userid, tokens, user_data, hashalg
    )
    # Avoid timing attacks (see
    # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
    if strings_differ(expected, digest):
        raise BadTicket(
            'Digest signature is not correct', expected=(expected, digest)
        )
    tokens = tokens.split(',')
    return (timestamp, userid, tokens, user_data)
# this function licensed under the MIT license (stolen from Paste)
def calculate_digest(
    ip, timestamp, secret, userid, tokens, user_data, hashalg='md5'
):
    secret = bytes_(secret, 'utf-8')
    userid = bytes_(userid, 'utf-8')
    tokens = bytes_(tokens, 'utf-8')
    user_data = bytes_(user_data, 'utf-8')
    hash_obj = hashlib.new(hashalg)
    # Check to see if this is an IPv6 address
    if ':' in ip:
        ip_timestamp = ip + str(int(timestamp))
        ip_timestamp = bytes_(ip_timestamp)
    else:
        # encode_ip_timestamp not required, left in for backwards compatibility
        ip_timestamp = encode_ip_timestamp(ip, timestamp)
    hash_obj.update(
        ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data
    )
    digest = hash_obj.hexdigest()
    hash_obj2 = hashlib.new(hashalg)
    hash_obj2.update(bytes_(digest) + secret)
    return hash_obj2.hexdigest()
# this function licensed under the MIT license (stolen from Paste)
def encode_ip_timestamp(ip, timestamp):
    ip_chars = ''.join(map(chr, map(int, ip.split('.'))))
    t = int(timestamp)
    ts = (
        (t & 0xFF000000) >> 24,
        (t & 0xFF0000) >> 16,
        (t & 0xFF00) >> 8,
        t & 0xFF,
    )
    ts_chars = ''.join(map(chr, ts))
    return bytes_(ip_chars + ts_chars)
[docs]
class AuthTktCookieHelper:
    """
    A helper class for security policies that obtains data from an "auth
    ticket" cookie.
    Constructor Arguments
    ``secret``
       The secret (a string) used for auth_tkt cookie signing.  This value
       should be unique across all values provided to Pyramid for various
       subsystem secrets (see :ref:`admonishment_against_secret_sharing`).
       Required.
    ``cookie_name``
       Default: ``auth_tkt``.  The cookie name used
       (string).  Optional.
    ``secure``
       Default: ``False``.  Only send the cookie back over a secure
       conn.  Optional.
    ``include_ip``
       Default: ``False``.  Make the requesting IP address part of
       the authentication data in the cookie.  Optional.
       For IPv6 this option is not recommended. The ``mod_auth_tkt``
       specification does not specify how to handle IPv6 addresses, so using
       this option in combination with IPv6 addresses may cause an
       incompatible cookie. It ties the authentication ticket to that
       individual's IPv6 address.
    ``timeout``
       Default: ``None``.  Maximum number of seconds which a newly
       issued ticket will be considered valid.  After this amount of
       time, the ticket will expire (effectively logging the user
       out).  If this value is ``None``, the ticket never expires.
       Optional.
    ``reissue_time``
       Default: ``None``.  If this parameter is set, it represents the number
       of seconds that must pass before an authentication token cookie is
       automatically reissued as the result of a request which requires
       authentication.  The duration is measured as the number of seconds
       since the last auth_tkt cookie was issued and 'now'.  If this value is
       ``0``, a new ticket cookie will be reissued on every request which
       requires authentication.
       A good rule of thumb: if you want auto-expired cookies based on
       inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
       ``reissue_time`` value to perhaps a tenth of the ``timeout`` value
       (120 or 2 mins).  It's nonsensical to set the ``timeout`` value lower
       than the ``reissue_time`` value, as the ticket will never be reissued
       if so.  However, such a configuration is not explicitly prevented.
       Optional.
    ``max_age``
       Default: ``None``.  The max age of the auth_tkt cookie, in
       seconds.  This differs from ``timeout`` inasmuch as ``timeout``
       represents the lifetime of the ticket contained in the cookie,
       while this value represents the lifetime of the cookie itself.
       When this value is set, the cookie's ``Max-Age`` and
       ``Expires`` settings will be set, allowing the auth_tkt cookie
       to last between browser sessions.  It is typically nonsensical
       to set this to a value that is lower than ``timeout`` or
       ``reissue_time``, although it is not explicitly prevented.
       Optional.
    ``path``
       Default: ``/``. The path for which the auth_tkt cookie is valid.
       May be desirable if the application only serves part of a domain.
       Optional.
    ``http_only``
       Default: ``False``. Hide cookie from JavaScript by setting the
       HttpOnly flag. Not honored by all browsers.
       Optional.
    ``wild_domain``
       Default: ``True``. An auth_tkt cookie will be generated for the
       wildcard domain. If your site is hosted as ``example.com`` this
       will make the cookie available for sites underneath ``example.com``
       such as ``www.example.com``.
       Optional.
    ``parent_domain``
       Default: ``False``. An auth_tkt cookie will be generated for the
       parent domain of the current site. For example if your site is
       hosted under ``www.example.com`` a cookie will be generated for
       ``.example.com``. This can be useful if you have multiple sites
       sharing the same domain. This option supercedes the ``wild_domain``
       option.
       Optional.
    ``domain``
       Default: ``None``. If provided the auth_tkt cookie will only be
       set for this domain. This option is not compatible with ``wild_domain``
       and ``parent_domain``.
       Optional.
    ``hashalg``
       Default: ``sha512`` (the literal string).
       Any hash algorithm supported by Python's ``hashlib.new()`` function
       can be used as the ``hashalg``.
       Cookies generated by different instances of AuthTktAuthenticationPolicy
       using different ``hashalg`` options are not compatible. Switching the
       ``hashalg`` will imply that all existing users with a valid cookie will
       be required to re-login.
       Optional.
    ``debug``
        Default: ``False``.  If ``debug`` is ``True``, log messages to the
        Pyramid debug logger about the results of various authentication
        steps.  The output from debugging is useful for reporting to maillist
        or IRC channels when asking for support. Optional.
    ``samesite``
        Default: ``'Lax'``.  The 'samesite' option of the session cookie. Set
        the value to ``None`` to turn off the samesite option. Optional.
    .. versionchanged:: 2.0
        The default ``hashalg`` was changed from ``md5`` to ``sha512``.
    """
    parse_ticket = staticmethod(parse_ticket)  # for tests
    AuthTicket = AuthTicket  # for tests
    BadTicket = BadTicket  # for tests
    now = None  # for tests
    userid_type_decoders = {
        'int': int,
        'unicode': lambda x: utf_8_decode(x)[0],  # bw compat for old cookies
        'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
        'b64str': lambda x: b64decode(x),
    }
    userid_type_encoders = {
        int: ('int', str),
        str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
        bytes: ('b64str', lambda x: b64encode(x)),
    }
    def __init__(
        self,
        secret,
        cookie_name='auth_tkt',
        secure=False,
        include_ip=False,
        timeout=None,
        reissue_time=None,
        max_age=None,
        http_only=False,
        path="/",
        wild_domain=True,
        hashalg='sha512',
        parent_domain=False,
        domain=None,
        samesite='Lax',
    ):
        self.cookie_profile = CookieProfile(
            cookie_name=cookie_name,
            secure=secure,
            max_age=max_age,
            httponly=http_only,
            path=path,
            serializer=SimpleSerializer(),
            samesite=samesite,
        )
        self.secret = secret
        self.cookie_name = cookie_name
        self.secure = secure
        self.include_ip = include_ip
        self.timeout = timeout if timeout is None else int(timeout)
        self.reissue_time = (
            reissue_time if reissue_time is None else int(reissue_time)
        )
        self.max_age = max_age if max_age is None else int(max_age)
        self.wild_domain = wild_domain
        self.parent_domain = parent_domain
        self.domain = domain
        self.hashalg = hashalg
    def _get_cookies(self, request, value, max_age=None):
        if self.domain:
            domain = self.domain
        else:
            cur_domain = request.domain
            if self.parent_domain and cur_domain.count('.') > 1:
                domain = cur_domain.split('.', 1)[1]
            elif self.wild_domain:
                domain = cur_domain
            else:
                domain = None
        profile = self.cookie_profile(request)
        kw = {'domains': [domain]}
        if max_age is not None:
            kw['max_age'] = max_age
        headers = profile.get_headers(value, **kw)
        return headers
[docs]
    def identify(self, request):
        """Return a dictionary with authentication information, or ``None``
        if no valid auth_tkt is attached to ``request``"""
        environ = request.environ
        cookie = request.cookies.get(self.cookie_name)
        if cookie is None:
            return None
        if self.include_ip:
            remote_addr = environ['REMOTE_ADDR']
        else:
            remote_addr = '0.0.0.0'
        try:
            timestamp, userid, tokens, user_data = self.parse_ticket(
                self.secret, cookie, remote_addr, self.hashalg
            )
        except self.BadTicket:
            return None
        now = self.now  # service tests
        if now is None:
            now = time_mod.time()
        if self.timeout and ((timestamp + self.timeout) < now):
            # the auth_tkt data has expired
            return None
        userid_typename = 'userid_type:'
        user_data_info = user_data.split('|')
        for datum in filter(None, user_data_info):
            if datum.startswith(userid_typename):
                userid_type = datum[len(userid_typename) :]
                decoder = self.userid_type_decoders.get(userid_type)
                if decoder:
                    userid = decoder(userid)
        reissue = self.reissue_time is not None
        if reissue and not hasattr(request, '_authtkt_reissued'):
            if (now - timestamp) > self.reissue_time:
                # See https://github.com/Pylons/pyramid/issues#issue/108
                tokens = list(filter(None, tokens))
                headers = self.remember(
                    request, userid, max_age=self.max_age, tokens=tokens
                )
                def reissue_authtkt(request, response):
                    if not hasattr(request, '_authtkt_reissue_revoked'):
                        for k, v in headers:
                            response.headerlist.append((k, v))
                request.add_response_callback(reissue_authtkt)
                request._authtkt_reissued = True
        environ['REMOTE_USER_TOKENS'] = tokens
        environ['REMOTE_USER_DATA'] = user_data
        environ['AUTH_TYPE'] = 'cookie'
        identity = {}
        identity['timestamp'] = timestamp
        identity['userid'] = userid
        identity['tokens'] = tokens
        identity['userdata'] = user_data
        return identity 
[docs]
    def forget(self, request):
        """Return a set of expires Set-Cookie headers, which will destroy
        any existing auth_tkt cookie when attached to a response"""
        request._authtkt_reissue_revoked = True
        return self._get_cookies(request, None) 
[docs]
    def remember(self, request, userid, max_age=None, tokens=()):
        """Return a set of Set-Cookie headers; when set into a response,
        these headers will represent a valid authentication ticket.
        ``max_age``
          The max age of the auth_tkt cookie, in seconds.  When this value is
          set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
          allowing the auth_tkt cookie to last between browser sessions.  If
          this value is ``None``, the ``max_age`` value provided to the
          helper itself will be used as the ``max_age`` value.  Default:
          ``None``.
        ``tokens``
          A sequence of strings that will be placed into the auth_tkt tokens
          field.  Each string in the sequence must be of the Python ``str``
          type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
          Tokens are available in the returned identity when an auth_tkt is
          found in the request and unpacked.  Default: ``()``.
        """
        max_age = self.max_age if max_age is None else int(max_age)
        environ = request.environ
        if self.include_ip:
            remote_addr = environ['REMOTE_ADDR']
        else:
            remote_addr = '0.0.0.0'
        user_data = ''
        encoding_data = self.userid_type_encoders.get(type(userid))
        if encoding_data:
            encoding, encoder = encoding_data
        else:
            warnings.warn(
                "userid is of type {}, and is not supported by the "
                "AuthTktAuthenticationPolicy. Explicitly converting to string "
                "and storing as base64. Subsequent requests will receive a "
                "string as the userid, it will not be decoded back to the "
                "type provided.".format(type(userid)),
                RuntimeWarning,
            )
            encoding, encoder = self.userid_type_encoders.get(str)
            userid = str(userid)
        userid = encoder(userid)
        user_data = 'userid_type:%s' % encoding
        new_tokens = []
        for token in tokens:
            if isinstance(token, str):
                try:
                    token = ascii_(token)
                except UnicodeEncodeError:
                    raise ValueError("Invalid token %r" % (token,))
            if not (isinstance(token, str) and VALID_TOKEN.match(token)):
                raise ValueError("Invalid token %r" % (token,))
            new_tokens.append(token)
        tokens = tuple(new_tokens)
        if hasattr(request, '_authtkt_reissued'):
            request._authtkt_reissue_revoked = True
        ticket = self.AuthTicket(
            self.secret,
            userid,
            remote_addr,
            tokens=tokens,
            user_data=user_data,
            cookie_name=self.cookie_name,
            secure=self.secure,
            hashalg=self.hashalg,
        )
        cookie_value = ticket.cookie_value()
        return self._get_cookies(request, cookie_value, max_age) 
 
[docs]
@implementer(IAuthenticationPolicy)
class SessionAuthenticationPolicy(CallbackAuthenticationPolicy):
    """A :app:`Pyramid` authentication policy which gets its data from the
    configured :term:`session`.  For this authentication policy to work, you
    will have to follow the instructions in the :ref:`sessions_chapter` to
    configure a :term:`session factory`.
    Constructor Arguments
    ``prefix``
       A prefix used when storing the authentication parameters in the
       session. Defaults to 'auth.'. Optional.
    ``callback``
       Default: ``None``.  A callback passed the userid and the
       request, expected to return ``None`` if the userid doesn't
       exist or a sequence of principal identifiers (possibly empty) if
       the user does exist.  If ``callback`` is ``None``, the userid
       will be assumed to exist with no principals.  Optional.
    ``debug``
        Default: ``False``.  If ``debug`` is ``True``, log messages to the
        Pyramid debug logger about the results of various authentication
        steps.  The output from debugging is useful for reporting to maillist
        or IRC channels when asking for support.
    """
    def __init__(self, prefix='auth.', callback=None, debug=False):
        self.callback = callback
        self.debug = debug
        self.helper = SessionAuthenticationHelper(prefix)
[docs]
    def remember(self, request, userid, **kw):
        """Store a userid in the session."""
        return self.helper.remember(request, userid, **kw) 
[docs]
    def forget(self, request):
        """Remove the stored userid from the session."""
        return self.helper.forget(request) 
    def unauthenticated_userid(self, request):
        return self.helper.authenticated_userid(request) 
[docs]
class SessionAuthenticationHelper:
    """A helper for use with a :term:`security policy` which stores user data
    in the configured :term:`session`.
    Constructor Arguments
    ``prefix``
       A prefix used when storing the authentication parameters in the
       session. Defaults to 'auth.'. Optional.
    """
    def __init__(self, prefix='auth.'):
        self.userid_key = prefix + 'userid'
[docs]
    def remember(self, request, userid, **kw):
        """Store a userid in the session."""
        request.session[self.userid_key] = userid
        return [] 
[docs]
    def forget(self, request, **kw):
        """Remove the stored userid from the session."""
        if self.userid_key in request.session:
            del request.session[self.userid_key]
        return [] 
[docs]
    def authenticated_userid(self, request):
        """Return the stored userid."""
        return request.session.get(self.userid_key) 
 
[docs]
@implementer(IAuthenticationPolicy)
class BasicAuthAuthenticationPolicy(CallbackAuthenticationPolicy):
    """A :app:`Pyramid` authentication policy which uses HTTP standard basic
    authentication protocol to authenticate users.  To use this policy you will
    need to provide a callback which checks the supplied user credentials
    against your source of login data.
    Constructor Arguments
    ``check``
       A callback function passed a username, password and request, in that
       order as positional arguments.  Expected to return ``None`` if the
       userid doesn't exist or a sequence of principal identifiers (possibly
       empty) if the user does exist.
    ``realm``
       Default: ``"Realm"``.  The Basic Auth Realm string.  Usually displayed
       to the user by the browser in the login dialog.
    ``debug``
        Default: ``False``.  If ``debug`` is ``True``, log messages to the
        Pyramid debug logger about the results of various authentication
        steps.  The output from debugging is useful for reporting to maillist
        or IRC channels when asking for support.
    **Issuing a challenge**
    Regular browsers will not send username/password credentials unless they
    first receive a challenge from the server.  The following recipe will
    register a view that will send a Basic Auth challenge to the user whenever
    there is an attempt to call a view which results in a Forbidden response::
        from pyramid.httpexceptions import HTTPUnauthorized
        from pyramid.security import forget
        from pyramid.view import forbidden_view_config
        @forbidden_view_config()
        def forbidden_view(request):
            if request.authenticated_userid is None:
                response = HTTPUnauthorized()
                response.headers.update(forget(request))
                return response
            return HTTPForbidden()
    """
    def __init__(self, check, realm='Realm', debug=False):
        self.check = check
        self.realm = realm
        self.debug = debug
[docs]
    def unauthenticated_userid(self, request):
        """The userid parsed from the ``Authorization`` request header."""
        credentials = extract_http_basic_credentials(request)
        if credentials:
            return credentials.username 
[docs]
    def remember(self, request, userid, **kw):
        """A no-op. Basic authentication does not provide a protocol for
        remembering the user. Credentials are sent on every request.
        """
        return [] 
[docs]
    def forget(self, request):
        """Returns challenge headers. This should be attached to a response
        to indicate that credentials are required."""
        return [('WWW-Authenticate', 'Basic realm="%s"' % self.realm)] 
    def callback(self, username, request):
        # Username arg is ignored. Unfortunately
        # extract_http_basic_credentials winds up getting called twice when
        # authenticated_userid is called. Avoiding that, however,
        # winds up duplicating logic from the superclass.
        credentials = extract_http_basic_credentials(request)
        if credentials:
            username, password = credentials
            return self.check(username, password, request) 
HTTPBasicCredentials = namedtuple(
    'HTTPBasicCredentials', ['username', 'password']
)