Skip to content

Commit

Permalink
Merge branch 'fsspec:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
fleming79 committed Mar 15, 2024
2 parents 03b3106 + 4bd16f6 commit 082b544
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 33 deletions.
1 change: 1 addition & 0 deletions ci/environment-friends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies:
- flake8
- black
- google-cloud-core
- google-cloud-storage
- google-api-core
- google-api-python-client
- httpretty
Expand Down
22 changes: 22 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
Changelog
=========

2024.3.0
--------

Enhancements

- coroutines throttle to stream pool rather than batches (#1544)
- write transactions in simplecache (#1531)
- allow deep nested refs in referenceFS/parquet (#1530)

Fixes

- remove extra calling mapper contains (#1546)
- connection retry for SMB (#1533)
- zip64 should be on if allowZip64 is (#1532)

Other

- HTTP logging (#1547)
- url_to_fs exposed in package root (#1540)
- sort known_implementations (#1549)
- code quality/style (#1538, #1537, #1528, #1526)

2024.2.0
--------

Expand Down
43 changes: 29 additions & 14 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,20 +239,35 @@ async def _run_coros_in_chunks(
batch_size = len(coros)

assert batch_size > 0
results = []
for start in range(0, len(coros), batch_size):
chunk = [
asyncio.Task(asyncio.wait_for(c, timeout=timeout))
for c in coros[start : start + batch_size]
]
if callback is not DEFAULT_CALLBACK:
[
t.add_done_callback(lambda *_, **__: callback.relative_update(1))
for t in chunk
]
results.extend(
await asyncio.gather(*chunk, return_exceptions=return_exceptions),
)

async def _run_coro(coro, i):
try:
return await asyncio.wait_for(coro, timeout=timeout), i
except Exception as e:
if not return_exceptions:
raise
return e, i
finally:
callback.relative_update(1)

i = 0
n = len(coros)
results = [None] * n
pending = set()

while pending or i < n:
while len(pending) < batch_size and i < n:
pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
i += 1

if not pending:
break

done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
while done:
result, k = await done.pop()
results[k] = result

return results


Expand Down
5 changes: 5 additions & 0 deletions fsspec/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def rsync(
fs: GenericFileSystem|None
Instance to use if explicitly given. The instance defines how
to make downstream file system instances from paths.
Returns
-------
dict of the copy operations that were performed, {source: destination}
"""
fs = fs or GenericFileSystem(**(inst_kwargs or {}))
source = fs._strip_protocol(source)
Expand Down Expand Up @@ -137,6 +141,7 @@ def rsync(
logger.debug(f"{len(to_delete)} files to delete")
if delete_missing:
fs.rm(to_delete)
return allfiles


class GenericFileSystem(AsyncFileSystem):
Expand Down
73 changes: 65 additions & 8 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def complete(self, commit=True):
lpaths = [f.fn for f in self.files]
if commit:
self.fs.put(lpaths, rpaths)
# else remove?
self.files.clear()
self.fs._intrans = False
self.fs._transaction = None
self.fs = None # break cycle


class CachingFileSystem(AbstractFileSystem):
Expand Down Expand Up @@ -391,8 +393,11 @@ def close_and_update(self, f, close):
close()
f.closed = True

# Directory listing for the caching layer: forwards straight to the wrapped
# filesystem (``self.fs``), handing over ``path`` and ``detail`` positionally
# and returning whatever that call returns. Nothing is looked up in, or added
# to, the local cache by this method itself.
def ls(self, path, detail=True):
return self.fs.ls(path, detail)

def __getattribute__(self, item):
if item in [
if item in {
"load_cache",
"_open",
"save_cache",
Expand All @@ -409,6 +414,11 @@ def __getattribute__(self, item):
"read_block",
"tail",
"head",
"info",
"ls",
"exists",
"isfile",
"isdir",
"_check_file",
"_check_cache",
"_mkcache",
Expand All @@ -428,9 +438,12 @@ def __getattribute__(self, item):
"cache_size",
"pipe_file",
"pipe",
"isdir",
"isfile",
"exists",
"start_transaction",
"end_transaction",
]:
}:
# all the methods defined in this class. Note `open` here, since
# it calls `_open`, but is actually in superclass
return lambda *args, **kw: getattr(type(self), item).__get__(self)(
Expand Down Expand Up @@ -756,6 +769,49 @@ def pipe_file(self, path, value=None, **kwargs):
else:
super().pipe_file(path, value)

def ls(self, path, detail=True, **kwargs):
    """List ``path``, merging in files still pending in the open transaction.

    The target filesystem (``self.fs``) is listed first. While a write
    transaction is active, files that were written but not yet committed are
    merged in: as ``"file"`` entries when they are ``path`` itself or sit
    directly inside it, and as ONE ``"directory"`` entry per immediate
    subdirectory that holds pending files further down.

    Every name appears at most once in the result. A pending file replaces a
    target-filesystem entry of the same name (this matches ``info``, which
    also prefers the pending file); a synthesized directory never replaces
    an entry that already exists.

    Parameters
    ----------
    path: str
        Location to list; the protocol prefix is stripped first.
    detail: bool
        If True, return a list of info dicts; otherwise a sorted list of
        names.
    **kwargs:
        Passed through to the target filesystem's ``ls``.

    Returns
    -------
    list of dict if ``detail`` is True, else sorted list of str

    Raises
    ------
    FileNotFoundError
        If the target filesystem does not have ``path`` and no pending
        transaction file lives at or below it.
    """
    path = self._strip_protocol(path)
    try:
        # copy: the merge below must not touch a list owned by the target fs
        details = self.fs.ls(path, detail=True, **kwargs).copy()
        missing = None
    except FileNotFoundError as exc:
        details = []
        missing = exc

    if self._intrans:
        prefix = path.rstrip("/") + "/"
        depth = prefix.count("/")
        # keyed by name so that each name is reported exactly once
        merged = {entry["name"]: entry for entry in details}
        for f in self.transaction.files:
            inside = f.path.startswith(prefix)
            if f.path == path or (inside and f.path.count("/") == depth):
                # ``size`` stays None until the temp file is closed; a closed
                # empty file has size 0, which must not fall through to
                # ``tell()`` on the already-closed handle.
                size = f.size if f.size is not None else f.tell()
                merged[f.path] = {"name": f.path, "size": size, "type": "file"}
            elif inside:
                # pending file lives deeper: expose only its first component
                dname = "/".join(f.path.split("/")[: depth + 1])
                merged.setdefault(
                    dname, {"name": dname, "size": 0, "type": "directory"}
                )
        details = list(merged.values())

    if missing is not None and not details:
        raise missing
    if detail:
        return details
    return sorted(entry["name"] for entry in details)

def info(self, path, **kwargs):
    """Describe ``path``, taking uncommitted transaction files into account.

    While a write transaction is active, a pending file at exactly ``path``
    is reported as a ``"file"``, and a ``path`` that is a parent of any
    pending file is reported as a ``"directory"`` of size 0. Otherwise the
    call is answered by the target filesystem (``self.fs``).

    Parameters
    ----------
    path: str
        Location to describe; the protocol prefix is stripped first.
    **kwargs:
        Passed through to the target filesystem's ``info``.

    Returns
    -------
    dict with at least the keys ``name``, ``size`` and ``type``
    """
    path = self._strip_protocol(path)
    if self._intrans:
        pending = self.transaction.files
        match = next((f for f in pending if f.path == path), None)
        if match is not None:
            # ``size`` stays None until the temp file is closed; a closed
            # empty file has size 0, which must not fall through to
            # ``tell()`` on the already-closed handle.
            size = match.size if match.size is not None else match.tell()
            return {"name": path, "size": size, "type": "file"}
        # same prefix rule as ``ls``: tolerate a trailing slash on ``path``
        prefix = path.rstrip("/") + "/"
        if any(f.path.startswith(prefix) for f in pending):
            return {"name": path, "size": 0, "type": "directory"}
    return self.fs.info(path, **kwargs)

def pipe(self, path, value=None, **kwargs):
if isinstance(path, str):
self.pipe_file(self._strip_protocol(path), value, **kwargs)
Expand Down Expand Up @@ -836,6 +892,7 @@ def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
if seek:
self.fh.seek(seek)
self.path = path
self.size = None
self.fs = fs
self.closed = False
self.autocommit = autocommit
Expand All @@ -855,6 +912,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
self.size = self.fh.tell()
if self.closed:
return
self.fh.close()
Expand All @@ -868,15 +926,14 @@ def discard(self):

def commit(self):
self.fs.put(self.fn, self.path, **self.kwargs)
try:
os.remove(self.fn)
except (PermissionError, FileNotFoundError):
# file path may be held by new version of the file on windows
pass
# we do not delete local copy - it's still in the cache

# Read-only ``name`` attribute of the temp-file wrapper: returns ``self.fn``.
# NOTE(review): ``fn`` looks like the local temporary file's path (``commit``
# passes it as the first argument to ``self.fs.put`` with ``self.path``
# second) — confirm against ``__init__``, which is only partly visible here.
@property
def name(self):
return self.fn

# Debug representation: the fixed prefix "LocalTempFile: " followed by
# ``self.path`` (the ``path`` value stored by ``__init__``).
def __repr__(self) -> str:
return f"LocalTempFile: {self.path}"

# Fallback attribute lookup. Python calls ``__getattr__`` only after normal
# lookup on the object has failed, so any attribute this class does not
# define itself is fetched from the wrapped file handle ``self.fh``.
# An AttributeError from ``self.fh`` propagates to the caller unchanged.
def __getattr__(self, item):
return getattr(self.fh, item)
15 changes: 9 additions & 6 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ async def _ls_real(self, url, detail=True, **kwargs):
session = await self.set_session()
async with session.get(self.encode_url(url), **self.kwargs) as r:
self._raise_not_found_for_status(r, url)
text = await r.text()
if self.simple_links:
links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
else:
links = [u[2] for u in ex.findall(text)]
try:
text = await r.text()
if self.simple_links:
links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
else:
links = [u[2] for u in ex.findall(text)]
except UnicodeDecodeError:
links = [] # binary, not HTML
out = set()
parts = urlparse(url)
for l in links:
Expand Down Expand Up @@ -430,7 +433,7 @@ async def _info(self, url, **kwargs):
if policy == "get":
# If get failed, then raise a FileNotFoundError
raise FileNotFoundError(url) from exc
logger.debug(str(exc))
logger.debug("", exc_info=exc)

return {"name": url, "size": None, **info, "type": "file"}

Expand Down
2 changes: 2 additions & 0 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def rmdir(self, path):
raise FileNotFoundError(path)

def info(self, path, **kwargs):
logger.debug("info: %s", path)
path = self._strip_protocol(path)
if path in self.pseudo_dirs or any(
p.startswith(path + "/") for p in list(self.store) + self.pseudo_dirs
Expand Down Expand Up @@ -210,6 +211,7 @@ def cp_file(self, path1, path2, **kwargs):
raise FileNotFoundError(path1)

def cat_file(self, path, start=None, end=None, **kwargs):
logger.debug("cat: %s", path)
path = self._strip_protocol(path)
try:
return bytes(self.store[path].getbuffer()[start:end])
Expand Down
6 changes: 6 additions & 0 deletions fsspec/implementations/tests/test_cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -1291,10 +1291,16 @@ def patched_put(*args, **kwargs):
with fs.transaction:
fs.pipe("myfile", b"1")
fs.pipe("otherfile", b"2")
fs.pipe("deep/dir/otherfile", b"3")
with fs.open("blarh", "wb") as f:
f.write(b"ff")
assert not m.find("")

assert fs.info("otherfile")["size"] == 1
assert fs.info("deep")["type"] == "directory"
assert fs.isdir("deep")
assert fs.ls("deep", detail=False) == ["/deep/dir"]

assert m.cat("myfile") == b"1"
assert m.cat("otherfile") == b"2"
assert called[0] == 1 # copy was done in one go
8 changes: 6 additions & 2 deletions fsspec/mapping.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import array
import logging
import posixpath
import warnings
from collections.abc import MutableMapping
from functools import cached_property

from .core import url_to_fs
from fsspec.core import url_to_fs

logger = logging.getLogger("fsspec.mapping")


class FSMap(MutableMapping):
Expand Down Expand Up @@ -69,6 +72,7 @@ def dirfs(self):

def clear(self):
"""Remove all keys below root - empties out mapping"""
logger.info("Clear mapping at %s", self.root)
try:
self.fs.rm(self.root, True)
self.fs.mkdir(self.root)
Expand Down Expand Up @@ -186,7 +190,7 @@ def __delitem__(self, key):
def __contains__(self, key):
"""Does key exist in mapping?"""
path = self._key_to_str(key)
return self.fs.exists(path) and self.fs.isfile(path)
return self.fs.isfile(path)

def __reduce__(self):
return FSMap, (self.root, self.fs, False, False, self.missing_exceptions)
Expand Down
11 changes: 8 additions & 3 deletions fsspec/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Transaction:
instance as the ``.transaction`` attribute of the given filesystem
"""

def __init__(self, fs):
def __init__(self, fs, **kwargs):
"""
Parameters
----------
Expand All @@ -26,8 +26,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""End transaction and commit, if exit is not due to exception"""
# only commit if there was no exception
self.complete(commit=exc_type is None)
self.fs._intrans = False
self.fs._transaction = None
if self.fs:
self.fs._intrans = False
self.fs._transaction = None
self.fs = None

def start(self):
"""Start a transaction on this FileSystem"""
Expand All @@ -43,6 +45,8 @@ def complete(self, commit=True):
else:
f.discard()
self.fs._intrans = False
self.fs._transaction = None
self.fs = None


class FileActor:
Expand Down Expand Up @@ -83,3 +87,4 @@ def complete(self, commit=True):
else:
self.files.discard().result()
self.fs._intrans = False
self.fs = None

0 comments on commit 082b544

Please sign in to comment.