import inspect
from pyramid.config import PHASE1_CONFIG
from pyramid.exceptions import ConfigurationError
from zope.interface import (
Attribute,
Interface,
alsoProvides,
classImplements,
implementer,
)
class IRetryableError(Interface):
    """
    A marker interface for retryable errors.

    This interface can be applied to any ``Exception`` class or object to
    indicate that it should be treated as a :term:`retryable error`.

    It declares no attributes or methods; only its presence on an
    exception is checked.

    """
class IBeforeRetry(Interface):
    """
    An event emitted immediately prior to throwing away the request
    and creating a new one.

    This event may be useful when state is stored on the ``request.environ``
    that needs to be updated before a new request is created.

    """
    # The attribute descriptions below are the interface's own documentation
    # for each field carried by the event.
    environ = Attribute('The environ object that is reused between requests.')
    request = Attribute('The request object that is being discarded.')
    exception = Attribute('The exception that request processing raised.')
    response = Attribute('The response object that is being discarded. '
                         'This may be ``None`` if no response was generated, '
                         'which happens when request processing raises an '
                         "exception that isn't caught by any exception view.")
@implementer(IBeforeRetry)
class BeforeRetry(object):
    """
    Event object describing an attempt that is about to be retried.

    It is emitted right before the current request is thrown away and a
    fresh one is created, which makes it a convenient place to update any
    state kept on ``request.environ`` before the next attempt starts.

    :ivar request: The request object that is being discarded.
    :ivar environ: The ``environ`` of that request, captured from
        ``request.environ`` at construction time.
    :ivar exception: The exception that triggered the retry.
    :ivar response: The response that is being discarded, or ``None``
        when none was supplied.

    """

    def __init__(self, request, exception, response=None):
        # keep a direct handle on the environ so subscribers do not need to
        # reach through the request to get at it
        self.environ = request.environ
        self.request = request
        self.response = response
        self.exception = exception
@implementer(IRetryableError)
class RetryableException(Exception):
    """
    A retryable exception should be raised when an error occurs.

    The class is declared to implement ``IRetryableError``, so instances
    (including instances of subclasses) are recognized as a
    :term:`retryable error` without any additional marking.

    """
def RetryableExecutionPolicy(attempts=3, activate_hook=None):
    """
    Create an :term:`execution policy` that catches any
    :term:`retryable error` and sends it through the pipeline again up to
    a maximum of ``attempts`` attempts.

    If ``activate_hook`` is set it will be consulted prior to each request
    to determine if retries should be enabled. It should return a number > 0
    of attempts to be used or ``None`` which will indicate to use the default
    number of attempts.

    :param attempts: Default maximum number of attempts for a request.
        Must be greater than zero (checked with ``assert``).
    :param activate_hook: Optional callable invoked with the first request
        object of each incoming request; see above for its return value.
    :returns: A callable ``retry_policy(environ, router)`` that returns the
        response of the first attempt that is not retried, or re-raises the
        exception of such an attempt.

    """
    assert attempts > 0

    def retry_policy(environ, router):
        # make the original request
        request_ctx = router.request_context(environ)
        request = request_ctx.begin()

        try:
            if activate_hook:
                retry_attempts = activate_hook(request)
                if retry_attempts is None:
                    # the hook declined to override; use the policy default
                    retry_attempts = attempts
                else:
                    assert retry_attempts > 0
            else:
                retry_attempts = attempts

            # if we are supporting multiple attempts then we must
            # make the body seekable in order to re-use it across multiple
            # attempts. make_body_seekable will copy wsgi.input if
            # necessary, otherwise it will rewind the copy to position zero
            if retry_attempts != 1:
                request.make_body_seekable()

        # Catch make_body_seekable (e.g. 408 RequestTimeout)
        # and activate_hook exceptions and clean up.
        except BaseException:
            # the request context was already begun above, so it has to be
            # ended here before the error propagates
            request_ctx.end()
            raise

        for number in range(retry_attempts):
            # track the attempt info in the environ
            # try to set it as soon as possible so that it's available
            # in the request factory and elsewhere if people want it
            # note: set all of these values here as they are cleared after
            # each attempt
            environ['retry.attempt'] = number
            environ['retry.attempts'] = retry_attempts

            # if we are not on the first attempt then we should start
            # with a new request object and throw away any changes to
            # the old object, however we do this carefully to try and
            # avoid extra copies of the body
            if number > 0:
                # try to make sure this code stays in sync with pyramid's
                # router which normally creates requests
                request_ctx = router.request_context(environ)
                request = request_ctx.begin()

            try:
                response = router.invoke_request(request)

                # check for a squashed exception and handle it
                # this would happen if an exception view was invoked and
                # rendered an error response
                exc = getattr(request, 'exception', None)
                if exc is not None:
                    # if this is a retryable exception then continue to the
                    # next attempt, discarding the current response
                    # (is_error_retryable is False on the last attempt, so
                    # the final iteration always falls through to return)
                    if is_error_retryable(request, exc):
                        request.registry.notify(
                            BeforeRetry(request, exc, response=response))
                        continue

                return response

            except Exception as exc:
                # if this was the last attempt or the exception is not
                # retryable then there's nothing left for us to do
                if not is_error_retryable(request, exc):
                    raise

                else:
                    # no response exists on this path, so the event is sent
                    # with its default ``response=None``
                    request.registry.notify(BeforeRetry(request, exc))

            # cleanup any changes we made to the request
            # (runs on every exit from the try: return, continue, raise, or
            # falling through to the next attempt)
            finally:
                request_ctx.end()
                del environ['retry.attempt']
                del environ['retry.attempts']

    return retry_policy
