Exceeding the buffer size in parallel context causes errors during flush #397

Closed · Fixed by #484
bdice opened this issue Oct 8, 2020 · 3 comments
Labels: bug (Something isn't working)
Milestone: 1.7.0

bdice (Member) commented Oct 8, 2020

Description

I have a project with many jobs and large job documents. I tried to check the project's status with signac-flow. One of the operation conditions checks for a key in the job document. The status check runs in parallel over all jobs and also uses signac's "buffered" mode to minimize I/O. This combination of parallelism and buffering appears to be unsafe when the buffer size is exceeded. I would guess the buffer is also susceptible to race conditions in parallel reads/writes, but the specific issue reported here is read-only.
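
For context, the failing pattern looks roughly like the sketch below. This is a simplification, not signac-flow's actual code: it assumes signac's buffered() context manager, and the key_exists condition and the use of job ids are invented for illustration.

# sketch.py -- hypothetical, simplified version of the failing pattern
from multiprocessing import Pool

import signac

project = signac.get_project()


def key_exists(job_id):
    # Each worker re-opens the job and reads its document, which routes
    # through the module-level JSON buffer of that worker's own process.
    job = project.open_job(id=job_id)
    return "parity" in job.doc


if __name__ == "__main__":
    with signac.buffered():  # buffer all reads/writes in this context
        with Pool() as pool:
            results = pool.map(key_exists, [job.get_id() for job in project])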

The buffer metadata appears to be kept in global state:

DEFAULT_BUFFER_SIZE = 32 * 2**20 # 32 MB
_BUFFERED_MODE = 0
_BUFFERED_MODE_FORCE_WRITE = None
_BUFFER_SIZE = None
_JSONDICT_BUFFER = dict()
_JSONDICT_HASHES = dict()
_JSONDICT_META = dict()

If signac-flow's (parallel) status checks overflow the 32 MB buffer, a flush is forced, but this metadata is not kept in sync across threads/processes. As a result, a flush (perhaps a second, concurrent flush; I'm not sure) can't find the keys for filenames that no longer exist in the metadata/buffer. A reproduction script is below.
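
The root cause is easy to demonstrate without signac at all. Below is a minimal sketch (all names invented) showing that module-level dicts are copied into worker processes rather than shared with them:

# globals_demo.py -- hypothetical, not signac code
from multiprocessing import Pool

_BUFFER = {}  # stands in for _JSONDICT_BUFFER / _JSONDICT_META


def store(key):
    # With the default 'fork' start method on Linux, each worker mutates
    # its own copy of the parent's dict; the writes never propagate back
    # to the parent or to sibling workers.
    _BUFFER[key] = "blob"
    return len(_BUFFER)


if __name__ == "__main__":
    with Pool(2) as pool:
        print(pool.map(store, range(4)))  # per-worker counts, e.g. [1, 2, 1, 2]
    print(_BUFFER)  # {} -- the parent's buffer never saw any of the writes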

I have verified that either of the following changes prevents the problem:

  1. Increasing the buffer size (so the data fits in the buffer)
  2. Decreasing the data size (so the data fits in the buffer)

As for solutions, the right fix is not obvious. This seems like a design problem in our buffering scheme, and whatever we do in #363 should probably handle it. Some possibilities:

  1. Make sure every process has its own buffer. This would fix the flushing issue but would introduce other issues with inconsistency between concurrent processes.
  2. Introduce a locking mechanism for writing to the global buffer (see the sketch after this list)? I'm not sure this alone is a solution; it needs more thought.
  3. Force flushes to synchronize across processes?
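
The sketch below illustrates option 2 within a single process. It is purely hypothetical (the size check and helper names are invented): a lock serializes buffer mutation and flushing across threads, but it would not help across processes unless the buffer itself were shared.

# locking_sketch.py -- hypothetical, not signac's implementation
import threading

_BUFFER_SIZE_LIMIT = 32 * 2**20  # mirrors DEFAULT_BUFFER_SIZE
_BUFFER_LOCK = threading.Lock()
_JSONDICT_BUFFER = {}


def _flush_all_locked():
    # Assumes the caller already holds _BUFFER_LOCK.
    while _JSONDICT_BUFFER:
        filename, blob = _JSONDICT_BUFFER.popitem()
        with open(filename, "wb") as file:
            file.write(blob)


def _store_in_buffer(filename, blob):
    # Hold the lock for the entire store-and-maybe-flush sequence so that
    # no other thread can trigger a competing flush between the two steps.
    with _BUFFER_LOCK:
        _JSONDICT_BUFFER[filename] = blob
        if sum(len(b) for b in _JSONDICT_BUFFER.values()) > _BUFFER_SIZE_LIMIT:
            _flush_all_locked()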

To reproduce

# init.py
import numpy as np
import signac
from tqdm import tqdm


# Create project files
project = signac.init_project("overflow")

print('Creating jobs...')
for i in tqdm(range(1000)):
    job = project.open_job({"i": i}).init()
    # Need enough data in the job documents to overflow the default buffer
    # size of 32 MB: 100 x 100 doubles serialize to roughly 190 KB of JSON
    # per document, so 1000 documents total well over 100 MB.
    job.doc['some_data'] = np.random.rand(100, 100).tolist()
    job.doc['parity'] = "odd" if i % 2 else "even"

# project.py
from flow import FlowProject


class Project(FlowProject):
    pass


@Project.operation
@Project.pre(lambda job: job.doc['parity'] == "even")
@Project.post.isfile("out.txt")
def write_id_if_even(job):
    with open(job.fn("out.txt"), "w") as f:
        f.write(str(job))


if __name__ == "__main__":
    Project().main()

To cause the error, run:

python init.py
python project.py status

Error output

Using environment configuration: StandardEnvironment

Collecting job status info:  16%|█▋        | 165/1000 [00:00<00:03, 230.38it/s]
ERROR:flow.project:Error during status update: 
Use '--ignore-errors' to complete the update anyways.
Traceback (most recent call last):
  File "/home/bdice/code/signac-flow/flow/project.py", line 506, in __call__
    return self._callback(*jobs)
  File "project.py", line 12, in <lambda>
    @Project.pre(lambda job: job.doc['parity'] == "even")
  File "/home/bdice/code/signac/signac/core/synceddict.py", line 204, in __getitem__
    self._synced_load()
  File "/home/bdice/code/signac/signac/core/synceddict.py", line 171, in _synced_load
    self.load()
  File "/home/bdice/code/signac/signac/core/synceddict.py", line 176, in load
    data = self._load()
  File "/home/bdice/code/signac/signac/core/jsondict.py", line 270, in _load
    _store_in_buffer(self._filename, blob, store_hash=True)
  File "/home/bdice/code/signac/signac/core/jsondict.py", line 81, in _store_in_buffer
    flush_all()
  File "/home/bdice/code/signac/signac/core/jsondict.py", line 100, in flush_all
    meta = _JSONDICT_META.pop(filename)
KeyError: '/home/bdice/tmp_signac_buffer_overflow_parallel/workspace/2e2e556ecbb247a58c64e78a72a7a384/signac_job_document.json'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/bdice/code/signac/signac/core/jsondict.py", line 184, in buffer_reads_writes
    yield
  File "/home/bdice/code/signac-flow/flow/project.py", line 2705, in _potentially_buffered
    yield
  File "/home/bdice/code/signac-flow/flow/project.py", line 1820, in _fetch_status
    return list(tqdm(
  File "/home/bdice/miniconda3/envs/dice/lib/python3.8/site-packages/tqdm/std.py", line 1129, in __iter__
    for obj in iterable:
  File "/home/bdice/miniconda3/envs/dice/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/home/bdice/miniconda3/envs/dice/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/home/bdice/code/signac-flow/flow/project.py", line 1732, in get_job_status
    result['operations'] = OrderedDict(self._get_operations_status(job, cached_status))
  File "/home/bdice/code/signac-flow/flow/project.py", line 1708, in _get_operations_status
    eligible = False if completed else group._eligible((job,))
  File "/home/bdice/code/signac-flow/flow/project.py", line 903, in _eligible
    return any(op._eligible(jobs, ignore_conditions) for op in self)
  File "/home/bdice/code/signac-flow/flow/project.py", line 903, in <genexpr>
    return any(op._eligible(jobs, ignore_conditions) for op in self)
  File "/home/bdice/code/signac-flow/flow/project.py", line 585, in _eligible
    or all(cond(jobs) for cond in self._prereqs)
  File "/home/bdice/code/signac-flow/flow/project.py", line 585, in <genexpr>
    or all(cond(jobs) for cond in self._prereqs)
  File "/home/bdice/code/signac-flow/flow/project.py", line 509, in __call__
    raise UserConditionError(
flow.errors.UserConditionError: An exception was raised while evaluating the condition <lambda> for job c2a263c1c8712ae23c2d17b8b8498ac6.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/bdice/code/signac-flow/flow/project.py", line 3644, in _main_status
    self.print_status(jobs=jobs, **args)
  File "/home/bdice/code/signac-flow/flow/project.py", line 2103, in print_status
    tmp = self._fetch_status(jobs, err, ignore_errors, status_parallelization)
  File "/home/bdice/code/signac-flow/flow/project.py", line 1877, in _fetch_status
    return statuses
  File "/home/bdice/miniconda3/envs/dice/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/bdice/code/signac-flow/flow/project.py", line 2705, in _potentially_buffered
    yield
  File "/home/bdice/miniconda3/envs/dice/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/bdice/code/signac/signac/core/jsondict.py", line 192, in buffer_reads_writes
    assert not _JSONDICT_HASHES
AssertionError

System configuration

  • Operating System: Ubuntu 20.04 / Windows Subsystem for Linux 2.
  • Version of Python: 3.8
  • Version of signac: 1.5

bdice added the bug label on Oct 8, 2020

vyasr (Contributor) commented Oct 8, 2020

Giving every process its own buffer would be safe as long as we could split the work in a way that guaranteed no changes would conflict. Otherwise, I think a truly parallel-safe, mutex-based buffer is the only robust solution. The inconsistent and unreliable behavior of file locking on parallel filesystems means that relying on file locks isn't really an option.
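
For illustration only, a process-safe, mutex-guarded buffer could look something like the sketch below, using a multiprocessing.Manager to host the shared state (all names are invented; this is not what signac implements, and the proxy round-trips would add real overhead):

# shared_buffer_sketch.py -- hypothetical
import multiprocessing


def make_shared_buffer():
    # The Manager runs a server process that owns the dict and the lock;
    # workers access them through proxies, so every process sees the same
    # buffer and the same mutex.
    manager = multiprocessing.Manager()
    return manager.dict(), manager.Lock()


def store(buffer, lock, filename, blob):
    with lock:
        buffer[filename] = blob


def flush_all(buffer, lock):
    with lock:
        for filename in list(buffer.keys()):
            blob = buffer.pop(filename)
            with open(filename, "wb") as file:
                file.write(blob)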

vyasr (Contributor) commented Oct 8, 2020

In any case, this seems very relevant to #363.

vyasr (Contributor) commented Jan 27, 2021

@bdice I'd like to close this with the merge of SyncedCollections, which makes the buffer thread-safe. I don't think process-parallel buffering is in scope for signac, and glotzerlab/signac-flow#417 addresses this problem in the context of signac-flow.

vyasr mentioned this issue on Jan 27, 2021
bdice added this to the 1.7.0 milestone on Jun 8, 2021