Source code for requests_cache.backends.dynamodb

import boto3
from botocore.exceptions import ClientError

from .base import BaseCache, BaseStorage


[docs]class DynamoDbCache(BaseCache): """DynamoDB cache backend Args: table_name: DynamoDb table name namespace: Name of DynamoDb hash map connection: DynamoDb Resource object (``boto3.resource('dynamodb')``) to use instead of creating a new one """ def __init__(self, table_name='http_cache', **kwargs): super().__init__(**kwargs) self.responses = DynamoDbDict(table_name, namespace='responses', **kwargs) kwargs['connection'] = self.responses.connection self.redirects = DynamoDbDict(table_name, namespace='redirects', **kwargs)
[docs]class DynamoDbDict(BaseStorage): """A dictionary-like interface for DynamoDB key-value store **Note:** The actual key name on the dynamodb server will be ``namespace``:``table_name`` In order to deal with how dynamodb stores data/keys, everything, i.e. keys and data, must be pickled. Args: table_name: DynamoDb table name namespace: Name of DynamoDb hash map connection: DynamoDb Resource object (``boto3.resource('dynamodb')``) to use instead of creating a new one endpoint_url: Alternative URL of dynamodb server. """ def __init__( self, table_name, namespace='http_cache', connection=None, endpoint_url=None, region_name='us-east-1', aws_access_key_id=None, aws_secret_access_key=None, read_capacity_units=1, write_capacity_units=1, **kwargs, ): super().__init__(**kwargs) self._self_key = namespace if connection is not None: self.connection = connection else: # TODO: Use inspection to get any valid resource arguments from **kwargs self.connection = boto3.resource( 'dynamodb', endpoint_url=endpoint_url, region_name=region_name, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, ) try: self.connection.create_table( AttributeDefinitions=[ { 'AttributeName': 'namespace', 'AttributeType': 'S', }, { 'AttributeName': 'key', 'AttributeType': 'S', }, ], TableName=table_name, KeySchema=[ {'AttributeName': 'namespace', 'KeyType': 'HASH'}, {'AttributeName': 'key', 'KeyType': 'RANGE'}, ], ProvisionedThroughput={ 'ReadCapacityUnits': read_capacity_units, 'WriteCapacityUnits': write_capacity_units, }, ) except ClientError: pass self._table = self.connection.Table(table_name) self._table.wait_until_exists() def __getitem__(self, key): composite_key = {'namespace': self._self_key, 'key': str(key)} result = self._table.get_item(Key=composite_key) if 'Item' not in result: raise KeyError return self.deserialize(result['Item']['value'].value) def __setitem__(self, key, item): item = {'namespace': self._self_key, 'key': str(key), 'value': self.serialize(item)} self._table.put_item(Item=item) def __delitem__(self, key): composite_key = {'namespace': self._self_key, 'key': str(key)} response = self._table.delete_item(Key=composite_key, ReturnValues='ALL_OLD') if 'Attributes' not in response: raise KeyError def __len__(self): return self.__count_table() def __iter__(self): response = self.__scan_table() for v in response['Items']: yield self.deserialize(v['value'].value)
[docs] def clear(self): response = self.__scan_table() for v in response['Items']: composite_key = {'namespace': v['namespace'], 'key': v['key']} self._table.delete_item(Key=composite_key)
def __str__(self): return str(dict(self.items())) def __scan_table(self): expression_attribute_values = {':Namespace': self._self_key} expression_attribute_names = {'#N': 'namespace'} key_condition_expression = '#N = :Namespace' return self._table.query( ExpressionAttributeValues=expression_attribute_values, ExpressionAttributeNames=expression_attribute_names, KeyConditionExpression=key_condition_expression, ) def __count_table(self): expression_attribute_values = {':Namespace': self._self_key} expression_attribute_names = {'#N': 'namespace'} key_condition_expression = '#N = :Namespace' return self._table.query( Select='COUNT', ExpressionAttributeValues=expression_attribute_values, ExpressionAttributeNames=expression_attribute_names, KeyConditionExpression=key_condition_expression, )['Count']