Examples#

This section contains some complete examples that demonstrate the main features of requests-cache.

Scripts#

The following scripts can also be found in the examples/ folder on GitHub.

Basic usage (with sessions)#

A simple example using requests-cache with httpbin.

Example

basic_sessions.py

import time

from requests_cache import CachedSession


def main():
    session = CachedSession('example_cache', backend='sqlite')

    # The real request will only be made once; afterward, the cached response is used
    for i in range(5):
        response = session.get('https://httpbin.org/get')

    # This is more obvious when calling a slow endpoint
    for i in range(5):
        response = session.get('https://httpbin.org/delay/2')

    # Caching can be disabled if we want to get a fresh page and not cache it
    with session.cache_disabled():
        print(session.get('https://httpbin.org/ip').text)

    # Get some debugging info about the cache
    print(session.cache)
    print('Cached URLs:')
    print('\n'.join(session.cache.urls()))


if __name__ == '__main__':
    t = time.time()
    main()
    print('Elapsed: %.3f seconds' % (time.time() - t))

Basic usage (with patching)#

The same as basic_sessions.py, but using Patching.

Example

basic_patching.py

import time

import requests

import requests_cache

# After installation, all requests functions and Session methods will be cached
requests_cache.install_cache('example_cache', backend='sqlite')


def main():
    # The real request will only be made once; afterward, the cached response is used
    for i in range(5):
        response = requests.get('https://httpbin.org/get')

    # This is more obvious when calling a slow endpoint
    for i in range(5):
        response = requests.get('https://httpbin.org/delay/2')

    # Caching can be disabled if we want to get a fresh page and not cache it
    with requests_cache.disabled():
        print(requests.get('https://httpbin.org/ip').text)

    # Get some debugging info about the cache
    print(requests_cache.get_cache())
    print('Cached URLs:')
    print('\n'.join(requests_cache.get_cache().urls()))

    # Uninstall to remove caching from all requests functions
    requests_cache.uninstall_cache()


if __name__ == "__main__":
    t = time.time()
    main()
    print('Elapsed: %.3f seconds' % (time.time() - t))

Cache expiration#

An example of setting expiration for individual requests.

Example

expiration.py

import time

from requests_cache import CachedSession


def main():
    session = CachedSession('example_cache', backend='sqlite')

    # By default, cached responses never expire
    response = session.get('https://httpbin.org/get')
    assert not response.from_cache
    response = session.get('https://httpbin.org/get')
    assert response.from_cache
    assert not response.expires

    # We can set default expiration for the session using expire_after
    session = CachedSession('example_cache', backend='sqlite', expire_after=60)
    session.cache.clear()
    response = session.get('https://httpbin.org/get')
    response = session.get('https://httpbin.org/get')
    print('Expiration time:', response.expires)

    # This can also be overridden for individual requests
    session.cache.clear()
    response = session.get('https://httpbin.org/get', expire_after=1)
    response = session.get('https://httpbin.org/get')
    assert response.from_cache
    print('Expiration time:', response.expires)

    # After 1 second, the cached value will have expired
    time.sleep(1.2)
    assert response.is_expired
    response = session.get('https://httpbin.org/get')
    assert not response.from_cache


if __name__ == '__main__':
    t = time.perf_counter()
    main()
    print('Elapsed: %.3f seconds' % (time.perf_counter() - t))
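
Note that expire_after isn't limited to integer seconds. A brief sketch of some other accepted values (not part of expiration.py; see the expiration docs for the full list):

from datetime import datetime, timedelta

from requests_cache import DO_NOT_CACHE, NEVER_EXPIRE, CachedSession

# Relative expiration: cached responses expire one hour after being stored
session = CachedSession('example_cache', expire_after=timedelta(hours=1))

# Other accepted values:
#   expire_after=NEVER_EXPIRE          # never expire (the default)
#   expire_after=DO_NOT_CACHE          # skip writing to the cache entirely
#   expire_after=datetime(2030, 1, 1)  # expire at an absolute time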

URL patterns#

An example of URL patterns.

Example

url_patterns.py

from datetime import timedelta

from requests_cache import DO_NOT_CACHE, NEVER_EXPIRE, CachedSession

default_expire_after = 60 * 60               # By default, cached responses expire in an hour
urls_expire_after = {
    'httpbin.org/image': timedelta(days=7),  # Requests for this base URL will expire in a week
    '*.fillmurray.com': NEVER_EXPIRE,        # Requests matching this pattern will never expire
    '*.placeholder.com/*': DO_NOT_CACHE,     # Requests matching this pattern will not be cached
}
urls = [
    'https://httpbin.org/get',               # Will expire in an hour
    'https://httpbin.org/image/jpeg',        # Will expire in a week
    'https://www.fillmurray.com/460/300',    # Will never expire
    'https://via.placeholder.com/350x150',   # Will not be cached
]


def send_requests():
    session = CachedSession(
        cache_name='example_cache',
        expire_after=default_expire_after,
        urls_expire_after=urls_expire_after,
    )
    return [session.get(url) for url in urls]


def _expires_str(response):
    if not response.from_cache:
        return 'N/A'
    elif response.expires is None:
        return 'Never'
    else:
        return response.expires.isoformat()


def main():
    send_requests()
    cached_responses = send_requests()
    for response in cached_responses:
        print(
            f'{response.url:40} From cache: {response.from_cache}'
            f'\tExpires: {_expires_str(response)}'
        )


if __name__ == '__main__':
    main()

PyGithub#

An example of caching GitHub API requests with PyGithub.

This example demonstrates the following features:

  • Patching: PyGithub uses requests, but the session it uses is not easily accessible. In this case, using install_cache() is the easiest approach.

  • URL Patterns: Since we’re using patching, this example adds an optional safety measure to avoid unintentionally caching any non-GitHub requests elsewhere in your code.

  • Cache-Control: The GitHub API provides Cache-Control headers, so we can use those to set expiration.

  • Conditional Requests: The GitHub API also supports conditional requests. Even after responses expire, we can still make use of the cache until the remote content actually changes.

  • Rate limiting: The GitHub API is rate-limited at 5000 requests per hour if authenticated, or only 60 requests per hour otherwise. This makes caching especially useful, because cache hits and 304 Not Modified responses (from conditional requests) are not counted against the rate limit.

  • Cache Inspection: After calling some PyGithub functions, we can take a look at the cache contents to see the actual API requests that were sent.

  • Security: If you use a personal access token, it will be sent to the GitHub API via the Authorization header. This is not something you want to store in the cache if your storage backend is unsecured, so Authorization and other common auth headers/params are redacted by default. This example shows how to verify this.

Example

pygithub.py

from time import time

import requests
from github import Github

from requests_cache import DO_NOT_CACHE, get_cache, install_cache

# (Optional) Add an access token here, if you want higher rate limits and access to private repos
ACCESS_TOKEN = None

# Or add your own username here (if not using an access token)
MY_USERNAME = 'test-user'


install_cache(
    cache_control=True,
    urls_expire_after={
        '*.github.com': 360,  # Placeholder expiration; should be overridden by Cache-Control
        '*': DO_NOT_CACHE,  # Don't cache anything other than GitHub requests
    },
)


def get_user_info():
    """Display some info about your own resources on GitHub"""
    gh = Github(ACCESS_TOKEN)
    my_user = gh.get_user() if ACCESS_TOKEN else gh.get_user(MY_USERNAME)

    # Get links to all of your own repositories
    print('My repos:')
    for repo in my_user.get_repos():
        print(repo.html_url)

    # Get links to all of your own gists
    print('\nMy gists:')
    for gist in my_user.get_gists():
        print(gist.html_url)

    # Get organizations you belong to
    print('\nMy organizations:')
    for org in my_user.get_orgs():
        print(org.html_url)

    # Check how internet-famous you are
    print('\nMy followers:')
    for user in my_user.get_followers():
        print(user.login)

    # Check your API rate limit usage
    print(f'\nRate limit: {gh.rate_limiting}')


def test_non_github_requests():
    """Test that URL patterns are working, and that non-GitHub requests are not cached"""
    response = requests.get('https://httpbin.org/json')
    response = requests.get('https://httpbin.org/json')
    from_cache = getattr(response, 'from_cache', False)
    print(f'Non-GitHub requests cached: {from_cache}')
    assert not from_cache


def check_cache():
    """Check some information on cached requests"""
    # Show all the GitHub API URLs that PyGithub called
    print('\nCached URLs:')
    print('\n'.join(get_cache().urls()))

    # Make sure credentials were redacted from all responses in the cache
    response = requests.get('https://api.github.com/user/repos')
    print('\nExample cached request headers:')
    print(response.request.headers)
    for response in get_cache().responses.values():
        assert 'Authorization' not in response.request.headers


def main():
    # Send initial requests
    start = time()
    get_user_info()
    print(f'Elapsed: {time() - start:.2f} seconds')

    # Repeat the same requests and verify that your rate limit usage is unchanged
    start = time()
    get_user_info()
    print(f'Elapsed: {time() - start:.2f} seconds')

    test_non_github_requests()
    check_cache()


if __name__ == '__main__':
    main()

Multi-threaded requests#

An example of making multi-threaded cached requests, adapted from the python docs for ThreadPoolExecutor.

Example

threads.py

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter as time

from requests_cache import CachedSession

URLS = [
    'https://en.wikipedia.org/wiki/Python_(programming_language)',
    'https://en.wikipedia.org/wiki/Requests_(software)',
    'https://en.wikipedia.org/wiki/Cache_(computing)',
    'https://en.wikipedia.org/wiki/SQLite',
    'https://en.wikipedia.org/wiki/Redis',
    'https://en.wikipedia.org/wiki/MongoDB',
]


def send_requests():
    session = CachedSession('example_cache')
    start = time()

    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(session.get, url): url for url in URLS}

        for future in as_completed(future_to_url):
            url = future_to_url[future]
            response = future.result()
            from_cache = 'hit' if response.from_cache else 'miss'
            print(f'{url} is {len(response.content)} bytes (cache {from_cache})')

    print(f'Elapsed: {time() - start:.3f} seconds')


if __name__ == '__main__':
    send_requests()
    send_requests()

Logging requests#

An example of testing the cache to prove that it’s not making more requests than expected.

Example

log_requests.py

from contextlib import contextmanager
from logging import basicConfig, getLogger
from unittest.mock import patch

import requests

from requests_cache import CachedSession
from requests_cache.session import OriginalSession, set_response_defaults

basicConfig(level='INFO')
logger = getLogger('requests_cache.examples')
# Uncomment for more verbose debug output
# getLogger('requests_cache').setLevel('DEBUG')


@contextmanager
def log_requests():
    """Context manager that mocks and logs all non-cached requests"""
    real_response = set_response_defaults(requests.get('https://httpbin.org/get'))
    with patch.object(OriginalSession, 'send', return_value=real_response) as mock_send:
        session = CachedSession('cache-test', backend='sqlite')
        session.cache.clear()
        yield session
        cached_responses = session.cache.responses.values()

    logger.debug('All calls to Session.send():')
    logger.debug(mock_send.mock_calls)

    logger.info(f'Responses cached: {len(cached_responses)}')
    logger.info(f'Requests sent: {mock_send.call_count}')


def main():
    """Example usage; replace with any other requests you want to test"""
    with log_requests() as session:
        for i in range(10):
            response = session.get('https://httpbin.org/get')
            logger.debug(f'Response {i}: {type(response).__name__}')


if __name__ == '__main__':
    main()

External configuration#

An example of loading CachedSession settings from an external config file.

Limitations:

  • Does not include backend or serializer settings

  • Does not include settings specified as Python expressions, for example timedelta objects or callback functions

Example

external_config.py

from pathlib import Path

import yaml

from requests_cache import CachedSession, CacheSettings

CONFIG_FILE = Path(__file__).parent / 'external_config.yml'


def load_settings() -> CacheSettings:
    """Load settings from a YAML config file"""
    with open(CONFIG_FILE) as f:
        settings = yaml.safe_load(f)
    return CacheSettings(**settings['cache_settings'])


if __name__ == '__main__':
    session = CachedSession()
    session.settings = load_settings()
    print('Loaded settings:\n', session.settings)
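
For reference, a minimal sketch of what external_config.yml might contain (the keys below are illustrative CacheSettings options; the actual file shipped with the examples may differ):

# external_config.yml (illustrative)
cache_settings:
  expire_after: 3600
  allowed_methods: [GET, HEAD]
  stale_if_error: true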

Cache speed test#

An example of benchmarking cache read and write speeds with semi-randomized response content.

Usage (optionally for a specific backend and/or serializer):

python benchmark.py -b <backend> -s <serializer>
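
For example, to benchmark the Redis backend with the JSON serializer (assuming a local Redis server is running and the relevant optional dependencies are installed):

python benchmark.py -b redis -s json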
Example

benchmark.py

from argparse import ArgumentParser
from os import urandom
from random import random
from time import perf_counter as time

import requests
from rich import print
from rich.progress import Progress

from requests_cache import CachedResponse, CachedSession

BASE_RESPONSE = requests.get('https://httpbin.org/get')
CACHE_NAME = 'rubbish_bin'
WARMUP_ITERATIONS = 100
ITERATIONS = 5000
MAX_RESPONSE_SIZE = 1024 * 350

# Defaults for DynamoDB
AWS_OPTIONS = {
    'endpoint_url': 'http://localhost:8000',
    'region_name': 'us-east-1',
    'aws_access_key_id': 'placeholder',
    'aws_secret_access_key': 'placeholder',
}


def test_write_speed(session, max_size):
    for i in range(WARMUP_ITERATIONS):
        new_response = get_randomized_response(i, max_size)
        session.cache.save_response(new_response)

    with Progress() as progress:
        task = progress.add_task('[cyan]Testing write speed...', total=ITERATIONS)
        start = time()

        for i in range(ITERATIONS):
            new_response = get_randomized_response(i, max_size)
            session.cache.save_response(new_response)
            progress.update(task, advance=1)

    elapsed = time() - start
    avg = (elapsed / ITERATIONS) * 1000
    print(f'[cyan]Elapsed: [green]{elapsed:.3f}[/] seconds (avg [green]{avg:.3f}[/] ms per write)')


def test_read_speed(session):
    keys = list(session.cache.responses.keys())
    for i in range(WARMUP_ITERATIONS):
        key = keys[i % len(keys)]
        session.cache.get_response(key)

    with Progress() as progress:
        task = progress.add_task('[cyan]Testing read speed...', total=ITERATIONS)
        start = time()

        for i in range(ITERATIONS):
            key = keys[i % len(keys)]
            session.cache.get_response(key)
            progress.update(task, advance=1)

    elapsed = time() - start
    avg = (elapsed / ITERATIONS) * 1000
    print(f'[cyan]Elapsed: [green]{elapsed:.3f}[/] seconds (avg [green]{avg:.3f}[/] ms per read)')


def get_randomized_response(i=0, max_size=MAX_RESPONSE_SIZE):
    """Get a response with randomized content"""
    new_response = CachedResponse.from_response(BASE_RESPONSE)
    n_bytes = int(random() * max_size)
    new_response._content = urandom(n_bytes)
    new_response.request.url += f'/response_{i}'
    return new_response


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('-b', '--backend', default='sqlite')
    parser.add_argument('-s', '--serializer', default='pickle')
    parser.add_argument('-m', '--max-size', default=MAX_RESPONSE_SIZE, type=float)
    args = parser.parse_args()
    print(f'[cyan]Benchmarking {args.backend} backend with {args.serializer} serializer')

    kwargs = {}
    if args.backend == 'dynamodb':
        kwargs = AWS_OPTIONS
    elif args.backend == 'sqlite-memory':
        args.backend = 'sqlite'
        kwargs = {'use_memory': True}

    session = CachedSession(
        CACHE_NAME,
        backend=args.backend,
        serializer=args.serializer,
        **kwargs,
    )
    test_write_speed(session, args.max_size)
    test_read_speed(session)

Requests per second graph#

This example displays a graph of request rates over time. Requests are continuously sent to URLs randomly picked from a fixed number of possible URLs. This demonstrates how average request rate increases as the proportion of cached requests increases.

Try running this example with different cache settings and URLs to see how the graph changes.

Example

benchmark.py

from random import randint
from time import time

from rich.live import Live
from rich.progress import BarColumn, MofNCompleteColumn, Progress
from rich.table import Table

from requests_cache import CachedSession

N_UNIQUE_REQUESTS = 200


class RPSProgress(Progress):
    """Display a bar chart of requests per second"""

    def __init__(self, interval: int = 1, scale: int = 500, **kwargs):
        super().__init__(BarColumn(), '{task.completed}', **kwargs)
        self.current_task = None
        self.interval = interval
        self.interval_start = None
        self.scale = scale
        self.total_requests = 0
        self.next_interval()

    def next_interval(self):
        """Create a new task to draw the next line on the bar chart"""
        self.current_task = self.add_task('barchart_line', total=self.scale)
        self.interval_start = time()

    def count_request(self):
        if time() - self.interval_start >= self.interval:
            self.next_interval()
        self.advance(self.current_task)
        self.total_requests += 1


class CacheRPSProgress:
    """Track requests per second plus cache size in a single live view"""

    def __init__(self, n_unique_requests: int = 100):
        self.rps_progress = RPSProgress()
        self.cache_progress = Progress(
            BarColumn(complete_style='blue'),
            '[cyan]Requests cached:',
            MofNCompleteColumn(),
        )
        header = Progress(BarColumn(), '[cyan]Requests per second')
        header.add_task('')
        self.cache_task = self.cache_progress.add_task('', total=n_unique_requests)
        self.n_unique_requests = n_unique_requests
        self.start_time = time()

        self.table = Table.grid()
        self.table.add_row(header)
        self.table.add_row(self.rps_progress)
        self.table.add_row(self.cache_progress)
        self.live = Live(self.table, refresh_per_second=10)

    def __enter__(self):
        """Start live view on ctx enter"""
        self.live.__enter__()
        self.log(
            '[cyan]Measuring request rate with '
            f'[white]{self.n_unique_requests}[cyan] total unique requests'
        )
        self.log('[cyan]Press [white]Ctrl+C[cyan] to exit')
        return self

    def __exit__(self, *args):
        """Show stats on ctx exit"""
        self.live.__exit__(*args)
        elapsed = time() - self.start_time
        self.log(
            f'[cyan]Sent a total of [white]{self.total_requests}[cyan] '
            f'requests in [white]{elapsed:.2f}[cyan] seconds '
        )

        self.log(f'[cyan]Average: [white]{int(self.total_requests/elapsed)}[cyan] requests/second')

    @property
    def total_requests(self):
        return self.rps_progress.total_requests

    def count_request(self):
        self.rps_progress.count_request()

    def update_cache_size(self, size: int):
        self.cache_progress.update(self.cache_task, completed=size)

    def log(self, msg: str):
        self.cache_progress.log(msg)


def test_rps(session):
    session.cache.clear()

    # Send a request to one of a fixed number of unique URLs
    def random_request():
        request_number = randint(1, N_UNIQUE_REQUESTS)
        session.get(f'https://httpbin.org/get?page={request_number}')

    # Show request rate over time and total cached (unexpired) requests
    with CacheRPSProgress(N_UNIQUE_REQUESTS) as progress:
        while True:
            try:
                random_request()
                progress.count_request()
                progress.update_cache_size(session.cache.responses.count(expired=False))
            except KeyboardInterrupt:
                break


if __name__ == '__main__':
    session = CachedSession(use_temp=True, expire_after=30)
    test_rps(session)

Using with GitHub Actions#

This example shows how to use requests-cache with GitHub Actions. Key points:

  • Create the cache file within the CI project directory

  • You can use actions/cache to persist the cache file across workflow runs

    • You can use a constant cache key within this action to let requests-cache handle expiration

Example

github_actions.yml

name: Test requests-cache with GitHub Actions

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      # Persist the SQLite file created by requests-cache across workflow runs
      - id: cache
        uses: actions/cache@v3
        with:
          path: example_cache.sqlite
          key: none

      # Install and run basic requests-cache example
      - run: pip install '.'
      - run: python examples/basic_sessions.py
      - run: python examples/basic_sessions.py
      - run: test -f example_cache.sqlite && echo 'Cache file created' || echo 'Cache file missing'

Converting an old cache#

Example of converting data cached in older versions of requests-cache (<=0.5.2) into the current format.

Example

convert_cache.py

from requests import Response

import requests_cache.backends.base
from requests_cache import CachedResponse, CachedSession


class _Store:
    pass


# Add placeholder object used by pickle to deserialize old responses
requests_cache.backends.base._Store = _Store
requests_cache.backends.base._RawStore = _Store


def convert_old_response(cached_response, timestamp):
    temp_response = Response()
    for field in Response.__attrs__:
        setattr(temp_response, field, getattr(cached_response, field, None))

    new_response = CachedResponse(temp_response)
    new_response.created_at = timestamp
    return new_response


def convert_cache(*args, **kwargs):
    session = CachedSession(*args, **kwargs)
    print(f'Checking {len(session.cache.responses)} cached responses')

    with session.cache.responses.bulk_commit():
        for key, response in session.cache.responses.items():
            if isinstance(response, tuple):
                print(f'Converting response {key}')
                session.cache.responses[key] = convert_old_response(*response)

    print('Conversion complete')


# Example: convert a cache named 'demo_cache.sqlite' in the current directory
if __name__ == '__main__':
    convert_cache('demo_cache.sqlite')

Custom request matcher#

Example of a custom request matcher that caches a new response if the version of requests-cache, requests, or urllib3 changes.

This generally isn’t needed, since anything that causes a deserialization error will simply result in a new request being sent and cached. But you might want to include a library version in your cache key if, for example, you suspect a change in the library does not cause errors but results in different response content.

This uses info from requests.help.info(). You can also preview this info from the command line to see what else is available:

python -m requests.help
Example

custom_request_matcher.py

from hashlib import sha256
from unittest.mock import patch

from requests import PreparedRequest
from requests.help import info as get_requests_info

import requests_cache
from requests_cache import CachedSession
from requests_cache.cache_keys import create_key


def create_custom_key(request: PreparedRequest, **kwargs) -> str:
    """Make a custom cache key that includes library versions"""
    # Start with the default key created by requests-cache
    base_key = create_key(request, **kwargs)
    key = sha256()
    key.update(base_key.encode('utf-8'))

    # Add versions of requests-cache, requests, and urllib3
    requests_info = get_requests_info()
    for lib in ['requests', 'urllib3']:
        key.update(requests_info[lib]['version'].encode('utf-8'))
    key.update(requests_cache.__version__.encode('utf-8'))

    return key.hexdigest()


def test_cache_key():
    """Test that the custom cache keys are working as expected"""
    session = CachedSession('key-test', key_fn=create_custom_key)
    session.cache.clear()
    session.get('https://httpbin.org/get')
    response = session.get('https://httpbin.org/get')
    assert response.from_cache is True

    # Simulate a major version change
    new_versions = {
        'requests': {'version': '3.0.0'},
        'urllib3': {'version': '2.0.0'},
    }
    with patch('__main__.get_requests_info', return_value=new_versions):
        # A new request will be sent since the cache key no longer matches
        response = session.get('https://httpbin.org/get')
        assert response.from_cache is False


if __name__ == '__main__':
    test_cache_key()
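
The same key function should also work with patching, assuming install_cache() forwards session keyword arguments to CachedSession as it does for other session options (a brief sketch, not part of the example script):

import requests
import requests_cache

# All subsequently patched requests will use the custom cache key
requests_cache.install_cache('key-test', key_fn=create_custom_key)
requests.get('https://httpbin.org/get')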

Backtesting with time-machine#

An example of using the time-machine library for backtesting, e.g., testing with cached responses that were available at an arbitrary time in the past.

Example

time_machine_backtesting.py

from datetime import datetime

import requests
import time_machine

from requests_cache import CachedSession, set_response_defaults


class BacktestCachedSession(CachedSession):
    def request(self, method: str, url: str, **kwargs):
        response = super().request(method, url, **kwargs)

        # Response was cached after the (simulated) current time, so ignore it and send a new request
        if response.created_at and response.created_at > datetime.utcnow():
            new_response = requests.request(method, url, **kwargs)
            return set_response_defaults(new_response)
        else:
            return response


def demo():
    session = BacktestCachedSession()
    response = session.get('https://httpbin.org/get')
    response = session.get('https://httpbin.org/get')
    assert response.from_cache is True

    # Response was not cached yet at this point, so we should get a fresh one
    with time_machine.travel(datetime(2020, 1, 1)):
        response = session.get('https://httpbin.org/get')
        assert response.from_cache is False


if __name__ == '__main__':
    demo()

VCR Export#

Example utilities to export responses to a format compatible with VCR-based libraries.

Example

vcr.py

from os import makedirs
from os.path import abspath, dirname, expanduser, join
from typing import Any, Dict, Iterable
from urllib.parse import urlparse

import yaml

from requests_cache import BaseCache, CachedResponse, CachedSession, __version__
from requests_cache.serializers.preconf import yaml_preconf_stage


def to_vcr_cassette(cache: BaseCache, path: str):
    """Export cached responses to a VCR-compatible YAML file (cassette)

    Args:
        cache: Cache instance containing response data to export
        path: Path for new cassette file
    """

    responses = cache.responses.values()
    write_cassette(to_vcr_cassette_dict(responses), path)


def to_vcr_cassettes_by_host(cache: BaseCache, cassette_dir: str = '.'):
    """Export cached responses as VCR-compatible YAML files (cassettes), split into separate files
    based on request host

    Args:
        cache: Cache instance containing response data to export
        cassette_dir: Base directory for cassette library
    """
    responses = cache.responses.values()
    for host, cassette in to_vcr_cassette_dicts_by_host(responses).items():
        write_cassette(cassette, join(cassette_dir, f'{host}.yml'))


def to_vcr_cassette_dict(responses: Iterable[CachedResponse]) -> Dict:
    """Convert responses to a VCR cassette dict"""
    return {
        'http_interactions': [to_vcr_episode(r) for r in responses],
        'recorded_with': f'requests-cache {__version__}',
    }


def to_vcr_episode(response: CachedResponse) -> Dict:
    """Convert a single response to a VCR-compatible response ("episode") dict"""
    # Do most of the work with cattrs + default YAML conversions
    response_dict = yaml_preconf_stage.dumps(response)

    def _to_multidict(d):
        return {k: [v] for k, v in d.items()}

    # Translate requests.Response structure into VCR format
    return {
        'request': {
            'body': response_dict['request']['body'],
            'headers': _to_multidict(response_dict['request']['headers']),
            'method': response_dict['request']['method'],
            'uri': response_dict['request']['url'],
        },
        'response': {
            'body': {'string': response_dict['_content'], 'encoding': response_dict['encoding']},
            'headers': _to_multidict(response_dict['headers']),
            'status': {'code': response_dict['status_code'], 'message': response_dict['reason']},
            'url': response_dict['url'],
        },
        'recorded_at': response_dict['created_at'],
    }


def to_vcr_cassette_dicts_by_host(responses: Iterable[CachedResponse]) -> Dict[str, Dict]:
    responses_by_host: Dict[str, Any] = {}
    for response in responses:
        host = urlparse(response.request.url).netloc
        responses_by_host.setdefault(host, [])
        responses_by_host[host].append(response)
    return {host: to_vcr_cassette_dict(responses) for host, responses in responses_by_host.items()}


def write_cassette(cassette, path):
    path = abspath(expanduser(path))
    makedirs(dirname(path), exist_ok=True)
    with open(path, 'w') as f:
        f.write(yaml.safe_dump(cassette))


# Create an example cache and export it to a cassette
if __name__ == '__main__':
    cache_dir = 'example_cache'
    session = CachedSession(join(cache_dir, 'http_cache.sqlite'))
    session.get('https://httpbin.org/get')
    session.get('https://httpbin.org/json')
    to_vcr_cassette(session.cache, join(cache_dir, 'http_cache.yaml'))
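
Once exported, the cassette can presumably be replayed with a VCR-based library such as vcrpy (a separate third-party package; a minimal sketch, assuming the exported format matches what vcrpy expects):

import requests
import vcr

# Requests matching a recorded interaction are served from the cassette
# instead of hitting the network
with vcr.use_cassette('example_cache/http_cache.yaml'):
    print(requests.get('https://httpbin.org/get').json())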