Skip to content

Commit

Permalink
Feature/synced collection/simplify global buffering (#482)
Browse files Browse the repository at this point in the history
Eliminate the global buffering mode across all backends in favor of a more targeted per-backend approach.

* Enable per backend buffering.

* Remove global buffering in favor of class-specific buffering.

* Reintroduce warnings for deprecated functionality.

* Remove truly global buffering and replace it with class-level buffering.

* Document new features.
  • Loading branch information
vyasr authored Jan 26, 2021
1 parent dd3c565 commit 5ccda3f
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 108 deletions.
7 changes: 4 additions & 3 deletions signac/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
from .core.jsondict import flush_all as flush
from .db import get_database
from .diff import diff_jobs
from .synced_collections.backends.collection_json import JSONDict
from .synced_collections.buffers.buffered_collection import buffer_all as buffered
from .synced_collections.buffers.buffered_collection import is_buffered
from .synced_collections.backends.collection_json import BufferedJSONDict, JSONDict
from .version import __version__

buffered = BufferedJSONDict.buffer_backend
is_buffered = BufferedJSONDict.backend_is_buffered

__all__ = [
"__version__",
"contrib",
Expand Down
80 changes: 10 additions & 70 deletions signac/synced_collections/buffers/buffered_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@
"""

import logging
import warnings
from inspect import isabstract
from typing import Any, List

from .. import SyncedCollection
from ..errors import BufferedError
from ..utils import _CounterFuncContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,8 +79,6 @@ class BufferedCollection(SyncedCollection):
"""

_BUFFERED_BACKENDS: List[Any] = []

def __init__(self, *args, **kwargs):
# The `_buffered` attribute _must_ be defined prior to calling the
# superclass constructors in order to enable subclasses to override
Expand All @@ -102,30 +97,17 @@ def __init_subclass__(cls):
"""
super().__init_subclass__()
if not isabstract(cls):
BufferedCollection._BUFFERED_BACKENDS.append(cls)

@staticmethod
def _flush_all_backends():
"""Execute all deferred write operations.
cls._buffer_context = _CounterFuncContext(cls._flush_buffer)

Raises
------
BufferedError
If there are any issues with flushing any backend.
@classmethod
def buffer_backend(cls, *args, **kwargs):
"""Enter context to buffer all operations for this backend."""
return cls._buffer_context

"""
logger.debug("Flushing buffer...")
issues = {}
for backend in BufferedCollection._BUFFERED_BACKENDS:
try:
# try to sync the data to backend
issue = backend._flush_buffer()
issues.update(issue)
except OSError as error:
logger.error(str(error))
issues[backend] = error
if issues:
raise BufferedError(issues)
@classmethod
def backend_is_buffered(cls):
"""Check if this backend is currently buffered."""
return bool(cls._buffer_context)

def _save(self):
"""Synchronize data with the backend but buffer if needed.
Expand Down Expand Up @@ -188,7 +170,7 @@ def _load_from_buffer(self):
@property
def _is_buffered(self):
"""Check if we should write to the buffer or not."""
return self.buffered or _BUFFER_ALL_CONTEXT
return self.buffered or type(self)._buffer_context

def _flush(self):
"""Flush data associated with this instance from the buffer."""
Expand All @@ -198,45 +180,3 @@ def _flush(self):
def _flush_buffer(self):
"""Flush all data in this class's buffer."""
pass


# This module-scope variable is a context that can be accessed via the
# buffer_all method for the purpose of buffering all subsequence read and write
# operations.
_BUFFER_ALL_CONTEXT = _CounterFuncContext(BufferedCollection._flush_all_backends)


# This function provides a more familiar module-scope, function-based interface
# for enabling buffering rather than calling the class's static method.
def buffer_all(force_write=None, buffer_size=None):
"""Return a global buffer context for all BufferedCollection instances.
All future operations use the buffer whenever possible. Write operations
are deferred until the context is exited, at which point all buffered
backends will flush their buffers. Individual backends may flush their
buffers within this context if the implementation requires it; this context
manager represents a promise to buffer whenever possible, but does not
guarantee that no writes will occur under all circumstances.
"""
if force_write is not None:
warnings.warn(
DeprecationWarning(
"The force_write parameter is deprecated and will be removed in "
"signac 2.0. This functionality is no longer supported."
)
)
if buffer_size is not None:
warnings.warn(
DeprecationWarning(
"The buffer_size parameter is deprecated and will be removed in "
"signac 2.0. The buffer size should be set using the "
"set_buffer_capacity method of FileBufferedCollection or any of its "
"subclasses."
)
)
return _BUFFER_ALL_CONTEXT


def is_buffered():
"""Check the global buffered mode setting."""
return bool(_BUFFER_ALL_CONTEXT)
67 changes: 64 additions & 3 deletions signac/synced_collections/buffers/file_buffered_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,50 @@

import errno
import os
import warnings
from abc import abstractmethod
from threading import RLock
from typing import Dict, Tuple, Union

from ..data_types.synced_collection import _LoadAndSave
from ..errors import MetadataError
from ..utils import _NullContext
from ..errors import BufferedError, MetadataError
from ..utils import _CounterFuncContext, _NullContext
from .buffered_collection import BufferedCollection


class _FileBufferedContext(_CounterFuncContext):
"""Extend the usual buffering context to support setting the buffer size.
This context allows the buffer_backend method to temporarily set the buffer
size within the scope of this context.
"""

def __init__(self, cls):
super().__init__(cls._flush_buffer)
self._buffer_capacity = None
self._original_buffer_capacitys = []
self._cls = cls

def __call__(self, buffer_capacity=None):
self._buffer_capacity = buffer_capacity
return self

def __enter__(self):
super().__enter__()
if self._buffer_capacity is not None:
self._original_buffer_capacitys.append(self._cls.get_buffer_capacity())
self._cls.set_buffer_capacity(self._buffer_capacity)
else:
self._original_buffer_capacitys.append(None)
self._buffer_capacity = None

def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
original_buffer_capacity = self._original_buffer_capacitys.pop()
if original_buffer_capacity is not None:
self._cls.set_buffer_capacity(original_buffer_capacity)


class _BufferedLoadAndSave(_LoadAndSave):
"""Wrap base loading and saving with an extra thread lock.
Expand Down Expand Up @@ -103,6 +137,8 @@ def __init_subclass__(cls):
# buffering state.
cls._buffered_collections: Dict[int, BufferedCollection] = {}

cls._buffer_context = _FileBufferedContext(cls)

@classmethod
def enable_multithreading(cls):
"""Allow multithreaded access to and modification of :class:`SyncedCollection`s.
Expand Down Expand Up @@ -306,4 +342,29 @@ def _flush_buffer(cls, force=False, retain_in_force=False):
issues[collection._filename] = err
if not issues:
cls._buffered_collections = remaining_collections
return issues
else:
raise BufferedError(issues)

# TODO: The buffer_size argument should be changed to buffer_capacity in
# signac 2.0 for consistency with the new names in synced collections.
@classmethod
def buffer_backend(cls, buffer_size=None, force_write=None, *args, **kwargs):
"""Enter context to buffer all operations for this backend.
Parameters
----------
buffer_size : int
The capacity of the buffer to use within this context (resets after
the context is exited).
force_write : bool
This argument does nothing and is only present for compatibility
with signac 1.x.
"""
if force_write is not None:
warnings.warn(
DeprecationWarning(
"The force_write parameter is deprecated and will be removed in "
"signac 2.0. This functionality is no longer supported."
)
)
return cls._buffer_context(buffer_size)
25 changes: 10 additions & 15 deletions tests/test_buffered_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,27 +149,22 @@ def test_force_write_mode_with_permission_error(self):
assert job.doc.a == x

def test_buffered_mode_change_buffer_size(self):
original_buffer_size = signac.get_buffer_size()
try:
assert not signac.is_buffered()
signac.set_buffer_size(12)
with signac.buffered():
assert signac.buffered()
assert signac.get_buffer_size() == 12
assert not signac.is_buffered()
with signac.buffered(buffer_size=12):
assert signac.buffered()
assert signac.get_buffer_size() == 12

assert not signac.is_buffered()
assert not signac.is_buffered()

assert not signac.is_buffered()
assert not signac.is_buffered()
with signac.buffered(buffer_size=12):
assert signac.buffered()
assert signac.get_buffer_size() == 12
with signac.buffered():
assert signac.buffered()
assert signac.get_buffer_size() == 12
with signac.buffered():
assert signac.buffered()
assert signac.get_buffer_size() == 12

assert not signac.is_buffered()
finally:
signac.set_buffer_size(original_buffer_size)
assert not signac.is_buffered()

def test_integration(self):
def routine():
Expand Down
Loading

0 comments on commit 5ccda3f

Please sign in to comment.