Skip to content

Commit

Permalink
fix: add lock free mechanism for avoiding race conditions when writin…
Browse files Browse the repository at this point in the history
…g persistence information; consider corrupt metadata records as non-existent (#1745)

* fix: add lock free mechanism for avoiding race conditions when writing persistence information

* consider corrupt metadata records as nonexistent and delete them

* set suffix to contain the original file name and prefix such that the file is hidden.

* do not hide the file
  • Loading branch information
johanneskoester authored Jul 19, 2022
1 parent 531ef4a commit 71fe952
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions snakemake/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import marshal
import pickle
import json
import tempfile
import time
from base64 import urlsafe_b64encode, b64encode
from functools import lru_cache, partial
Expand Down Expand Up @@ -434,9 +435,20 @@ def _output(self, job):

def _record(self, subject, json_value, id):
recpath = self._record_path(subject, id)
os.makedirs(os.path.dirname(recpath), exist_ok=True)
with open(recpath, "w") as f:
json.dump(json_value, f)
recdir = os.path.dirname(recpath)
os.makedirs(recdir, exist_ok=True)
# Write content to temporary file and rename it to the final file.
# This avoids race-conditions while writing (e.g. on NFS when the main job
# and the cluster node job propagate their content and the system has some
# latency including non-atomic propagation processes).
with tempfile.NamedTemporaryFile(
mode="w",
dir=recdir,
delete=False,
suffix=os.path.basename(recpath),
) as tmpfile:
json.dump(json_value, tmpfile)
os.rename(tmpfile.name, recpath)

def _delete_record(self, subject, id):
try:
Expand All @@ -462,7 +474,14 @@ def _read_record_uncached(self, subject, id):
if not self._exists_record(subject, id):
return dict()
with open(self._record_path(subject, id), "r") as f:
return json.load(f)
try:
return json.load(f)
except json.JSONDecodeError as e:
pass
# case: file is corrupted, delete it
logger.warning(f"Deleting corrupted metadata record.")
self._delete_record(subject, id)
return dict()

def _exists_record(self, subject, id):
return os.path.exists(self._record_path(subject, id))
Expand Down

0 comments on commit 71fe952

Please sign in to comment.