"""Base classes for all cache backends
.. automodsumm:: requests_cache.backends.base
:classes-only:
:nosignatures:
"""
from __future__ import annotations
from abc import ABC
from collections import UserDict
from datetime import datetime
from logging import getLogger
from pickle import PickleError
from typing import TYPE_CHECKING, Iterable, Iterator, List, MutableMapping, Optional, TypeVar
from warnings import warn
from requests import Request, Response
from ..cache_keys import create_key, redact_response
from ..models import AnyRequest, CachedResponse
from ..policy import DEFAULT_CACHE_NAME, CacheSettings, ExpirationTime
from ..serializers import SerializerType, init_serializer
# Specific exceptions that may be raised during deserialization
# (typically caused by values saved with an older/incompatible requests-cache version)
DESERIALIZE_ERRORS = (AttributeError, ImportError, PickleError, TypeError, ValueError)

# Module-level logger, namespaced to this module
logger = getLogger(__name__)
class BaseCache:
    """Base class for cache backends. Can be used as a non-persistent, in-memory cache.

    This manages higher-level cache operations, including:

    * Saving and retrieving responses
    * Managing redirect history
    * Convenience methods for general cache info
    * Dict-like wrapper methods around the underlying storage

    Notes:

    * Lower-level storage operations are handled by :py:class:`.BaseStorage`.
    * To extend this with your own custom backend, see :ref:`custom-backends`.

    Args:
        cache_name: Cache prefix or namespace, depending on backend
        serializer: Serializer name or instance
        kwargs: Additional backend-specific keyword arguments
    """

    def __init__(self, cache_name: str = DEFAULT_CACHE_NAME, **kwargs):
        self.cache_name = cache_name
        # Default to non-persistent, in-memory storage; backend subclasses replace these
        # with their own BaseStorage implementations
        self.responses: BaseStorage[str, CachedResponse] = DictStorage()
        self.redirects: BaseStorage[str, str] = DictStorage()
        self._settings = CacheSettings()  # Init and public access is done in CachedSession

    # Main cache operations
    # ---------------------

    def get_response(self, key: str, default=None) -> Optional[CachedResponse]:
        """Retrieve a response from the cache, if it exists

        Args:
            key: Cache key for the response
            default: Value to return if `key` is not in the cache
        """
        try:
            response = self.responses.get(key)
            if response is None:  # Note: bool(requests.Response) is False if status > 400
                # Fall back to the redirect index: `key` may refer to a redirect whose
                # final destination is cached under a different key
                response = self.responses[self.redirects[key]]
            return response
        except (AttributeError, KeyError):
            # AttributeError may be raised by a response that failed to deserialize
            return default

    def save_response(
        self,
        response: Response,
        cache_key: Optional[str] = None,
        expires: Optional[datetime] = None,
    ):
        """Save a response to the cache

        Args:
            cache_key: Cache key for this response; will otherwise be generated based on request
            response: Response to save
            expires: Absolute expiration time for this response
        """
        cache_key = cache_key or self.create_key(response.request)
        cached_response = CachedResponse.from_response(response, expires=expires)
        # Strip ignored headers/params before persisting, so secrets aren't written to storage
        cached_response = redact_response(cached_response, self._settings.ignored_parameters)
        self.responses[cache_key] = cached_response
        # Index each intermediate redirect so future requests to those URLs hit the cache
        for r in response.history:
            self.redirects[self.create_key(r.request)] = cache_key

    def clear(self):
        """Delete all items from the cache"""
        logger.info('Clearing all items from the cache')
        self.responses.clear()
        self.redirects.clear()

    def close(self):
        """Close any open backend connections"""
        logger.debug('Closing backend connections')
        self.responses.close()
        self.redirects.close()

    def create_key(
        self,
        request: AnyRequest,
        match_headers: Optional[Iterable[str]] = None,
        **kwargs,
    ) -> str:
        """Create a normalized cache key from a request object"""
        # A user-provided key function (if any) takes precedence over the default
        key_fn = self._settings.key_fn if self._settings.key_fn is not None else create_key
        return key_fn(
            request=request,
            ignored_parameters=self._settings.ignored_parameters,
            match_headers=match_headers or self._settings.match_headers,
            serializer=self.responses.serializer,
            **kwargs,
        )

    # Convenience methods
    # --------------------

    def contains(
        self,
        key: Optional[str] = None,
        request: Optional[AnyRequest] = None,
        url: Optional[str] = None,
    ):
        """Check if the specified request is cached

        Args:
            key: Check for a specific cache key
            request: Check for a matching request, according to current request matching settings
            url: Check for a matching GET request with the specified URL
        """
        # Precedence: an explicit key wins; otherwise derive one from request or url
        if url:
            request = Request('GET', url)
        if request and not key:
            key = self.create_key(request)
        return key in self.responses or key in self.redirects

    def delete(
        self,
        *keys: str,
        expired: bool = False,
        invalid: bool = False,
        older_than: ExpirationTime = None,
        requests: Optional[Iterable[AnyRequest]] = None,
        urls: Optional[Iterable[str]] = None,
    ):
        """Remove responses from the cache according one or more conditions.

        Args:
            keys: Remove responses with these cache keys
            expired: Remove all expired responses
            invalid: Remove all invalid responses (that can't be deserialized with current settings)
            older_than: Remove responses older than this value, relative to ``response.created_at``
            requests: Remove matching responses, according to current request matching settings
            urls: Remove matching GET requests for the specified URL(s)
        """
        delete_keys: List[str] = list(keys) if keys else []
        # Convert URLs into prepared GET requests so they can be keyed like any other request
        if urls:
            requests = list(requests or []) + [Request('GET', url).prepare() for url in urls]
        if requests:
            delete_keys += [self.create_key(request) for request in requests]
        # valid=False: only collect responses matching the expired/invalid/older_than conditions
        for response in self.filter(
            valid=False, expired=expired, invalid=invalid, older_than=older_than
        ):
            delete_keys.append(response.cache_key)
        logger.debug(f'Deleting up to {len(delete_keys)} responses')
        # For some backends, we don't want to use bulk_delete if there's only one key
        if len(delete_keys) == 1:
            try:
                del self.responses[delete_keys[0]]
            except KeyError:
                pass
        else:
            self.responses.bulk_delete(delete_keys)
        self._prune_redirects()

    def _prune_redirects(self):
        """Remove any redirects that no longer point to an existing response"""
        invalid_redirects = [k for k, v in self.redirects.items() if v not in self.responses]
        self.redirects.bulk_delete(invalid_redirects)

    def filter(
        self,
        valid: bool = True,
        expired: bool = True,
        invalid: bool = False,
        older_than: ExpirationTime = None,
    ) -> Iterator[CachedResponse]:
        """Get responses from the cache, with optional filters for which responses to include:

        Args:
            valid: Include valid and unexpired responses; set to ``False`` to get **only**
                expired/invalid/old responses
            expired: Include expired responses
            invalid: Include invalid responses (as an empty ``CachedResponse``)
            older_than: Get responses older than this value, relative to ``response.created_at``
        """
        # With all filters disabled, nothing can match; short-circuit the iteration
        if not any([valid, expired, invalid, older_than]):
            return
        for key in self.responses.keys():
            response = self.get_response(key)
            # Use an empty response as a placeholder for an invalid response, if specified
            if invalid and response is None:
                response = CachedResponse(status_code=504)
                response.cache_key = key
                yield response
            elif response is not None and (
                (valid and not response.is_expired)
                or (expired and response.is_expired)
                or (older_than and response.is_older_than(older_than))
            ):
                yield response

    def recreate_keys(self):
        """Recreate cache keys for all previously cached responses

        Useful after changing request matching settings, so existing cached responses
        remain retrievable under the newly computed keys.
        """
        logger.debug('Recreating all cache keys')
        # Snapshot keys first, since the loop mutates the underlying storage
        old_keys = list(self.responses.keys())
        for old_cache_key in old_keys:
            response = self.responses[old_cache_key]
            new_cache_key = self.create_key(response.request)
            if new_cache_key != old_cache_key:
                self.responses[new_cache_key] = response
                del self.responses[old_cache_key]

    def reset_expiration(self, expire_after: ExpirationTime = None):
        """Set a new expiration value to set on existing cache items

        Args:
            expire_after: New expiration value, **relative to the current time**
        """
        logger.info(f'Resetting expiration with: {expire_after}')
        for response in self.filter():
            response.reset_expiration(expire_after)
            # Re-save so the updated expiration is persisted
            self.responses[response.cache_key] = response

    def update(self, other: 'BaseCache'):  # type: ignore
        """Update this cache with the contents of another cache"""
        logger.debug(f'Copying {len(other.responses)} responses from {repr(other)} to {repr(self)}')
        self.responses.update(other.responses)
        self.redirects.update(other.redirects)

    def urls(self, **kwargs) -> List[str]:
        """Get all unique cached URLs. Optionally takes keyword arguments for :py:meth:`.filter`."""
        return sorted({response.url for response in self.filter(**kwargs)})

    def __str__(self):
        return f'<{self.__class__.__name__}(name={self.cache_name})>'

    def __repr__(self):
        return str(self)

    # Deprecated methods
    #
    # Note: delete_urls(), has_key(), keys(), values(), and response_count() were added relatively
    # recently and appear to not be widely used, so these will likely be removed within 1 or 2
    # minor releases.
    #
    # The methods delete_url(), has_url() and remove_expired_responses() have been around for longer
    # and have appeared in various examples in the docs, so these will likely stick around longer
    # (or could be kept indefinitely if someone really needs them)
    # --------------------

    def delete_url(self, url: str, method: str = 'GET', **kwargs):
        # Deprecated: use delete(urls=...) instead
        warn(
            'BaseCache.delete_url() is deprecated; please use .delete(urls=...) instead',
            DeprecationWarning,
        )
        self.delete(requests=[Request(method, url, **kwargs)])

    def delete_urls(self, urls: Iterable[str], method: str = 'GET', **kwargs):
        # Deprecated: use delete(urls=...) instead
        warn(
            'BaseCache.delete_urls() is deprecated; please use .delete(urls=...) instead',
            DeprecationWarning,
        )
        self.delete(requests=[Request(method, url, **kwargs) for url in urls])

    def has_key(self, key: str) -> bool:
        # Deprecated: use contains() instead
        warn(
            'BaseCache.has_key() is deprecated; please use .contains() instead',
            DeprecationWarning,
        )
        return self.contains(key)

    def has_url(self, url: str, method: str = 'GET', **kwargs) -> bool:
        # Deprecated: use contains(url=...) instead
        warn(
            'BaseCache.has_url() is deprecated; please use .contains(url=...) instead',
            DeprecationWarning,
        )
        return self.contains(request=Request(method, url, **kwargs))

    def keys(self, check_expiry: bool = False) -> Iterator[str]:
        # Deprecated: use filter() or responses.keys() instead
        warn(
            'BaseCache.keys() is deprecated; '
            'please use .filter() or BaseCache.responses.keys() instead',
            DeprecationWarning,
        )
        yield from self.redirects.keys()
        if not check_expiry:
            yield from self.responses.keys()
        else:
            for response in self.filter(expired=False):
                yield response.cache_key

    def response_count(self, check_expiry: bool = False) -> int:
        # Deprecated: use filter() or len(responses) instead
        warn(
            'BaseCache.response_count() is deprecated; '
            'please use .filter() or len(BaseCache.responses) instead',
            DeprecationWarning,
        )
        return len(list(self.filter(expired=not check_expiry)))

    def remove_expired_responses(self, expire_after: ExpirationTime = None):
        # Deprecated: use delete(expired=True) instead
        warn(
            'BaseCache.remove_expired_responses() is deprecated; '
            'please use .delete(expired=True) instead',
            DeprecationWarning,
        )
        if expire_after:
            self.reset_expiration(expire_after)
        self.delete(expired=True, invalid=True)

    def values(self, check_expiry: bool = False) -> Iterator[CachedResponse]:
        # Deprecated: use filter() or responses.values() instead
        warn(
            'BaseCache.values() is deprecated; '
            'please use .filter() or BaseCache.responses.values() instead',
            DeprecationWarning,
        )
        yield from self.filter(expired=not check_expiry)
