Source code for ZODB.DemoStorage

##############################################################################
#
# Copyright (c) Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Demo ZODB storage

A demo storage supports demos by allowing a volatile changed database
to be layered over a base database.

The base storage must not change.

"""
from __future__ import print_function
import os
import random
import weakref
import tempfile
import ZODB.BaseStorage
import ZODB.blob
import ZODB.interfaces
import ZODB.MappingStorage
import ZODB.POSException
import ZODB.utils
import zope.interface

from .ConflictResolution import ConflictResolvingStorage
from .utils import load_current, maxtid


@zope.interface.implementer(
    ZODB.interfaces.IStorage,
    ZODB.interfaces.IStorageIteration,
)
class DemoStorage(ConflictResolvingStorage):
    """A storage that stores changes against a read-only base database

    This storage was originally meant to support distribution of
    application demonstrations with populated read-only databases (on
    CDROM) and writable in-memory databases.

    Demo storages are extremely convenient for testing where setup of a
    base database can be shared by many tests.

    Demo storages are also handy for staging applications where a
    read-only snapshot of a production database (often accomplished
    using a `beforestorage
    <https://pypi.org/project/zc.beforestorage/>`_) is combined with a
    changes database implemented with a
    :class:`~ZODB.FileStorage.FileStorage.FileStorage`.
    """

    def __init__(self, name=None, base=None, changes=None,
                 close_base_on_close=None, close_changes_on_close=None):
        """Create a demo storage

        :param str name: The storage name used by the
            :meth:`~ZODB.interfaces.IStorage.getName` and
            :meth:`~ZODB.interfaces.IStorage.sortKey` methods.
        :param object base: base storage
        :param object changes: changes storage
        :param bool close_base_on_close: A Flag indicating whether the base
            database should be closed when the demo storage is closed.
        :param bool close_changes_on_close: A Flag indicating whether the
            changes database should be closed when the demo storage is
            closed.

        If a base database isn't provided, a
        :class:`~ZODB.MappingStorage.MappingStorage` will be
        constructed and used.

        If ``close_base_on_close`` isn't specified, it will be ``True`` if
        a base database was provided and ``False`` otherwise.

        If a changes database isn't provided, a
        :class:`~ZODB.MappingStorage.MappingStorage` will be
        constructed and used and blob support will be provided using a
        temporary blob directory.

        If ``close_changes_on_close`` isn't specified, it will be ``True``
        if a changes database was provided and ``False`` otherwise.
        """
        if close_base_on_close is None:
            if base is None:
                base = ZODB.MappingStorage.MappingStorage()
                close_base_on_close = False
            else:
                close_base_on_close = True
        elif base is None:
            base = ZODB.MappingStorage.MappingStorage()

        self.base = base
        self.close_base_on_close = close_base_on_close

        if changes is None:
            self._temporary_changes = True
            changes = ZODB.MappingStorage.MappingStorage()
            zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
            if close_changes_on_close is None:
                close_changes_on_close = False
        else:
            # Always define the flag: pack() and _blobify() read it even
            # when the caller supplied the changes storage.
            self._temporary_changes = False
            if ZODB.interfaces.IBlobStorage.providedBy(changes):
                zope.interface.alsoProvides(
                    self, ZODB.interfaces.IBlobStorage)
            if close_changes_on_close is None:
                close_changes_on_close = True

        self.changes = changes
        self.close_changes_on_close = close_changes_on_close

        # OIDs handed out by new_oid() / OIDs stored in the current
        # transaction; tpc_finish() drops the latter from the former.
        self._issued_oids = set()
        self._stored_oids = set()
        # OIDs whose conflicts were resolved in the current transaction.
        self._resolved = []

        self._commit_lock = ZODB.utils.Lock()
        self._transaction = None

        if name is None:
            name = 'DemoStorage(%r, %r)' % (base.getName(), changes.getName())
        self.__name__ = name

        self._copy_methods_from_changes(changes)

        self._next_oid = random.randint(1, 1 << 62)

    def _blobify(self):
        """Wrap a temporary in-memory changes storage in a BlobStorage.

        Returns ``True`` if the changes storage was wrapped; otherwise
        falls through and returns ``None``.
        """
        if (self._temporary_changes and
                isinstance(self.changes, ZODB.MappingStorage.MappingStorage)):
            blob_dir = tempfile.mkdtemp('.demoblobs')
            # The weakref callback removes the directory once this
            # storage is garbage collected.
            _temporary_blobdirs[
                weakref.ref(self, cleanup_temporary_blobdir)
            ] = blob_dir
            self.changes = ZODB.blob.BlobStorage(blob_dir, self.changes)
            self._copy_methods_from_changes(self.changes)
            return True

    def cleanup(self):
        """Call ``cleanup()`` on both the base and the changes storage."""
        self.base.cleanup()
        self.changes.cleanup()

    __opened = True

    def opened(self):
        """Return a true value until :meth:`close` has been called."""
        return self.__opened

    def close(self):
        """Mark the storage closed and close the underlying storages.

        The base and changes storages are only closed when the
        corresponding ``close_*_on_close`` flag is true.
        """
        self.__opened = False
        if self.close_base_on_close:
            self.base.close()
        if self.close_changes_on_close:
            self.changes.close()

    def _copy_methods_from_changes(self, changes):
        """Bind selected attributes of ``changes`` directly onto self."""
        for meth in (
            '_lock',
            'getSize', 'isReadOnly',
            'sortKey', 'tpc_transaction',
        ):
            setattr(self, meth, getattr(changes, meth))

        supportsUndo = getattr(changes, 'supportsUndo', None)
        if supportsUndo is not None and supportsUndo():
            for meth in ('supportsUndo', 'undo', 'undoLog', 'undoInfo'):
                setattr(self, meth, getattr(changes, meth))
            zope.interface.alsoProvides(
                self, ZODB.interfaces.IStorageUndoable)

        lastInvalidations = getattr(changes, 'lastInvalidations', None)
        if lastInvalidations is not None:
            self.lastInvalidations = lastInvalidations

    def getName(self):
        """Return the storage name chosen at construction time."""
        return self.__name__

    __repr__ = getName

    def getTid(self, oid):
        """Return the tid for ``oid`` from changes, falling back to base."""
        try:
            return self.changes.getTid(oid)
        except ZODB.POSException.POSKeyError:
            return self.base.getTid(oid)

    def history(self, oid, size=1):
        """Return up to ``size`` history entries, changes first, then base.

        A ``POSKeyError`` from the base is re-raised only when the
        changes storage contributed no entries either.
        """
        try:
            r = self.changes.history(oid, size)
        except ZODB.POSException.POSKeyError:
            r = []
        size -= len(r)
        if size:
            try:
                r += self.base.history(oid, size)
            except ZODB.POSException.POSKeyError:
                if not r:
                    raise
        return r

    def iterator(self, start=None, end=None):
        """Yield the base's transactions followed by the changes'."""
        for t in self.base.iterator(start, end):
            yield t
        for t in self.changes.iterator(start, end):
            yield t

    def lastTransaction(self):
        """Return the changes' last tid, or the base's if that is ``z64``."""
        t = self.changes.lastTransaction()
        if t == ZODB.utils.z64:
            t = self.base.lastTransaction()
        return t

    def __len__(self):
        """Return the length of the changes storage only."""
        return len(self.changes)

    # still want load for old clients (e.g. zeo servers)
    load = load_current

    def loadBefore(self, oid, tid):
        """Load the record for ``oid`` before ``tid``.

        The changes storage is consulted first, then the base.  When the
        record comes from the base but newer records exist in the
        changes, the end tid of the result is adjusted accordingly.
        """
        try:
            result = self.changes.loadBefore(oid, tid)
        except ZODB.POSException.POSKeyError:
            # The oid isn't in the changes, so defer to base
            return self.base.loadBefore(oid, tid)

        if result is None:
            # The oid *was* in the changes, but there aren't any
            # earlier records. Maybe there are in the base.
            try:
                result = self.base.loadBefore(oid, tid)
            except ZODB.POSException.POSKeyError:
                # The oid isn't in the base, so None will be the right result
                pass
            else:
                if result and not result[-1]:
                    # The oid is current in the base.  We need to find
                    # the end tid in the base by finding the first tid
                    # in the changes. Unfortunately, there isn't an
                    # api for this, so we have to walk back using
                    # loadBefore.

                    if tid == maxtid:
                        # Special case: we were looking for the
                        # current value. We won't find anything in
                        # changes, so we're done.
                        return result

                    end_tid = maxtid
                    t = self.changes.loadBefore(oid, end_tid)
                    while t:
                        end_tid = t[1]
                        t = self.changes.loadBefore(oid, end_tid)
                    result = result[:2] + (
                        end_tid if end_tid != maxtid else None,
                    )

        return result

    def loadBlob(self, oid, serial):
        """Return the blob for ``oid``/``serial`` from changes or base.

        Raises ``POSKeyError`` when the record is not in the changes and
        the base does not provide ``IBlobStorage``.
        """
        try:
            return self.changes.loadBlob(oid, serial)
        except ZODB.POSException.POSKeyError:
            try:
                return self.base.loadBlob(oid, serial)
            except AttributeError:
                if not ZODB.interfaces.IBlobStorage.providedBy(self.base):
                    raise ZODB.POSException.POSKeyError(oid, serial)
                raise
        except AttributeError:
            # The changes storage has no blob support yet; add it if we
            # own the changes storage, then retry.
            if self._blobify():
                return self.loadBlob(oid, serial)
            raise

    def openCommittedBlobFile(self, oid, serial, blob=None):
        """Open a committed blob file from changes, falling back to base.

        Raises ``POSKeyError`` when the record is not in the changes and
        the base does not provide ``IBlobStorage``.
        """
        try:
            return self.changes.openCommittedBlobFile(oid, serial, blob)
        except ZODB.POSException.POSKeyError:
            try:
                return self.base.openCommittedBlobFile(oid, serial, blob)
            except AttributeError:
                if not ZODB.interfaces.IBlobStorage.providedBy(self.base):
                    raise ZODB.POSException.POSKeyError(oid, serial)
                raise
        except AttributeError:
            if self._blobify():
                return self.openCommittedBlobFile(oid, serial, blob)
            raise

    def loadSerial(self, oid, serial):
        """Load a specific revision from changes, falling back to base."""
        try:
            return self.changes.loadSerial(oid, serial)
        except ZODB.POSException.POSKeyError:
            return self.base.loadSerial(oid, serial)

    def new_oid(self):
        """Return an oid that is unused in both the changes and the base.

        Candidates are taken sequentially from a random starting point;
        on a collision a new random starting point is chosen.
        """
        with self._lock:
            while True:
                oid = ZODB.utils.p64(self._next_oid)
                if oid not in self._issued_oids:
                    try:
                        load_current(self.changes, oid)
                    except ZODB.POSException.POSKeyError:
                        try:
                            load_current(self.base, oid)
                        except ZODB.POSException.POSKeyError:
                            self._next_oid += 1
                            self._issued_oids.add(oid)
                            return oid

                self._next_oid = random.randint(1, 1 << 62)

    def pack(self, t, referencesf, gc=None):
        """Pack the changes storage.

        Garbage collection is only passed through when the changes
        storage is the temporary one created by this storage; otherwise
        a true ``gc`` raises ``TypeError`` and the changes storage is
        packed with ``gc=False``.  If the changes storage rejects the
        ``gc`` argument, no packing is done.
        """
        if gc is None:
            if self._temporary_changes:
                return self.changes.pack(t, referencesf)
        elif self._temporary_changes:
            return self.changes.pack(t, referencesf, gc=gc)
        elif gc:
            raise TypeError(
                "Garbage collection isn't supported"
                " when there is a base storage.")

        try:
            self.changes.pack(t, referencesf, gc=False)
        except TypeError as v:
            # Only swallow the error caused by an unsupported gc
            # argument; anything else is a genuine failure.
            if 'gc' not in str(v):
                raise
            # The gc arg isn't supported. Don't pack

    def pop(self):
        """Close the changes database and return the base.
        """
        self.changes.close()
        return self.base

    def push(self, changes=None):
        """Create a new demo storage using the storage as a base.

        The given changes are used as the changes for the returned
        storage and ``False`` is passed as ``close_base_on_close``.
        """
        return self.__class__(base=self, changes=changes,
                              close_base_on_close=False)

    def store(self, oid, serial, data, version, transaction):
        """Store ``data`` for ``oid`` in the changes storage.

        If the current serial differs from ``serial``, conflict
        resolution is attempted and the oid is recorded as resolved.
        Raises ``StorageTransactionError`` for a foreign transaction.
        """
        assert version == '', "versions aren't supported"
        if transaction is not self._transaction:
            raise ZODB.POSException.StorageTransactionError(self, transaction)

        # Since the OID is being used, we don't have to keep up with it any
        # more. Save it now so we can forget it later. :)
        self._stored_oids.add(oid)

        # See if we already have changes for this oid
        try:
            old = load_current(self, oid)[1]
        except ZODB.POSException.POSKeyError:
            old = serial

        if old != serial:
            rdata = self.tryToResolveConflict(oid, old, serial, data)
            self.changes.store(oid, old, rdata, '', transaction)
            self._resolved.append(oid)
        else:
            self.changes.store(oid, serial, data, '', transaction)

    def storeBlob(self, oid, oldserial, data, blobfilename, version,
                  transaction):
        """Store a blob record in the changes storage.

        Blob support is added to a temporary changes storage on demand.
        Raises ``StorageTransactionError`` for a foreign transaction.
        """
        assert version == '', "versions aren't supported"
        if transaction is not self._transaction:
            raise ZODB.POSException.StorageTransactionError(self, transaction)

        # Since the OID is being used, we don't have to keep up with it any
        # more. Save it now so we can forget it later. :)
        self._stored_oids.add(oid)

        try:
            self.changes.storeBlob(
                oid, oldserial, data, blobfilename, '', transaction)
        except AttributeError:
            if not self._blobify():
                raise
            self.changes.storeBlob(
                oid, oldserial, data, blobfilename, '', transaction)

    checkCurrentSerialInTransaction = (
        ZODB.BaseStorage.checkCurrentSerialInTransaction)

    def temporaryDirectory(self):
        """Return the changes storage's temporary blob directory."""
        try:
            return self.changes.temporaryDirectory()
        except AttributeError:
            if self._blobify():
                return self.changes.temporaryDirectory()
            raise

    def tpc_abort(self, transaction):
        """Abort ``transaction``; a no-op for any other transaction."""
        with self._lock:
            if transaction is not self._transaction:
                return
            self._stored_oids = set()
            self._transaction = None
            try:
                self.changes.tpc_abort(transaction)
            finally:
                # _transaction is already cleared, so nothing else would
                # ever release the commit lock if the abort failed.
                self._commit_lock.release()

    def tpc_begin(self, transaction, *a, **k):
        """Begin two-phase commit for ``transaction``.

        Raises ``StorageTransactionError`` if called twice for the same
        transaction.  Extra arguments are passed to the changes storage.
        """
        with self._lock:
            # The tid argument exists to support testing.
            if transaction is self._transaction:
                raise ZODB.POSException.StorageTransactionError(
                    "Duplicate tpc_begin calls for same transaction")

        self._commit_lock.acquire()

        try:
            with self._lock:
                self.changes.tpc_begin(transaction, *a, **k)
                self._transaction = transaction
                self._stored_oids = set()
                del self._resolved[:]
        except BaseException:
            # We never became the current transaction, so tpc_abort()
            # would not release the lock for us.
            self._commit_lock.release()
            raise

    def tpc_vote(self, *a, **k):
        """Vote on the changes storage and return the resolved oids.

        Raises ``StorageTransactionError`` if the changes storage itself
        reports resolved conflicts.
        """
        if self.changes.tpc_vote(*a, **k):
            raise ZODB.POSException.StorageTransactionError(
                "Unexpected resolved conflicts")
        return self._resolved

    def tpc_finish(self, transaction, func=lambda tid: None):
        """Finish ``transaction`` and return the tid from the changes.

        Raises ``StorageTransactionError`` for a foreign transaction.
        """
        with self._lock:
            if transaction is not self._transaction:
                raise ZODB.POSException.StorageTransactionError(
                    "tpc_finish called with wrong transaction")
            self._issued_oids.difference_update(self._stored_oids)
            self._stored_oids = set()
            self._transaction = None
            try:
                tid = self.changes.tpc_finish(transaction, func)
            finally:
                # Release even on failure; _transaction is already
                # cleared, so tpc_abort() would not do it.
                self._commit_lock.release()
        return tid
# Maps weak references (as registered by DemoStorage._blobify) to the
# temporary blob directory created for that storage.
_temporary_blobdirs = {}


def cleanup_temporary_blobdir(
    ref,
    _temporary_blobdirs=_temporary_blobdirs,  # Make sure it stays around
):
    """Remove the temporary blob directory registered under ``ref``.

    Used as a weak-reference callback.  The registry entry is always
    dropped; the directory is removed only if it still exists on disk.
    """
    path = _temporary_blobdirs.pop(ref, None)
    if not path:
        return
    if os.path.exists(path):
        ZODB.blob.remove_committed_dir(path)