diff --git a/src/kompress/__init__.py b/src/kompress/__init__.py
index 8cfa297..aedc09b 100644
--- a/src/kompress/__init__.py
+++ b/src/kompress/__init__.py
@@ -20,6 +20,8 @@
     Union,
 )
 
+from ._zippath import _walk_paths
+
 
 PathIsh = Union[Path, str]
 
@@ -227,6 +229,9 @@ def exists(self) -> bool:
             # special case, the base class returns False in this case for some reason
             return self.filepath.exists()
         return super().exists() or self._as_dir().exists()
+        # TODO hmm seems that base class has special treatment for .at argument during construction,
+        # it actually checks if it's a file or a dir, and in case of dir, appends '/'?
+        # maybe use resolve_dir thing from base class??
 
     def _as_dir(self) -> zipfile.Path:
         # note: seems that zip always uses forward slash, regardless OS?
@@ -316,3 +321,40 @@ def suffixes(self) -> List[str]:
     @property
     def suffix(self) -> str:
         return Path(self.parts[-1]).suffix
+
+    def walk(
+        self,
+        *,
+        top_down: bool = True,
+        on_error=None,
+        follow_symlinks: bool = False,
+    ) -> Iterator[tuple[ZipPath, list[str], list[str]]]:
+        """
+        Walk the archive subtree under this path top-down, similarly to pathlib.Path.walk.
+
+        Yields (dirpath, dirnames, filenames) with the names sorted;
+        removing items from dirnames in-place skips these subdirectories.
+        Only top_down=True and on_error=None are supported, follow_symlinks is ignored.
+        """
+        if not top_down:
+            raise NotImplementedError("top_down=False isn't supported for zipfile.Path yet")
+        if on_error is not None:
+            raise NotImplementedError("on_error isn't supported for zipfile.Path yet")
+
+        # match whole path components only: without the trailing slash, 'aaa' would also match 'aaab/file'
+        prefix = self.at
+        if prefix != '' and not prefix.endswith('/'):
+            prefix += '/'
+
+        # the entry equal to the prefix is the subdir itself, no need to include it
+        names = sorted(
+            n[len(prefix) :]
+            for n in self.root.namelist()
+            if n.startswith(prefix) and n != prefix
+        )
+
+        # note: seems that zip always uses forward slash, regardless OS?
+        for r, dirs, files in _walk_paths(names, separator='/'):
+            # make sure we don't construct ZipPath with at='.'... this behaves weird
+            rr = self if r == '.' else self / r
+            yield rr, dirs, files
diff --git a/src/kompress/_zippath.py b/src/kompress/_zippath.py
new file mode 100644
index 0000000..5c48ddc
--- /dev/null
+++ b/src/kompress/_zippath.py
@@ -0,0 +1,150 @@
+"""
+Helper utils for zippath adapter
+"""
+
+from __future__ import annotations
+
+import os
+from pathlib import Path
+from typing import Iterable, Iterator, List, Tuple
+
+RootName = str
+DirName = str
+FileName = str
+Entry = Tuple[RootName, List[Tuple[DirName, 'Entry']], List[FileName]]
+
+
+def _walk_paths(
+    paths: Iterable[str],
+    separator: str,
+) -> Iterator[tuple[RootName, list[DirName], list[FileName]]]:
+    """
+    Reconstruct a top-down, os.walk-like traversal from a flat listing of relative paths.
+
+    Directory entries must end with the separator, and every directory must be listed
+    before its contents (a sorted listing with explicit directory entries satisfies this).
+    The yielded list of dir names may be modified in-place by the caller to prune the traversal.
+
+    Raises ValueError if an entry shows up before its parent directory.
+    """
+    # this is basically a tree, so we can walk it later, potentially skipping dirs modified in-place by the callee
+    stack: list[Entry] = [('.', [], [])]
+    stack_pos = 0
+
+    for p in paths:
+        split = p.rsplit(separator, maxsplit=1)
+        if len(split) == 1:
+            p_parent = '.'
+            [p_name] = split
+        else:
+            p_parent, p_name = split
+
+        is_dir = p_name == ''
+
+        if is_dir:
+            # p_parent is the dir itself here, so split once more to get its own parent and name
+            split = p_parent.rsplit(separator, maxsplit=1)
+            if len(split) == 1:
+                target_root = '.'
+                [name] = split
+            else:
+                target_root, name = split
+        else:
+            target_root = p_parent
+            name = p_name
+
+        # thanks to the ordering requirement, the parent is either the current entry or an earlier one
+        while True:
+            if stack_pos < 0:
+                raise ValueError(f"{p!r}: parent directory must be listed before its contents")
+            parent_root, parent_dirs, parent_files = stack[stack_pos]
+            if target_root == parent_root:
+                break
+            stack_pos -= 1
+
+        if is_dir:  # new dir detected!
+            new_entry: Entry = (p_parent, [], [])
+            stack.append(new_entry)
+            stack_pos = len(stack) - 1
+
+            parent_dirs.append((name, new_entry))
+        else:
+            parent_files.append(name)
+
+    def _traverse(entry: Entry) -> Iterator[tuple[RootName, list[DirName], list[FileName]]]:
+        (root, dir_entries, files) = entry
+        child_dirs = dict(dir_entries)
+        dirnames = list(child_dirs.keys())
+
+        dirnames.sort()
+        files.sort()
+        yield root, dirnames, files
+
+        # traverse dirnames, not dir_entries! since we want to respect if the callee modifies them
+        for d in dirnames:
+            yield from _traverse(child_dirs[d])
+
+    yield from _traverse(stack[0])
+
+
+def test_walk_paths_basic() -> None:
+    # not sure about this one but this is kinda compatible with pathlib.Path.glob behaviour
+    assert list(_walk_paths([], separator=os.sep)) == [
+        ('.', [], []),
+    ]
+
+    # just two files with no extra dirs
+    assert list(_walk_paths(['aaa', 'bbb'], separator=os.sep)) == [
+        ('.', [], ['aaa', 'bbb']),
+    ]
+
+    # one empty dir
+    assert list(_walk_paths(['aaa/'], separator='/')) == [
+        # fmt: off
+        ('.'  , ['aaa'], []),
+        ('aaa', []     , []),
+        # fmt: on
+    ]
+
+    # dir with one dir with one file
+    assert list(_walk_paths(['aaa/', 'aaa/bbb/', 'aaa/bbb/fff'], separator='/')) == [
+        # fmt: off
+        ('.'      , ['aaa'], []),
+        ('aaa'    , ['bbb'], []),
+        ('aaa/bbb', []     , ['fff']),
+        # fmt: on
+    ]
+
+
+def test_walk_paths_against_stdlib() -> None:
+    import sys
+
+    import pytest
+
+    if sys.version_info[:2] < (3, 12):
+        pytest.skip("pathlib.Path.walk is only present from python 3.12")
+
+    def as_paths(root: Path) -> Iterator[str]:
+        for r, dirs, files in root.walk():
+            rr = r.relative_to(root)
+            for d in dirs:
+                yield f'{rr/ d}{os.sep}'
+            for f in files:
+                yield str(rr / f)
+
+    def check_against_builtin(root: Path) -> None:
+        expected = []
+        for r, dirs, files in root.walk():
+            dirs.sort()
+            files.sort()
+            expected.append((str(r.relative_to(root)), dirs, files))
+        assert len(expected) > 1  # just in case
+
+        paths = sorted(as_paths(root))
+        actual = list(_walk_paths(paths, separator=os.sep))
+
+        assert expected == actual
+
+    git_path = Path(__file__).absolute().parent.parent.parent / '.git'
+    assert git_path.exists(), git_path
+    check_against_builtin(git_path)
diff --git a/src/kompress/tests/kompress.py b/src/kompress/tests/kompress.py
index 48ca910..2ba7636 100644
--- a/src/kompress/tests/kompress.py
+++ b/src/kompress/tests/kompress.py
@@ -118,12 +118,21 @@ def test_zippath(tmp_path: Path) -> None:
     assert ZipPath(target) == ZipPath(target)
     assert zp.absolute() == zp
 