def mark_error_retryable(error):
    """
    Flag an exception instance or exception type as retryable.

    Once flagged, ``pyramid_retry`` may retry a request that fails with
    this exception.

    :param error: Either an ``Exception`` instance or a class deriving
        from ``Exception``.
    :raises ValueError: If ``error`` is neither of those.

    """
    # a single exception object: only this instance gets the marker
    if isinstance(error, Exception):
        alsoProvides(error, IRetryableError)
        return

    # an exception class: every instance of it will carry the marker
    if inspect.isclass(error) and issubclass(error, Exception):
        classImplements(error, IRetryableError)
        return

    raise ValueError(
        'only exception objects or types may be marked retryable')
def is_error_retryable(request, exc):
    """
    Tell whether ``exc`` counts as a :term:`retryable error` for ``request``.

    The answer is ``False`` whenever the request is on its last attempt,
    which includes the case where ``pyramid_retry`` is inactive for the
    request. Otherwise it is ``True`` when ``exc`` is a
    ``RetryableException`` or provides ``IRetryableError``.

    """
    # nothing can be retried once the attempts are used up (or when no
    # attempt bookkeeping exists at all)
    if is_last_attempt(request):
        return False

    if isinstance(exc, RetryableException):
        return True

    return IRetryableError.providedBy(exc)
def is_last_attempt(request):
    """
    Tell whether ``request`` is on its final attempt.

    ``True`` means ``pyramid_retry`` will not issue another attempt no
    matter how this one ends. It is also ``True`` when ``pyramid_retry``
    is inactive for the request, i.e. when the ``retry.attempt`` /
    ``retry.attempts`` keys are absent from ``request.environ``.

    """
    env = request.environ
    current = env.get('retry.attempt')
    total = env.get('retry.attempts')

    # both counters are present: attempts are zero-based, so the last one
    # is ``total - 1``
    if current is not None and total is not None:
        return current + 1 == total

    # missing bookkeeping means nothing is driving retries for this request
    return True
class RetryableErrorPredicate(object):
    """
    The ``retryable_error`` :term:`view predicate`.

    It lets an exception view opt in or out depending on whether the
    exception attached to the request is a :term:`retryable error`.

    .. seealso:: See :func:`pyramid_retry.is_error_retryable`.

    """

    def __init__(self, val, config):
        # only genuine booleans are accepted as the predicate value
        if isinstance(val, bool):
            self.val = val
        else:
            raise ConfigurationError(
                'The "retryable_error" view predicate value must be '
                'True or False.',
            )

    def text(self):
        """Return the human-readable form of this predicate."""
        return 'retryable_error = %s' % (self.val,)

    # the same string doubles as the predicate hash
    phash = text

    def __call__(self, context, request):
        """Match when the request's retryability equals the configured value."""
        error = getattr(request, 'exception', None)
        retryable = is_error_retryable(request, error)
        return retryable if self.val else not retryable
class LastAttemptPredicate(object):
    """
    The ``last_retry_attempt`` :term:`view predicate`.

    It lets an exception view opt in or out depending on whether the
    request is on the last retry attempt before the request is aborted.

    .. seealso:: See :func:`pyramid_retry.is_last_attempt`.

    """

    def __init__(self, val, config):
        # only genuine booleans are accepted as the predicate value
        if isinstance(val, bool):
            self.val = val
        else:
            raise ConfigurationError(
                'The "last_retry_attempt" view predicate value must be '
                'True or False.',
            )

    def text(self):
        """Return the human-readable form of this predicate."""
        return 'last_retry_attempt = %s' % (self.val,)

    # the same string doubles as the predicate hash
    phash = text

    def __call__(self, context, request):
        """Match when the request's last-attempt state equals the value."""
        final = is_last_attempt(request)
        return final if self.val else not final
def includeme(config):
    """
    Activate the ``pyramid_retry`` execution policy in your application.

    This installs the policy built by
    :func:`pyramid_retry.RetryableExecutionPolicy`, with ``attempts`` taken
    from the ``retry.attempts`` setting (3 when unset or empty) and the
    optional activation hook taken from the ``retry.activate_hook``
    setting, which may be given as a dotted name.

    The ``last_retry_attempt`` and ``retryable_error`` view predicates
    are registered.

    This should be included in your Pyramid application via
    ``config.include('pyramid_retry')``.

    """
    settings = config.get_settings()

    config.add_view_predicate('last_retry_attempt', LastAttemptPredicate)
    config.add_view_predicate('retryable_error', RetryableErrorPredicate)

    def register():
        # normalize the setting to an int and write it back so later
        # readers of the settings see the effective value
        configured = settings.get('retry.attempts')
        attempts = int(configured or 3)
        settings['retry.attempts'] = attempts

        hook = config.maybe_dotted(settings.get('retry.activate_hook'))

        config.set_execution_policy(
            RetryableExecutionPolicy(attempts, activate_hook=hook))

    # defer registration to allow time to modify settings
    config.action(None, register, order=PHASE1_CONFIG)