Skip to content

Commit

Permalink
zip handling: support .glob method like pathlib.Path from 3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
karlicoss committed Sep 16, 2024
1 parent b412754 commit 5103075
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/kompress/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
Union,
)

from ._zippath import _walk_paths

PathIsh = Union[Path, str]


Expand Down Expand Up @@ -227,6 +229,9 @@ def exists(self) -> bool:
# special case, the base class returns False in this case for some reason
return self.filepath.exists()
return super().exists() or self._as_dir().exists()
# TODO hmm seems that base class has special treatment for .at argument during construction,
# it actually checks if it's a file or a dir, and in case of dir, appends '/'?
# maybe use resolve_dir thing from base class??

def _as_dir(self) -> zipfile.Path:
# note: seems that zip always uses forward slash, regardless OS?
Expand Down Expand Up @@ -316,3 +321,30 @@ def suffixes(self) -> List[str]:
@property
def suffix(self) -> str:
    """File extension of the last path component, as computed by ``pathlib.Path.suffix``."""
    last_component = self.parts[-1]
    return Path(last_component).suffix

def walk(
    self,
    *,
    top_down: bool = True,
    on_error=None,
    follow_symlinks: bool = False,
) -> Iterator[tuple[ZipPath, list[str], list[str]]]:
    """
    Walk the archive members located under this path, top-down.

    Yields ``(root, dirnames, filenames)`` triples as produced by ``_walk_paths``,
    with ``root`` converted to a path object relative to ``self``.

    Only the default ``top_down=True`` and ``on_error=None`` are accepted;
    ``follow_symlinks`` is part of the signature but is not consulted here.
    """
    assert top_down, "specifying top_down isn't supported for zipfile.Path yet"
    assert on_error is None, "on_error isn't supported for zipfile.Path yet"

    prefix = self.at
    skip = len(prefix)
    # keep members below this path, expressed relative to it;
    # the member equal to the prefix (the directory itself) is left out
    relative_names = sorted(
        member[skip:]
        for member in self.root.namelist()
        if member.startswith(prefix) and len(member) > skip
    )

    # note: seems that zip always uses forward slash, regardless OS?
    for sub_root, dirs, files in _walk_paths(relative_names, separator='/'):
        if sub_root == '.':
            # make sure we don't construct ZipPath with at='.'... this behaves weird
            yield self, dirs, files
        else:
            yield self / sub_root, dirs, files
