Cached filesystem not concurrency-safe? #1107

Closed
jwodder opened this issue Nov 9, 2022 · 35 comments · Fixed by #1111

Comments

@jwodder
Contributor

jwodder commented Nov 9, 2022

We have a program that uses fsspec to mount a cached HTTP filesystem as a FUSE mount; while this process runs in the background, another traverses the FUSE mount and inspects multiple files in parallel. Unfortunately, the FUSE process keeps hitting errors of the following form:

Uncaught exception from FUSE operation open, returning errno.EINVAL.
Traceback (most recent call last):
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 734, in _wrapper
    return func(*args, **kwargs) or 0
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 834, in open
    fi.fh = self.operations('open', path.decode(self.encoding),
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 75, in __call__
    return super(DataLadFUSE, self).__call__(op, self.root + path, *args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 1076, in __call__
    return getattr(self, op)(*args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 203, in open
    fsspec_file = self._adapter.open(path)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fsspec.py", line 243, in open
    return dsap.open(relpath, mode=mode, encoding=encoding, errors=errors)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fsspec.py", line 169, in open
    return self.fs.open(url, mode, **kwargs)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 406, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/spec.py", line 1034, in open
    f = self._open(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 406, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 345, in _open
    self.save_cache()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 406, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 161, in save_cache
    cached_files = pickle.load(f)
EOFError: Ran out of input

I suspect the cause is that the cached filesystem is not safe for concurrent access, with the result that multiple actions on the filesystem are causing the cache file to be read by one procedure while another writes to it. Indeed, there seems to be no locking in the cached filesystem code.
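
For illustration, `EOFError: Ran out of input` is what `pickle.load` raises when it is handed a file that contains no complete pickle yet, for example a file that a writer has just opened (and thereby truncated) but not finished writing. The following is only a sketch of that situation, assuming a temporary file as a stand-in for the cache file; the helper name is hypothetical and this is not fsspec's code:

```python
import os
import pickle
import tempfile


def load_while_being_rewritten(path):
    """Simulate a reader that opens the cache file in the middle of a rewrite."""
    with open(path, "wb"):                # "wb" truncates the file immediately
        # The writer has not dumped anything yet when the reader arrives.
        with open(path, "rb") as reader:
            try:
                return pickle.load(reader)
            except EOFError as exc:
                return exc


with tempfile.TemporaryDirectory() as tmpdir:
    cache_path = os.path.join(tmpdir, "cache")
    # Start from a valid cache file, as a previous save would have left it.
    with open(cache_path, "wb") as f:
        pickle.dump({"some-url": {"blocks": [0, 1]}}, f)
    result = load_while_being_rewritten(cache_path)

print(type(result).__name__, result)
```

The file was valid before the simulated rewrite began, so the failure comes purely from the timing of the read.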

@martindurant
Member

You may well be right. The caching metadata should be concurrency-safe (i.e., under async), but it is not thread-safe. The writing of the file could be protected by a lock, or could use a filesystem mv to become atomic (which only works on the local filesystem).

@yarikoptic
Contributor

Thank you @martindurant for the quick response. Do you think you would implement one of those mechanisms in the near future to make fsspec thread-safe in that regard?

@martindurant
Member

It is a reasonable request, but I don't think anyone has such plans right now.

@yarikoptic
Contributor

OK, if you were to do it, would a threading-lock solution be considered? (I think making mv atomic might be only half a solution, since it might still require thread locking to avoid parallel modification and thus either conflicts or just losing some data from one of the racing threads.)

@martindurant
Member

The cache metadata should be conservative, so that races lead to occasional unnecessary re-fetches of data that were cached but the info not persisted. That's OK, if it is relatively rare.

The problem with the lock is that it will still be vulnerable to multiple processes using the same cache directory.
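
As a rough sketch of the lock-based option discussed above, the class below serialises updates to an in-memory metadata dict with a `threading.Lock`. The class and method names are hypothetical and are not part of fsspec. As noted, such a lock only coordinates threads inside one process and does nothing for several processes sharing a cache directory.

```python
import threading


class LockedMetadata:
    """Hypothetical wrapper that serialises updates to a metadata dict."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    def record(self, key, block):
        # Only one thread at a time may read-modify-write an entry.
        with self._lock:
            self._entries.setdefault(key, set()).add(block)

    def snapshot(self):
        # Copy under the lock so callers can iterate without racing writers.
        with self._lock:
            return {k: set(v) for k, v in self._entries.items()}


def worker(meta, n):
    # Each worker records 100 block numbers that no other worker uses.
    for i in range(100):
        meta.record("url", n * 100 + i)


meta = LockedMetadata()
threads = [threading.Thread(target=worker, args=(meta, n)) for n in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(meta.snapshot()["url"]))  # 800 distinct blocks recorded
```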

@yarikoptic
Contributor

ok, @jwodder - could you please implement a mv based solution for storing the cache and see if it would resolve our dandi/dandisets-healthstatus#1 ?

@jwodder
Contributor Author

jwodder commented Nov 11, 2022

The save_cache() method already seems to use a mv-based approach:

fd2, fn2 = tempfile.mkstemp()
with open(fd2, "wb") as f:
    pickle.dump(cache, f)
self._mkcache()
move(fn2, fn)

The only other method that writes to the cache is clear_expired_cache(), but I don't think that's used here.

@yarikoptic
Contributor

yarikoptic commented Nov 11, 2022

I wonder whether that move is actually atomic. fn is fn = os.path.join(self.storage[-1], "cache") and thus resides under self.storage, which in our case is somewhere under .git/ within that repo. tempfile.mkstemp creates its file in the default temporary directory (TMPDIR), which could be on a different filesystem/storage, and that is so in our case.
So move would entail copying across filesystems: I do not think (although I didn't check) that it bothers to create a temporary file near the destination first and rename it afterwards. Thus, and in any case (e.g. for performance reasons), it might be worth making fn2 something like fn2 = f'{fn}-{randomstring}' and wrapping it all in try/except so that on error we do if os.path.exists(fn2): unlink(fn2). This way move can be atomic within one filesystem.

@martindurant
Member

Yes, I agree with that pattern - it's exactly what I've had to do elsewhere recently. This is what happens when you develop on a machine with exactly one disk :)

@yarikoptic
Contributor

> Yes, I agree with that pattern - it's exactly what I've had to do elsewhere recently. This is what happens when you develop on a machine with exactly one disk :)

So it seems to call for a reusable helper like

with atomic_write(path, "wb") as f:
    # do whatever for that f handle

which would take care of all the renames etc. at the target path. @jwodder could you please code this up for use in the code block you pointed out, send a PR with it, and see whether that is sufficient to resolve our issue?
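
One way such a helper could look is sketched below. This is an illustration only, not the code that ended up in fsspec: the name `atomic_write` is taken from the snippet above, everything else is an assumption. It creates the temporary file in the destination directory so that the final rename stays on one filesystem, and it removes the temporary file if the body raises.

```python
import contextlib
import os
import pickle
import tempfile


@contextlib.contextmanager
def atomic_write(path, mode="wb"):
    """Write to a temp file next to ``path``, then rename it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    # Creating the temp file in the destination directory keeps the final
    # rename on a single filesystem.
    fd, tmp_path = tempfile.mkstemp(
        dir=directory, prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as f:
            yield f
    except BaseException:
        # Leave the old file untouched and clean up the partial temp file.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_path)
        raise
    else:
        # The temp file is closed by now; the rename is atomic on POSIX
        # when source and target are on the same filesystem.
        os.replace(tmp_path, path)


with tempfile.TemporaryDirectory() as tmpdir:
    target = os.path.join(tmpdir, "cache")
    with atomic_write(target) as f:
        pickle.dump({"a": 1}, f)
    with open(target, "rb") as f:
        restored = pickle.load(f)
    leftovers = os.listdir(tmpdir)

print(restored, leftovers)
```

A reader that opens the target at any moment sees either the previous complete file or the new complete file, never a partial one. This does not by itself prevent two writers from overwriting each other's updates.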

@jwodder
Contributor Author

jwodder commented Nov 11, 2022

Now the FUSEd cached filesystem is failing with:

Uncaught exception from FUSE operation getattr, returning errno.EINVAL.
Traceback (most recent call last):
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 734, in _wrapper
    return func(*args, **kwargs) or 0
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 774, in getattr
    return self.fgetattr(path, buf, None)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 1027, in fgetattr
    attrs = self.operations('getattr', self._decode_optional_path(path), fh)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 75, in __call__
    return super(DataLadFUSE, self).__call__(op, self.root + path, *args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 1076, in __call__
    return getattr(self, op)(*args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/methodtools.py", line 72, in __call__
    return self.__call__(*args, **kwargs)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 166, in getattr
    fsspec_file.close()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 381, in <lambda>
    f.close = lambda: self.close_and_update(f, close)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 444, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 394, in close_and_update
    c = self.cached_files[-1][path]
KeyError: 'https://api.dandiarchive.org/api/assets/1355f0fc-2926-406a-b6a5-77cc0681c54f/download'

@martindurant
Member

huh, that file exists, but is big. Let's remove that test, I don't remember why it's around. Perhaps we can think of some other very static URL that's likely to be around for a long time.

@jwodder
Contributor Author

jwodder commented Nov 11, 2022

@martindurant Did you mean to comment on a different issue? My comment has nothing to do with fsspec's tests.

@martindurant
Member

Oh, well if it's not one of our tests...

@jwodder
Contributor Author

jwodder commented Nov 11, 2022

@martindurant I was referring to the in-production behavior of the program I described in my original comment at the start of this issue.

@yarikoptic
Contributor

> Now the FUSEd cached filesystem is failing with:

  • that is great news: it means we have identified and provided a viable fix for fsspec! 🎆
  • new exception: please troubleshoot/fix within datalad-fuse

@jwodder
Contributor Author

jwodder commented Nov 11, 2022

@yarikoptic I'm not sure that the current error is a datalad-fuse error. I suspect it may be a side effect of multiple FUSE operations writing to the fsspec cache file at once, resulting in some entries being randomly lost as cache file A is replaced by cache file B, where file B was prepared from data generated before file A was written.
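
The suspected lost-update pattern can be reproduced deterministically, without threads, by interleaving two read-modify-write sequences by hand. This is a sketch under the assumption that each operation loads the metadata, adds its own entry, and writes the whole file back; the `load` and `save` helpers and the keys are made up for the example.

```python
import os
import pickle
import tempfile


def load(path):
    with open(path, "rb") as f:
        return pickle.load(f)


def save(path, data):
    with open(path, "wb") as f:
        pickle.dump(data, f)


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "cache")
    save(path, {})

    # Both operations read the on-disk metadata before either one writes.
    snapshot_a = load(path)
    snapshot_b = load(path)

    snapshot_a["url-a"] = {"blocks": {0}}
    save(path, snapshot_a)  # cache file A

    snapshot_b["url-b"] = {"blocks": {0}}
    save(path, snapshot_b)  # cache file B, prepared from pre-A data

    final = load(path)

print(sorted(final))  # ['url-b']; the entry for 'url-a' has been lost
```

Each individual write here is complete and valid, which is why atomic renames alone would not prevent this outcome.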

@yarikoptic
Contributor

Ah, OK, so those are the situations @martindurant mentioned above. Maybe we are doomed to not rely on a 1-to-1 correspondence between the two caches (our internal one and fsspec's)... I will check in more detail later unless someone beats me to it (about to take off in a plane without internet).

@yarikoptic
Contributor

@jwodder could you please try wrapping that fsspec_file.close() in try/except KeyError? Would we hit any other gotcha, or would it just work?

Overall I feel that there will be no solution without introducing thread safety in the manipulation of the cache here. Atomic writes (#1111) are good to have but are indeed not a full solution, and a full one would require analysis of the code paths that modify cached_files etc. (e.g. save_cache).

@jwodder
Contributor Author

jwodder commented Nov 14, 2022

@yarikoptic Do you mean just suppress the error in datalad-fuse's code?

@yarikoptic
Contributor

Yes

@jwodder
Contributor Author

jwodder commented Nov 14, 2022

The program is running without errors now, though datalad-fuse is emitting a number of warnings like:

Exception ignored in: <function AbstractBufferedFile.__del__ at 0x7fadfb25ab80>
Traceback (most recent call last):
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/spec.py", line 1763, in __del__
    self.close()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 381, in <lambda>
    f.close = lambda: self.close_and_update(f, close)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 444, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 394, in close_and_update
    c = self.cached_files[-1][path]
KeyError: 'https://api.dandiarchive.org/api/assets/1355f0fc-2926-406a-b6a5-77cc0681c54f/download'

which may or may not suggest that #1078 wasn't fully resolved, even though the problem no longer arises on the MVCE I posted there.

@martindurant
Member

close_and_update should probably do something sensible, even if it means losing information on previously cached files, rather than erroring in this situation.

@jwodder
Contributor Author

jwodder commented Nov 14, 2022

The program eventually crashed with the following error from the FUSE process:

Uncaught exception from FUSE operation getattr, returning errno.EINVAL.
Traceback (most recent call last):
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 734, in _wrapper
    return func(*args, **kwargs) or 0
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 774, in getattr
    return self.fgetattr(path, buf, None)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 1027, in fgetattr
    attrs = self.operations('getattr', self._decode_optional_path(path), fh)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 75, in __call__
    return super(DataLadFUSE, self).__call__(op, self.root + path, *args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 1076, in __call__
    return getattr(self, op)(*args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/methodtools.py", line 72, in __call__
    return self.__call__(*args, **kwargs)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 167, in getattr
    fsspec_file.close()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 381, in <lambda>
    f.close = lambda: self.close_and_update(f, close)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 444, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 399, in close_and_update
    self.save_cache()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 444, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 179, in save_cache
    for k, c in cache.items():
RuntimeError: dictionary changed size during iteration

Uncaught exception from FUSE operation release, returning errno.EINVAL.
Traceback (most recent call last):
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 734, in _wrapper
    return func(*args, **kwargs) or 0
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 892, in release
    return self.operations('release', self._decode_optional_path(path), fh)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 75, in __call__
    return super(DataLadFUSE, self).__call__(op, self.root + path, *args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fuse.py", line 1076, in __call__
    return getattr(self, op)(*args)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/datalad_fuse/fuse_.py", line 266, in release
    f.close()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 381, in <lambda>
    f.close = lambda: self.close_and_update(f, close)
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 444, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 399, in close_and_update
    self.save_cache()
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 444, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/home/dandi/cronlib/dandisets-healthstatus/venv/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 173, in save_cache
    blocks.update(c["blocks"])
AttributeError: 'list' object has no attribute 'update'

The second error was repeated several times. I'm unsure exactly what happened internally.

@martindurant
Member

for k, c in cache.copy().items():

would eliminate the first issue

The second implies that blocks is assumed to be a dict or set, but is actually a list.
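
For reference, the first error is the generic Python failure when a dict gains or loses a key while it is being iterated. The sketch below triggers it in a single thread by adding a key inside the loop, as a stand-in for another thread doing so, and shows that iterating over a `.copy()` snapshot avoids the exception. The dict contents and the helper are invented for the example, and this says nothing about which fsspec code path performs the mutation.

```python
cache = {"url-1": {"blocks": {0}}, "url-2": {"blocks": {1}}}


def iterate_and_mutate(items_source):
    """Iterate over ``items_source`` while a key is added to ``cache``."""
    seen = []
    for key, _entry in items_source.items():
        # Stand-in for a second thread registering a new file mid-iteration.
        cache.setdefault("url-new", {"blocks": set()})
        seen.append(key)
    return seen


try:
    iterate_and_mutate(cache)  # iterates the very dict that is being mutated
    error = None
except RuntimeError as exc:
    error = str(exc)

del cache["url-new"]
seen = iterate_and_mutate(cache.copy())  # iterating over a snapshot is safe

print(error)
print(seen)
```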

@jwodder
Contributor Author

jwodder commented Nov 14, 2022

@martindurant Adding copy() is a band-aid; it would be better to understand why this is happening, as the loop in question does not modify cache. Moreover, the comment on lines 168-171 implies that breaking identity by copying the values of cache would break MMapCache.

As for the second error, it's clear that blocks is assumed to be a set, and it's pickled as a list for some inscrutable reason, but I can't figure out where the conversion back to a set is failing.

@yarikoptic
Contributor

> As for the second error, it's clear that blocks is assumed to be a set, and it's pickled as a list for some inscrutable reason, but I can't figure out where the conversion back to a set is failing.

casting into list is right there a few lines down: see e.g. https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/cached.py#L186

        cache = {k: v.copy() for k, v in cached_files.items()}
        for c in cache.values():
            if isinstance(c["blocks"], set):
                c["blocks"] = list(c["blocks"])

which was added in a6d96f7. Unfortunately the reason for the cast is not clear to me. @martindurant, maybe you remember and can see how we should get out of this "pickle"? ;)

@martindurant
Member

It was a long time ago... Since it's in the save function, maybe it was to allow JSON serialisation.

@yarikoptic
Contributor

Since there is no more JSON there but rather pickle, I guess it should be safe to remove that, right?

> @martindurant Adding copy() is a band-aid; it would be better to understand why this is happening, as the loop in question does not modify cache. Moreover, the comment on lines 168-171 implies that breaking identity by copying the values of cache would break MMapCache.

I also can't grasp what it is about, but since we run into it only in our case, I guess it has something to do with multithreading. Maybe some other code path manages to modify it somehow (although I don't see how)? As for identity: since .copy() isn't deep, wouldn't it copy only the dict itself, not items like "blocks", and thus be safe to do here? (Unless someone figures out the cause, it might just need to be done in the meantime.)
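
The identity question can be checked directly. In this small sketch (the dict contents are made up), `dict.copy()` creates a new outer dict whose values are the same objects as before, while a per-entry copy creates new entry dicts that still share the original `blocks` set:

```python
cache = {"url": {"blocks": {0, 1}}}

shallow = cache.copy()                               # new outer dict, same values
per_entry = {k: v.copy() for k, v in cache.items()}  # new entry dicts as well

print(shallow["url"] is cache["url"])                        # True
print(per_entry["url"] is cache["url"])                      # False
print(per_entry["url"]["blocks"] is cache["url"]["blocks"])  # True
```

So iterating over `cache.copy().items()` still yields the original entry objects, and only the set of keys is frozen at the time of the copy.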

@jwodder
Contributor Author

jwodder commented Nov 15, 2022

@yarikoptic The casting to list is performed on a copy of the cache that is then pickled, and the lists are converted back to sets when un-pickled; as far as I can tell, the cache itself should always contain sets.

@martindurant
Member

I'm sorry, my family is ill, not sure when I can get back to you on this. Please try pinging me in a couple of days if you've made no progress.

@yarikoptic
Contributor

no problem @martindurant -- get better!

@jwodder

> @yarikoptic The casting to list is performed on a copy of the cache that is then pickled, and the lists are converted back to sets when un-pickled; as far as I can tell, the cache itself should always contain sets.

I do not see any cast to set( when unpickling in the lines above, at https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/cached.py#L160 . Which conversion did you have in mind?

@jwodder
Contributor Author

jwodder commented Nov 16, 2022

@yarikoptic If the in-memory cache (cache) contains sets, then these lines will update those sets with the contents of the lists read from the pickle file:

blocks = cache[k]["blocks"]
blocks.update(c["blocks"])
c["blocks"] = blocks

Additionally, there's this in load_cache():

for c in loaded_cached_files.values():
    if isinstance(c["blocks"], list):
        c["blocks"] = set(c["blocks"])

@yarikoptic
Contributor

OK, anyway, it seems there is no longer any need for the list cast when saving: the metadata now goes into pickle, whereas before it went into JSON. So a likely fix here is just to remove the cast to list before pickling and keep the list->set cast on load for compatibility with existing caches; that error should then go away, IMHO.
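
A minimal sketch of that proposal follows, assuming each entry is a dict with a `blocks` key; the function names are hypothetical and this is not the actual fsspec code. Sets are pickled as they are, and the load side still converts any lists found in older cache files.

```python
import pickle


def dump_metadata(cached_files):
    # Sets pickle fine, so no conversion to list is needed before saving.
    return pickle.dumps(cached_files)


def load_metadata(payload):
    loaded = pickle.loads(payload)
    for entry in loaded.values():
        # Older cache files may still hold lists; normalise them to sets.
        if isinstance(entry["blocks"], list):
            entry["blocks"] = set(entry["blocks"])
    return loaded


new_style = load_metadata(dump_metadata({"url": {"blocks": {0, 1}}}))
old_style = load_metadata(pickle.dumps({"url": {"blocks": [0, 1]}}))

print(new_style == old_style)  # True
```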

martindurant added a commit to jwodder/filesystem_spec that referenced this issue Nov 22, 2022
@yarikoptic
Contributor

Eh, I should have spotted it: AFAIK #1111 does not completely address this issue (concurrency safety), so the issue should not have been closed with that PR.