+    assert zp / '.' == zp
     # shouldn't crash
     hash(zp)
 
     assert zp.exists()
+    assert (zp / 'gdpr_export').exists()
     assert (zp / 'gdpr_export' / 'comments').exists()
 
+    ## NOTE: in pathlib.Path these work, however not in zipfile.Path
+    ## for now we don't support them either, need to be really careful if we wanna diverge from zipfile.Path
+    # assert (zp / '.').exists()
+    # assert (zp / '.' / 'gdpr_export').exists()
+    # assert (zp / 'gdpr_export' / './comments').exists()
+    ##
+
     # check str constructor just in case
     assert (ZipPath(str(target)) / 'gdpr_export' / 'comments').exists()
     assert not (ZipPath(str(target)) / 'whatever').exists()
diff --git a/src/kompress/tests/zippath.py b/src/kompress/tests/zippath.py
new file mode 100644
index 0000000..4bc6ad6
--- /dev/null
+++ b/src/kompress/tests/zippath.py
@@ -0,0 +1,90 @@
+from __future__ import annotations
+
+from pathlib import Path
+from zipfile import ZipFile
+
+from .. import ZipPath
+
+structure_data: Path = Path(__file__).parent / "structure_data"
+
+
+def test_walk_empty(tmp_path: Path) -> None:
+    path = tmp_path / 'empty.zip'
+    with ZipFile(path, 'w'):
+        pass
+    assert path.exists()
+    zp = ZipPath(path)
+
+    # this is consistent with pathlib.Path.walk over empty dir
+    assert list(zp.walk()) == [
+        (zp / '.', [], []),
+    ]
+
+
+def test_walk_1(tmp_path: Path) -> None:
+    path = tmp_path / 'empty.zip'
+    with ZipFile(path, 'w') as z:
+        z.writestr('file2', 'data2')
+        z.writestr('file1', 'data2')
+    assert path.exists()
+    zp = ZipPath(path)
+
+    assert list(zp.walk()) == [
+        (zp, [], ['file1', 'file2']),
+    ]
+
+
+def test_walk_2(tmp_path: Path) -> None:
+    path = tmp_path / 'empty.zip'
+    with ZipFile(path, 'w') as z:
+        z.writestr('empty_dir/', '')
+        z.writestr('file', 'alala')
+        z.writestr('aaa/bbb', 'some_data')
+        z.writestr('aaa/ccc/ddd', 'some_data_2')
+    assert path.exists()
+    zp = ZipPath(path)
+
+    assert list(zp.walk()) == [
+        # fmt: off
+        (zp              , ['aaa', 'empty_dir'], ['file']),
+        (zp / 'aaa'      , ['ccc']             , ['bbb']),
+        (zp / 'aaa/ccc'  , []                  , ['ddd']),
+        (zp / 'empty_dir', []                  , []),
+        # fmt: on
+    ]
+
+    # testcase when we aren't starting from root
+    assert list((zp / 'aaa').walk()) == [
+        # fmt: off
+        (zp / 'aaa'      , ['ccc']             , ['bbb']),
+        (zp / 'aaa/ccc'  , []                  , ['ddd']),
+        # fmt: on
+    ]
+
+    # check that .walk respects modifying dirs in-place, like regular pathlib
+    all_files = []
+    for _r, dirs, files in zp.walk():
+        if 'ccc' in dirs:
+            dirs.remove('ccc')
+        all_files.extend(files)
+    assert all_files == ['file', 'bbb']
+
+
+def test_walk_gdpr_export() -> None:
+    target = structure_data / 'gdpr_export.zip'
+    assert target.exists(), target  # precondition
+
+    zp = ZipPath(target)
+
+    def _check_walk(z):
+        for r, dirs, files in z.walk():
+            assert r.exists()
+            yield r
+            for d in dirs:
+                assert (r / d).exists()
+            for f in files:
+                assert (r / f).exists()
+                yield (r / f)
+
+    results = list(_check_walk(zp))
+    assert len(results) == 8