139 changes: 139 additions & 0 deletions src/kompress/_zippath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
Helper utils for zippath adapter
"""

from __future__ import annotations

import os
from pathlib import Path
from typing import Iterable, Iterator, List, Tuple

# Aliases describing the tree that _walk_paths builds before traversing it.
# Path of a directory relative to the walk root; the walk root itself is '.'.
RootName = str
# Bare name of a child directory (last path component).
DirName = str
# Bare name of a file (last path component).
FileName = str
# One directory node: (its root path, [(child dir name, child node), ...], [file names]).
# Recursive, hence the quoted forward reference.
Entry = Tuple[RootName, List[Tuple[DirName, 'Entry']], List[FileName]]


def _walk_paths(
paths: Iterable[str],
separator: str,
) -> Iterator[tuple[RootName, list[DirName], list[FileName]]]:
# this is basically a tree, so we can walk it later, potentially skipping dirs modified in-place by the callee
stack: list[Entry] = [('.', [], [])]
stack_pos = 0

for p in paths:
split = p.rsplit(separator, maxsplit=1)
if len(split) == 1:
p_parent = '.'
[p_name] = split
else:
p_parent, p_name = split

is_dir = p_name == ''

if is_dir:
split = p_parent.rsplit(separator, maxsplit=1)
if len(split) == 1:
target_root = '.'
[dirname] = split
else:
# todo hmm can we avoid extra split?
target_root, dirname = p_parent.rsplit(separator, maxsplit=1)
else:
target_root = p_parent

while True:
assert stack_pos >= 0, (p, stack)
parent_root, parent_dirs, parent_files = stack[stack_pos]
if target_root == parent_root:
break
stack_pos -= 1

if is_dir: # new dir detected!
new_entry: Entry = (p_parent, [], [])
stack.append(new_entry)
stack_pos = len(stack) - 1

parent_dirs.append((dirname, new_entry)) # type: ignore[possibly-undefined]
else:
assert stack_pos != -1, (p, stack)
parent_files.append(p_name)

def _traverse(entry: Entry) -> Iterator[tuple[RootName, list[DirName], list[FileName]]]:
(root, dir_entries, files) = entry
child_dirs = dict(dir_entries)
dirnames = list(child_dirs.keys())

dirnames.sort()
files.sort()
yield root, dirnames, files

# traverse dirnames, not dir_entries! since we want to respect if the callee modifies them
for d in dirnames:
yield from _traverse(child_dirs[d])

yield from _traverse(stack[0])


def test_walk_paths_basic() -> None:
    """Check _walk_paths against a handful of tiny hand-written listings."""
    cases = [
        # not sure about this one, but it mirrors pathlib.Path.walk over an empty directory,
        # which yields only the root
        ([], os.sep, [('.', [], [])]),
        # just two files with no extra dirs
        (['aaa', 'bbb'], os.sep, [('.', [], ['aaa', 'bbb'])]),
        # one empty dir
        (
            ['aaa/'],
            '/',
            [
                ('.', ['aaa'], []),
                ('aaa', [], []),
            ],
        ),
        # dir with one dir with one file
        (
            ['aaa/', 'aaa/bbb/', 'aaa/bbb/fff'],
            '/',
            [
                ('.', ['aaa'], []),
                ('aaa', ['bbb'], []),
                ('aaa/bbb', [], ['fff']),
            ],
        ),
    ]
    for listing, sep, expected in cases:
        assert list(_walk_paths(listing, separator=sep)) == expected


def test_walk_paths_against_stdlib() -> None:
    """
    Cross-check _walk_paths against pathlib.Path.walk on a real directory tree.

    The repository's own ``.git`` directory serves as the fixture: it is flattened
    into a sorted path listing, fed to _walk_paths, and the result is compared
    with what Path.walk reports for the same tree.
    """
    import sys

    import pytest

    if sys.version_info[:2] < (3, 12):
        pytest.skip("pathlib.Path.walk is only present from python 3.12")

    def as_paths(root: Path) -> Iterator[str]:
        # flatten the tree into relative paths; directories get a trailing separator
        for r, dirs, files in root.walk():
            rr = r.relative_to(root)
            for d in dirs:
                yield f'{rr / d}{os.sep}'
            for f in files:
                yield str(rr / f)

    def check_against_builtin(root: Path) -> None:
        expected = []
        for r, dirs, files in root.walk():
            # _walk_paths yields sorted names, so normalise the stdlib output the same way;
            # sorting dirs in place also makes Path.walk descend in sorted order
            dirs.sort()
            files.sort()
            expected.append((str(r.relative_to(root)), dirs, files))
        assert len(expected) > 1  # just in case

        paths = sorted(as_paths(root))
        actual = list(_walk_paths(paths, separator=os.sep))

        assert expected == actual

    git_path = Path(__file__).absolute().parent.parent.parent / '.git'
    assert git_path.exists(), git_path
    check_against_builtin(git_path)
10 changes: 10 additions & 0 deletions src/kompress/tests/kompress.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,22 @@ def test_zippath(tmp_path: Path) -> None:

assert ZipPath(target) == ZipPath(target)
assert zp.absolute() == zp
assert zp / '.' == zp

# shouldn't crash
hash(zp)

assert zp.exists()
assert (zp / 'gdpr_export').exists()
assert (zp / 'gdpr_export' / 'comments').exists()
## NOTE: in pathlib.Path these work, however not in zipfile.Path
## for now we don't support them either, need to be really careful if we wanna diverge from zipfile.Path
## but in
# assert (zp / '.').exists()
# assert (zp / '.' / 'gdpr_export').exists()
# assert (zp / 'gdpr_export' / './comments').exists()
##

# check str constructor just in case
assert (ZipPath(str(target)) / 'gdpr_export' / 'comments').exists()
assert not (ZipPath(str(target)) / 'whatever').exists()
Expand Down
90 changes: 90 additions & 0 deletions src/kompress/tests/zippath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from __future__ import annotations

from pathlib import Path
from zipfile import ZipFile

from .. import ZipPath

# Directory next to this test module; test_walk_gdpr_export reads 'gdpr_export.zip' from it.
structure_data: Path = Path(__file__).parent / "structure_data"


def test_walk_empty(tmp_path: Path) -> None:
    """Walking an archive without any members yields a single entry for the root."""
    archive = tmp_path / 'empty.zip'
    # opening for writing and closing straight away produces a valid empty archive
    ZipFile(archive, 'w').close()
    assert archive.exists()
    root = ZipPath(archive)

    walked = list(root.walk())
    # this is consistent with pathlib.Path.walk over empty dir
    assert walked == [(root / '.', [], [])]


def test_walk_1(tmp_path: Path) -> None:
    """Top-level files are reported sorted by name, regardless of the order they were written in."""
    archive = tmp_path / 'empty.zip'
    with ZipFile(archive, 'w') as zf:
        # deliberately written out of alphabetical order
        for member in ('file2', 'file1'):
            zf.writestr(member, 'data2')
    assert archive.exists()
    root = ZipPath(archive)

    walked = list(root.walk())
    assert walked == [(root, [], ['file1', 'file2'])]


def test_walk_2(tmp_path: Path) -> None:
    """Walk an archive mixing an explicit empty dir, a top-level file and nested files."""
    archive = tmp_path / 'empty.zip'
    members = [
        ('empty_dir/', ''),
        ('file', 'alala'),
        ('aaa/bbb', 'some_data'),
        ('aaa/ccc/ddd', 'some_data_2'),
    ]
    with ZipFile(archive, 'w') as zf:
        for member, payload in members:
            zf.writestr(member, payload)
    assert archive.exists()
    root = ZipPath(archive)

    from_root = list(root.walk())
    assert from_root == [
        (root, ['aaa', 'empty_dir'], ['file']),
        (root / 'aaa', ['ccc'], ['bbb']),
        (root / 'aaa/ccc', [], ['ddd']),
        (root / 'empty_dir', [], []),
    ]

    # testcase when we aren't starting from root
    from_subdir = list((root / 'aaa').walk())
    assert from_subdir == [
        (root / 'aaa', ['ccc'], ['bbb']),
        (root / 'aaa/ccc', [], ['ddd']),
    ]

    # check that .walk respects modifying dirs in-place, like regular pathlib
    collected = []
    for _root, dirnames, filenames in root.walk():
        # slice assignment mutates the very list the walker handed out
        dirnames[:] = [d for d in dirnames if d != 'ccc']
        collected += filenames
    assert collected == ['file', 'bbb']


def test_walk_gdpr_export() -> None:
    """Every root, directory and file reported by walk() over the sample archive must exist."""
    target = structure_data / 'gdpr_export.zip'
    assert target.exists(), target  # precondition

    zp = ZipPath(target)

    # roots and files are collected; directories are only checked for existence
    visited = []
    for root, dirnames, filenames in zp.walk():
        assert root.exists()
        visited.append(root)
        for dirname in dirnames:
            assert (root / dirname).exists()
        for filename in filenames:
            child = root / filename
            assert child.exists()
            visited.append(child)

    assert len(visited) == 8

0 comments on commit 5103075

Please sign in to comment.