# Key and value type variables used to parameterize generic storage classes
KT = TypeVar('KT')
VT = TypeVar('VT')
class BaseStorage(MutableMapping[KT, VT], ABC):
    """Base class for client-agnostic storage implementations. Notes:

    * This provides a common dictionary-like interface for the underlying storage operations
      (create, read, update, delete).
    * One ``BaseStorage`` instance corresponds to a single table/hash/collection, or whatever the
      backend-specific equivalent may be.
    * ``BaseStorage`` subclasses contain no behavior specific to ``requests``, which are handled by
      :py:class:`.BaseCache` subclasses.
    * ``BaseStorage`` also contains a serializer object (defaulting to :py:mod:`pickle`), which
      determines how :py:class:`.CachedResponse` objects are saved internally. See
      :ref:`serializers` for details.

    Args:
        serializer: Custom serializer that provides ``loads`` and ``dumps`` methods.
            If not provided, values will be written as-is.
        decode_content: Decode response body JSON or text into a human-readable format
        kwargs: Additional backend-specific keyword arguments
    """

    def __init__(
        self, serializer: Optional[SerializerType] = None, decode_content: bool = False, **kwargs
    ):
        self.serializer = init_serializer(serializer, decode_content)
        logger.debug(f'Initialized {type(self).__name__} with serializer: {self.serializer}')

    def bulk_delete(self, keys: Iterable[KT]):
        """Delete multiple keys from the cache, without raising errors for missing keys.

        This is a naive, generic implementation that subclasses should override with a more
        efficient backend-specific implementation, if possible.
        """
        for key in keys:
            try:
                del self[key]
            except KeyError:
                # Missing keys are expected here; skip them silently
                pass

    def close(self):
        """Close any open backend connections"""

    def serialize(self, value: VT):
        """Serialize a value, if a serializer is available"""
        # No serializer (e.g. DictStorage): pass the value through unchanged
        if not self.serializer:
            return value
        if TYPE_CHECKING:
            assert hasattr(self.serializer, 'dumps')
        return self.serializer.dumps(value)

    def deserialize(self, key, value: VT):
        """Deserialize a value, if a serializer is available.

        If deserialization fails (usually due to a value saved in an older requests-cache
        version), ``None`` will be returned.
        """
        # No serializer (e.g. DictStorage): pass the value through unchanged
        if not self.serializer:
            return value
        if TYPE_CHECKING:
            assert hasattr(self.serializer, 'loads')
        try:
            obj = self.serializer.loads(value)
            # Tag the deserialized object with its cache key, if it's a response object
            try:
                obj.cache_key = key
            except AttributeError:
                pass
            return obj
        except DESERIALIZE_ERRORS as err:
            logger.error(f'Unable to deserialize response: {str(err)}')
            logger.debug(err, exc_info=True)
            return None

    def __str__(self):
        # Show the full list of stored keys
        return str([key for key in self.keys()])
class DictStorage(UserDict, BaseStorage):
    """A basic dict wrapper class for non-persistent, in-memory storage

    .. note::
        This is mostly a placeholder for when no other backends are available. For in-memory
        caching, either :py:class:`.SQLiteCache` (with `use_memory=True`) or
        :py:class:`.RedisCache` is recommended instead.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Values never leave memory, so no serialization is performed
        self.serializer = None

    def __getitem__(self, key):
        """An additional step is needed here for response data. The original response object
        is still in memory, and hasn't gone through a serialize/deserialize loop. So, the file-like
        response body has already been read, and needs to be reset.
        """
        value = super().__getitem__(key)
        raw = getattr(value, 'raw', None)
        if raw:
            raw.reset()
        # Tag the stored object with its cache key, if it's a response object
        try:
            value.cache_key = key
        except AttributeError:
            pass
        return value