From 5ccda3f3e2e9d79e8c16f404cedc5bd05b9a4070 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 26 Jan 2021 12:51:10 -0800 Subject: [PATCH] Feature/synced collection/simplify global buffering (#482) Eliminate the global buffering mode across all backends in favor of a more targeted per-backend approach. * Enable per backend buffering. * Remove global buffering in favor of class-specific buffering. * Reintroduce warnings for deprecated functionality. * Remove truly global buffering and replace it with class-level buffering. * Document new features. --- signac/__init__.py | 7 +- .../buffers/buffered_collection.py | 80 +++---------------- .../buffers/file_buffered_collection.py | 67 +++++++++++++++- tests/test_buffered_mode.py | 25 +++--- .../test_json_buffered_collection.py | 36 +++++---- 5 files changed, 107 insertions(+), 108 deletions(-) diff --git a/signac/__init__.py b/signac/__init__.py index eda6d8aaf..c67980016 100644 --- a/signac/__init__.py +++ b/signac/__init__.py @@ -31,11 +31,12 @@ from .core.jsondict import flush_all as flush from .db import get_database from .diff import diff_jobs -from .synced_collections.backends.collection_json import JSONDict -from .synced_collections.buffers.buffered_collection import buffer_all as buffered -from .synced_collections.buffers.buffered_collection import is_buffered +from .synced_collections.backends.collection_json import BufferedJSONDict, JSONDict from .version import __version__ +buffered = BufferedJSONDict.buffer_backend +is_buffered = BufferedJSONDict.backend_is_buffered + __all__ = [ "__version__", "contrib", diff --git a/signac/synced_collections/buffers/buffered_collection.py b/signac/synced_collections/buffers/buffered_collection.py index 482aae0bf..5b2deb2a3 100644 --- a/signac/synced_collections/buffers/buffered_collection.py +++ b/signac/synced_collections/buffers/buffered_collection.py @@ -36,12 +36,9 @@ """ import logging -import warnings from inspect import isabstract -from typing import Any, List from .. import SyncedCollection -from ..errors import BufferedError from ..utils import _CounterFuncContext logger = logging.getLogger(__name__) @@ -82,8 +79,6 @@ class BufferedCollection(SyncedCollection): """ - _BUFFERED_BACKENDS: List[Any] = [] - def __init__(self, *args, **kwargs): # The `_buffered` attribute _must_ be defined prior to calling the # superclass constructors in order to enable subclasses to override @@ -102,30 +97,17 @@ def __init_subclass__(cls): """ super().__init_subclass__() if not isabstract(cls): - BufferedCollection._BUFFERED_BACKENDS.append(cls) - - @staticmethod - def _flush_all_backends(): - """Execute all deferred write operations. + cls._buffer_context = _CounterFuncContext(cls._flush_buffer) - Raises - ------ - BufferedError - If there are any issues with flushing any backend. + @classmethod + def buffer_backend(cls, *args, **kwargs): + """Enter context to buffer all operations for this backend.""" + return cls._buffer_context - """ - logger.debug("Flushing buffer...") - issues = {} - for backend in BufferedCollection._BUFFERED_BACKENDS: - try: - # try to sync the data to backend - issue = backend._flush_buffer() - issues.update(issue) - except OSError as error: - logger.error(str(error)) - issues[backend] = error - if issues: - raise BufferedError(issues) + @classmethod + def backend_is_buffered(cls): + """Check if this backend is currently buffered.""" + return bool(cls._buffer_context) def _save(self): """Synchronize data with the backend but buffer if needed. @@ -188,7 +170,7 @@ def _load_from_buffer(self): @property def _is_buffered(self): """Check if we should write to the buffer or not.""" - return self.buffered or _BUFFER_ALL_CONTEXT + return self.buffered or type(self)._buffer_context def _flush(self): """Flush data associated with this instance from the buffer.""" @@ -198,45 +180,3 @@ def _flush(self): def _flush_buffer(self): """Flush all data in this class's buffer.""" pass - - -# This module-scope variable is a context that can be accessed via the -# buffer_all method for the purpose of buffering all subsequence read and write -# operations. -_BUFFER_ALL_CONTEXT = _CounterFuncContext(BufferedCollection._flush_all_backends) - - -# This function provides a more familiar module-scope, function-based interface -# for enabling buffering rather than calling the class's static method. -def buffer_all(force_write=None, buffer_size=None): - """Return a global buffer context for all BufferedCollection instances. - - All future operations use the buffer whenever possible. Write operations - are deferred until the context is exited, at which point all buffered - backends will flush their buffers. Individual backends may flush their - buffers within this context if the implementation requires it; this context - manager represents a promise to buffer whenever possible, but does not - guarantee that no writes will occur under all circumstances. - """ - if force_write is not None: - warnings.warn( - DeprecationWarning( - "The force_write parameter is deprecated and will be removed in " - "signac 2.0. This functionality is no longer supported." - ) - ) - if buffer_size is not None: - warnings.warn( - DeprecationWarning( - "The buffer_size parameter is deprecated and will be removed in " - "signac 2.0. The buffer size should be set using the " - "set_buffer_capacity method of FileBufferedCollection or any of its " - "subclasses." - ) - ) - return _BUFFER_ALL_CONTEXT - - -def is_buffered(): - """Check the global buffered mode setting.""" - return bool(_BUFFER_ALL_CONTEXT) diff --git a/signac/synced_collections/buffers/file_buffered_collection.py b/signac/synced_collections/buffers/file_buffered_collection.py index 88c7480fd..870922153 100644 --- a/signac/synced_collections/buffers/file_buffered_collection.py +++ b/signac/synced_collections/buffers/file_buffered_collection.py @@ -11,16 +11,50 @@ import errno import os +import warnings from abc import abstractmethod from threading import RLock from typing import Dict, Tuple, Union from ..data_types.synced_collection import _LoadAndSave -from ..errors import MetadataError -from ..utils import _NullContext +from ..errors import BufferedError, MetadataError +from ..utils import _CounterFuncContext, _NullContext from .buffered_collection import BufferedCollection +class _FileBufferedContext(_CounterFuncContext): + """Extend the usual buffering context to support setting the buffer size. + + This context allows the buffer_backend method to temporarily set the buffer + size within the scope of this context. + """ + + def __init__(self, cls): + super().__init__(cls._flush_buffer) + self._buffer_capacity = None + self._original_buffer_capacitys = [] + self._cls = cls + + def __call__(self, buffer_capacity=None): + self._buffer_capacity = buffer_capacity + return self + + def __enter__(self): + super().__enter__() + if self._buffer_capacity is not None: + self._original_buffer_capacitys.append(self._cls.get_buffer_capacity()) + self._cls.set_buffer_capacity(self._buffer_capacity) + else: + self._original_buffer_capacitys.append(None) + self._buffer_capacity = None + + def __exit__(self, exc_type, exc_val, exc_tb): + super().__exit__(exc_type, exc_val, exc_tb) + original_buffer_capacity = self._original_buffer_capacitys.pop() + if original_buffer_capacity is not None: + self._cls.set_buffer_capacity(original_buffer_capacity) + + class _BufferedLoadAndSave(_LoadAndSave): """Wrap base loading and saving with an extra thread lock. @@ -103,6 +137,8 @@ def __init_subclass__(cls): # buffering state. cls._buffered_collections: Dict[int, BufferedCollection] = {} + cls._buffer_context = _FileBufferedContext(cls) + @classmethod def enable_multithreading(cls): """Allow multithreaded access to and modification of :class:`SyncedCollection`s. @@ -306,4 +342,29 @@ def _flush_buffer(cls, force=False, retain_in_force=False): issues[collection._filename] = err if not issues: cls._buffered_collections = remaining_collections - return issues + else: + raise BufferedError(issues) + + # TODO: The buffer_size argument should be changed to buffer_capacity in + # signac 2.0 for consistency with the new names in synced collections. + @classmethod + def buffer_backend(cls, buffer_size=None, force_write=None, *args, **kwargs): + """Enter context to buffer all operations for this backend. + + Parameters + ---------- + buffer_size : int + The capacity of the buffer to use within this context (resets after + the context is exited). + force_write : bool + This argument does nothing and is only present for compatibility + with signac 1.x. + """ + if force_write is not None: + warnings.warn( + DeprecationWarning( + "The force_write parameter is deprecated and will be removed in " + "signac 2.0. This functionality is no longer supported." + ) + ) + return cls._buffer_context(buffer_size) diff --git a/tests/test_buffered_mode.py b/tests/test_buffered_mode.py index 3d95f6101..257a20708 100644 --- a/tests/test_buffered_mode.py +++ b/tests/test_buffered_mode.py @@ -149,27 +149,22 @@ def test_force_write_mode_with_permission_error(self): assert job.doc.a == x def test_buffered_mode_change_buffer_size(self): - original_buffer_size = signac.get_buffer_size() - try: - assert not signac.is_buffered() - signac.set_buffer_size(12) - with signac.buffered(): - assert signac.buffered() - assert signac.get_buffer_size() == 12 + assert not signac.is_buffered() + with signac.buffered(buffer_size=12): + assert signac.buffered() + assert signac.get_buffer_size() == 12 - assert not signac.is_buffered() + assert not signac.is_buffered() - assert not signac.is_buffered() + assert not signac.is_buffered() + with signac.buffered(buffer_size=12): + assert signac.buffered() + assert signac.get_buffer_size() == 12 with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 - with signac.buffered(): - assert signac.buffered() - assert signac.get_buffer_size() == 12 - assert not signac.is_buffered() - finally: - signac.set_buffer_size(original_buffer_size) + assert not signac.is_buffered() def test_integration(self): def routine(): diff --git a/tests/test_synced_collections/test_json_buffered_collection.py b/tests/test_synced_collections/test_json_buffered_collection.py index 482cc7b41..26d656185 100644 --- a/tests/test_synced_collections/test_json_buffered_collection.py +++ b/tests/test_synced_collections/test_json_buffered_collection.py @@ -19,7 +19,8 @@ MemoryBufferedJSONDict, MemoryBufferedJSONList, ) -from signac.synced_collections.buffers.buffered_collection import buffer_all + +# from signac.synced_collections.buffers.buffered_collection import buffer_all from signac.synced_collections.errors import BufferedError, MetadataError @@ -125,7 +126,7 @@ def test_global_buffered(self, synced_collection, testdata): synced_collection["buffered"] = testdata assert "buffered" in synced_collection assert synced_collection["buffered"] == testdata - with buffer_all(): + with self._collection_type.buffer_backend(): assert "buffered" in synced_collection assert synced_collection["buffered"] == testdata synced_collection["buffered2"] = 1 @@ -134,7 +135,7 @@ def test_global_buffered(self, synced_collection, testdata): assert len(synced_collection) == 2 assert "buffered2" in synced_collection assert synced_collection["buffered2"] == 1 - with buffer_all(): + with self._collection_type.buffer_backend(): del synced_collection["buffered"] assert len(synced_collection) == 1 assert "buffered" not in synced_collection @@ -144,7 +145,7 @@ def test_global_buffered(self, synced_collection, testdata): assert synced_collection["buffered2"] == 1 with pytest.raises(BufferedError): - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection["buffered2"] = 2 self.store({"test": 1}) assert synced_collection["buffered2"] == 2 @@ -156,7 +157,8 @@ def test_nested_same_collection(self, synced_collection): assert len(synced_collection) == 0 for outer_buffer, inner_buffer in itertools.product( - [synced_collection.buffered, buffer_all], repeat=2 + [synced_collection.buffered, self._collection_type.buffer_backend()], + repeat=2, ): err_msg = ( f"outer_buffer: {type(outer_buffer).__qualname__}, " @@ -194,7 +196,7 @@ def test_nested_different_collections(self, synced_collection, synced_collection assert "inside_first" in synced_collection2 assert "inside_first" in on_disk_dict2 - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection["inside_second"] = 3 synced_collection2["inside_second"] = 3 @@ -242,7 +244,7 @@ def test_nested_copied_collection(self, synced_collection): assert synced_collection["inside_first"] == 2 assert "inside_first" not in on_disk_dict - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection["inside_second"] = 3 synced_collection2["inside_second"] = 4 @@ -287,7 +289,7 @@ def test_nested_copied_collection_invalid(self, synced_collection): assert synced_collection["inside_first"] == 2 assert on_disk_dict["inside_first"] == 3 - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection["inside_second"] = 3 synced_collection2["inside_second"] = 4 @@ -352,7 +354,7 @@ def multithreaded_buffering_test(self, op): with TemporaryDirectory( prefix="jsondict_buffered_multithreaded" ) as tmp_dir: - with buffer_all(): + with self._collection_type.buffer_backend(): num_dicts = 100 dicts = [] dict_data = [] @@ -433,7 +435,7 @@ def test_multithreaded_buffering_clear(self): dicts.append(self._collection_type(filename=fn)) dicts[-1].update({str(j): j for j in range(i)}) - with buffer_all(): + with self._collection_type.buffer_backend(): num_threads = 10 try: with ThreadPoolExecutor(max_workers=num_threads) as executor: @@ -484,7 +486,7 @@ def test_multithreaded_buffering_load(self): # Go to i+1 so that every dict contains the 0 element. dicts[-1].update({str(j): j for j in range(i + 1)}) - with buffer_all(): + with self._collection_type.buffer_backend(): num_threads = 100 try: with ThreadPoolExecutor(max_workers=num_threads) as executor: @@ -560,12 +562,12 @@ def test_buffered(self, synced_collection): def test_global_buffered(self, synced_collection): assert len(synced_collection) == 0 - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection.reset([1, 2, 3]) assert len(synced_collection) == 3 assert len(synced_collection) == 3 assert synced_collection == [1, 2, 3] - with buffer_all(): + with self._collection_type.buffer_backend(): assert len(synced_collection) == 3 assert synced_collection == [1, 2, 3] synced_collection[0] = 4 @@ -576,7 +578,7 @@ def test_global_buffered(self, synced_collection): # metacheck failure with pytest.raises(BufferedError): - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection.reset([1]) assert synced_collection == [1] # Unfortunately the resolution of os.stat is @@ -620,7 +622,7 @@ def multithreaded_buffering_test(self, op, requires_init): if requires_init: lists[-1].extend([0 for j in range(i)]) - with buffer_all(): + with self._collection_type.buffer_backend(): num_threads = 10 try: with ThreadPoolExecutor(max_workers=num_threads) as executor: @@ -697,7 +699,7 @@ def test_multithreaded_buffering_load(self): # Go to i+1 so that every list contains the 0 element. lists[-1].extend([j for j in range(i + 1)]) - with buffer_all(): + with self._collection_type.buffer_backend(): num_threads = 100 try: with ThreadPoolExecutor(max_workers=num_threads) as executor: @@ -737,7 +739,7 @@ def test_buffer_flush(self, synced_collection, synced_collection2): synced_collection.clear() synced_collection2.clear() - with buffer_all(): + with self._collection_type.buffer_backend(): synced_collection["foo"] = 1 assert self._collection_type.get_current_buffer_size() == 1 assert synced_collection != self.load(synced_collection)