From 3448963d4cc44aaeb5f555ba530593a89fdeb7fe Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Wed, 18 Sep 2024 00:11:39 +0100 Subject: [PATCH] general: big cleanup - deprecate kexists/kopen with fallbacks - move zip stuff to a separate file - update ruff config --- mypy.ini | 5 +- ruff.toml | 20 +-- src/kompress/__init__.py | 291 +++++++-------------------------- src/kompress/common.py | 9 - src/kompress/compat.py | 11 ++ src/kompress/tar.py | 33 ++-- src/kompress/tests/kompress.py | 95 ++++++----- src/kompress/zip.py | 178 ++++++++++++++++++++ 8 files changed, 335 insertions(+), 307 deletions(-) delete mode 100644 src/kompress/common.py create mode 100644 src/kompress/compat.py create mode 100644 src/kompress/zip.py diff --git a/mypy.ini b/mypy.ini index 5a21a85..9ab1c91 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,14 +1,13 @@ [mypy] -namespace_packages = True pretty = True show_error_context = True -show_error_codes = True show_column_numbers = True show_error_end = True +warn_redundant_casts = True warn_unused_ignores = True check_untyped_defs = True -enable_error_code = possibly-undefined strict_equality = True +enable_error_code = possibly-undefined # an example of suppressing # [mypy-my.config.repos.pdfannots.pdfannots] diff --git a/ruff.toml b/ruff.toml index 328e40f..9376d03 100644 --- a/ruff.toml +++ b/ruff.toml @@ -26,10 +26,10 @@ lint.extend-select = [ "TID", # various imports suggestions "TRY", # various exception handling rules "UP", # detect deprecated python stdlib stuff - # "FA", # TODO enable later after we make sure cachew works? - # "PTH", # pathlib migration -- TODO enable later - # "ARG", # TODO useful, but results in some false positives in pytest fixtures... 
maybe later - # "A", # TODO builtin shadowing -- handle later + "FA", # suggest using from __future__ import annotations + "PTH", # pathlib migration + "ARG", # unused argument checks + "A", # builtin shadowing # "EM", # TODO hmm could be helpful to prevent duplicate err msg in traceback.. but kinda annoying # "ALL", # uncomment this to check for new rules! @@ -63,17 +63,11 @@ lint.ignore = [ "E402", # Module level import not at top of file ### maybe consider these soon -# sometimes it's useful to give a variable a name even if we don't use it as a documentation -# on the other hand, often is a sign of error + # sometimes it's useful to give a variable a name even if we don't use it as a documentation + # on the other hand, often is a sign of error "F841", # Local variable `count` is assigned to but never used - "F401", # imported but unused ### -### TODO should be fine to use these with from __future__ import annotations? -### there was some issue with cachew though... double check this? - "UP006", # use type instead of Type - "UP007", # use X | Y instead of Union -### "RUF100", # unused noqa -- handle later "RUF012", # mutable class attrs should be annotated with ClassVar... ugh pretty annoying for user configs @@ -132,6 +126,8 @@ lint.ignore = [ "TID252", # Prefer absolute imports over relative imports from parent modules + "UP038", # suggests using | (union) in isinstance checks.. 
but it results in slower code + ## too annoying "T20", # just complains about prints and pprints "Q", # flake quotes, too annoying diff --git a/src/kompress/__init__.py b/src/kompress/__init__.py index a8359d0..37e55fb 100644 --- a/src/kompress/__init__.py +++ b/src/kompress/__init__.py @@ -4,27 +4,13 @@ import io import os import pathlib -import posixpath import sys import tarfile -import zipfile -from datetime import datetime -from functools import total_ordering from pathlib import Path -from typing import ( - IO, - TYPE_CHECKING, - Iterator, - List, - Sequence, - Union, -) +from typing import IO, TYPE_CHECKING -from .common import BasePath from .tar import TarPath -from .utils import walk_paths - -PathIsh = Union[Path, str] +from .zip import ZipPath class Ext: @@ -39,14 +25,14 @@ class Ext: # fmt: on -def is_compressed(p: PathIsh) -> bool: +def is_compressed(p: Path | str) -> bool: pp = p if isinstance(p, Path) else Path(p) # todo kinda lame way for now.. use mime ideally? # should cooperate with kompress.kopen? return pp.name.endswith((Ext.xz, Ext.zip, Ext.lz4, Ext.zstd, Ext.zst, Ext.targz, Ext.gz)) -def _zstd_open(path: Path, *args, **kwargs) -> IO: +def _zstd_open(path: Path, *args, **kwargs) -> IO: # noqa: ARG001 import zstandard fh = path.open('rb') @@ -62,9 +48,45 @@ def _zstd_open(path: Path, *args, **kwargs) -> IO: return io.TextIOWrapper(reader, **kwargs) # meh -# TODO dunno, I guess it should be open and exists after all? similar to os.path -# TODO use the 'dependent type' trick for return type? -def kopen(path: PathIsh, *args, mode: str = 'rt', **kwargs) -> IO: +class CPath(Path): + """ + Hacky way to support compressed files. + If you can think of a better way to do this, please let me know! https://github.com/karlicoss/HPI/issues/20 + + Ugh. So, can't override Path because of some _flavour thing. 
+ Path only has _accessor and _closed slots, so can't directly set .open method + _accessor.open has to return file descriptor, doesn't work for compressed stuff. + """ + + if sys.version_info[:2] < (3, 12): + # older version of python need _flavour defined + _flavour = pathlib._windows_flavour if os.name == 'nt' else pathlib._posix_flavour # type: ignore[attr-defined] + + def __new__(cls, *args, **kwargs): + path = Path(*args) + if path.name.endswith(Ext.zip): + if path.exists(): + # if path doesn't exist, zipfile can't open it to read the index etc + # so it's the best we can do in this case? + # TODO move this into ZipPath.__new__? + return ZipPath(path) + if path.name.endswith(Ext.targz): + return TarPath(path) + return super().__new__(cls, *args, **kwargs) + + def open(self, *args, **kwargs): # noqa: ARG002 + kopen_kwargs = {} + mode = kwargs.get('mode') + if mode is not None: + kopen_kwargs['mode'] = mode + encoding = kwargs.get('encoding') + if encoding is not None: + kopen_kwargs['encoding'] = encoding + # TODO assert read only? + return _cpath_open(str(self), **kopen_kwargs) + + +def _cpath_open(path: Path | str, *args, mode: str = 'rt', **kwargs) -> IO: # just in case, but I think this shouldn't be necessary anymore # since when we call .read_text, encoding is passed already if mode in {'r', 'rt'}: @@ -133,216 +155,25 @@ def kopen(path: PathIsh, *args, mode: str = 'rt', **kwargs) -> IO: return pp.open(mode, *args, **kwargs) -class CPath(BasePath): - """ - Hacky way to support compressed files. - If you can think of a better way to do this, please let me know! https://github.com/karlicoss/HPI/issues/20 - - Ugh. So, can't override Path because of some _flavour thing. - Path only has _accessor and _closed slots, so can't directly set .open method - _accessor.open has to return file descriptor, doesn't work for compressed stuff. 
- """ - - def __new__(cls, *args, **kwargs): - path = Path(*args) - if path.name.endswith(Ext.zip): - # We need a special case here, since zip always needs a subpath - # If we just construct CPath(zip_archive) / "path/inside/zip" - # , then it's hard for kopen to know if it's a zip without looking at individual path parts - # This way it's a bit more explicit. - # possibly useful for tar.gz as well? - return ZipPath(path) - elif path.name.endswith(Ext.targz): # TODO add support for kopen too? should really make it private - return TarPath(path) - return super().__new__(cls, *args, **kwargs) - - def open(self, *args, **kwargs): - kopen_kwargs = {} - mode = kwargs.get('mode') - if mode is not None: - kopen_kwargs['mode'] = mode - encoding = kwargs.get('encoding') - if encoding is not None: - kopen_kwargs['encoding'] = encoding - # TODO assert read only? - return kopen(str(self), **kopen_kwargs) - - -open = kopen # TODO deprecate - - -# meh -# TODO ideally switch to ZipPath or smth similar? 
-# nothing else supports subpath properly anyway -def kexists(path: PathIsh, subpath: str) -> bool: - try: - kopen(path, subpath) - except Exception: - return False - else: - return True - - -@total_ordering -class ZipPath(zipfile.Path): - # NOTE: is_dir/is_file might not behave as expected, the base class checks it only based on the slash in path - - _flavour = posixpath # this is necessary for some pathlib operations (in particular python 3.12) - - # seems that root/at are not exposed in the docs, so might be an implementation detail - root: zipfile.CompleteDirs - at: str - - def __init__(self, root: Union[str, Path, zipfile.ZipFile, ZipPath], at: str = "") -> None: - root_: Union[str, Path, zipfile.ZipFile] - if isinstance(root, ZipPath): - # hack to make sure ZipPath(ZipPath(...)) works - root_ = root.root - at_ = root.at - else: - root_ = root - at_ = at - - super().__init__(root_, at_) - - @property - def filepath(self) -> Path: - res = self.root.filename - assert res is not None # make mypy happy - assert isinstance(res, str) - return Path(res) - - @property - def subpath(self) -> Path: - return Path(self.at) - - def absolute(self) -> ZipPath: - return ZipPath(self.filepath.absolute(), self.at) - - def expanduser(self) -> ZipPath: - return ZipPath(self.filepath.expanduser(), self.at) - - def exists(self) -> bool: - if self.at == '': - # special case, the base class returns False in this case for some reason - return self.filepath.exists() - return super().exists() or self._as_dir().exists() - # TODO hmm seems that base class has special treatment for .at argument during construction, - # it actually checks if it's a file or a dir, and in case of dir, appends '/'? - # maybe use resolve_dir thing from base class?? - - def _as_dir(self) -> zipfile.Path: - # note: seems that zip always uses forward slash, regardless OS? 
- return zipfile.Path(self.root, self.at + '/') - - def rglob(self, glob: str) -> Iterator[ZipPath]: - # note: not 100% sure about the correctness, but seem fine? - # Path.match() matches from the right, so need to - rpaths = (p for p in self.root.namelist() if p.startswith(self.at)) - rpaths = (p for p in rpaths if Path(p).match(glob)) - return (ZipPath(self.root, p) for p in rpaths) - - # TODO remove unused-ignore after 3.8 - def relative_to(self, other: ZipPath, *extra: Union[str, os.PathLike[str]]) -> Path: # type: ignore[override,unused-ignore] - assert self.filepath == other.filepath, (self.filepath, other.filepath) - return self.subpath.relative_to(other.subpath, *extra) - - @property - def parts(self) -> Sequence[str]: - return self._parts - - @property - def _parts(self) -> Sequence[str]: - # a bit of an implementation detail, but sometimes it's used by pathlib - # messy, but might be ok.. - return self.filepath.parts + self.subpath.parts - - @property - def _raw_paths(self) -> Sequence[str]: - # used in 3.12 for some operations - return self._parts - - def __truediv__(self, key) -> ZipPath: - # need to implement it so the return type is not zipfile.Path - tmp = zipfile.Path(self.root) / self.at / key - return ZipPath(self.root, tmp.at) - - def iterdir(self) -> Iterator[ZipPath]: - for s in self._as_dir().iterdir(): - yield ZipPath(s.root, s.at) # type: ignore[attr-defined] - - @property - def stem(self) -> str: - return self.subpath.stem - - @property # type: ignore[misc] - def __class__(self): - return Path - - def __eq__(self, other) -> bool: - # hmm, super class doesn't seem to treat as equals unless they are the same object - if not isinstance(other, ZipPath): - return False - return (self.filepath, self.subpath) == (other.filepath, other.subpath) - - def __lt__(self, other) -> bool: - if not isinstance(other, ZipPath): - return False - return (self.filepath, self.subpath) < (other.filepath, other.subpath) - - def __hash__(self) -> int: - return 
hash((self.filepath, self.subpath)) - - def stat(self) -> os.stat_result: - # NOTE: zip datetimes have no notion of time zone, usually they just keep local time? - # see https://en.wikipedia.org/wiki/ZIP_(file_format)#Structure - dt = datetime(*self.root.getinfo(self.at).date_time) - ts = int(dt.timestamp()) - params = { - 'st_mode': 0, - 'st_ino': 0, - 'st_dev': 0, - 'st_nlink': 1, - 'st_uid': 1000, - 'st_gid': 1000, - 'st_size': 0, # todo compute it properly? - 'st_atime': ts, - 'st_mtime': ts, - 'st_ctime': ts, - } - return os.stat_result(tuple(params.values())) - - @property - def suffixes(self) -> List[str]: - return Path(self.parts[-1]).suffixes +if not TYPE_CHECKING: + # FIXME deprecate properly + # still used in promnesia legacy takeout module? could migrate off + # ah ok, promnesia works off my.core.kompress (which is itself deprecated) + # so we could perhaps add kopen/kexists adapters that just do Cpath(first_arg) / Path(rest)? + # pass kwargs to open? like mode/encoding - @property - def suffix(self) -> str: - return Path(self.parts[-1]).suffix + from .compat import deprecated - def walk( - self, - *, - top_down: bool = True, - on_error=None, - follow_symlinks: bool = False, - ) -> Iterator[tuple[ZipPath, list[str], list[str]]]: - assert top_down, "specifying top_down isn't supported for zipfile.Path yet" - assert on_error is None, "on_error isn't supported for zipfile.Path yet" + @deprecated('use Cpath(...).open() instead') + def kopen(path, *args, **kwargs): + cpath = CPath(path) / Path(*args) + return cpath.open(**kwargs) - at = self.at - names = [] - for n in self.root.namelist(): - if not n.startswith(at): - continue - rest = n[len(at) :] - if rest != '': - # no need to append the subdir itself? - names.append(rest) - names.sort() + @deprecated('use Cpath(...).open() instead') + def open(*args, **kwargs): # noqa: A001 + return kopen(*args, **kwargs) - # note: seems that zip always uses forward slash, regardless OS? 
- for r, dirs, files in walk_paths(names, separator='/'): - # make sure we don't construct ZipPath with at='.'... this behaves weird - rr = self if r == '.' else self / r - yield rr, dirs, files + @deprecated('use Cpath(...).exists() instead') + def kexists(path, *args) -> bool: + cpath = CPath(path) / Path(*args) + return cpath.exists() diff --git a/src/kompress/common.py b/src/kompress/common.py deleted file mode 100644 index 8398d02..0000000 --- a/src/kompress/common.py +++ /dev/null @@ -1,9 +0,0 @@ -import os -import pathlib -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - # otherwise mypy can't figure out that BasePath is a type alias.. - BasePath = pathlib.Path -else: - BasePath = pathlib.WindowsPath if os.name == 'nt' else pathlib.PosixPath diff --git a/src/kompress/compat.py b/src/kompress/compat.py new file mode 100644 index 0000000..64a5f67 --- /dev/null +++ b/src/kompress/compat.py @@ -0,0 +1,11 @@ +import sys + +if sys.version_info[:2] >= (3, 13): + from warnings import deprecated +else: + from typing_extensions import deprecated + + +__all__ = [ + 'deprecated', +] diff --git a/src/kompress/tar.py b/src/kompress/tar.py index 715dfac..773b7de 100644 --- a/src/kompress/tar.py +++ b/src/kompress/tar.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from pathlib import Path from tarfile import TarFile, TarInfo -from typing import Dict, Generator, Optional, Union +from typing import Dict, Generator from typing_extensions import Self @@ -47,11 +47,11 @@ class TarPath(Path): def __new__( cls, - tar: Union[str, Path, TarPath, TarFile], + tar: str | Path | TarPath | TarFile, *, - _nodes: Optional[Nodes] = None, - _rpath: Optional[Path] = None, - _node: Optional[Node] = None, + _nodes: Nodes | None = None, + _rpath: Path | None = None, + _node: Node | None = None, ) -> Self: if isinstance(tar, TarPath): # make sure TarPath(TarPath(...)) works @@ -68,16 +68,22 @@ def __new__( # otherwise it's str | Path -- need to build a new TarFile + Node for it XX 
assert _node is None, _node # just in case path = Path(tar) + + if not path.exists(): + # if it doesn't exist, tarpath can't open it... + # so it's the best we can do is just return a regular path + return path # type: ignore[return-value] + tar, nodes, root = TarPath._make_args(path) - return cls(tar=tar, _nodes=nodes, _node=root, _rpath=Path('.')) + return cls(tar=tar, _nodes=nodes, _node=root, _rpath=Path()) def __init__( self, - tar: Union[str, Path, TarPath, TarFile], + tar: str | Path | TarPath | TarFile, *, - _nodes: Optional[Nodes] = None, - _rpath: Optional[Path] = None, - _node: Optional[Node] = None, + _nodes: Nodes | None = None, + _rpath: Path | None = None, + _node: Node | None = None, ) -> None: if hasattr(self, 'tar'): # already initialized via __new__ @@ -109,7 +115,7 @@ def is_file(self) -> bool: def is_dir(self) -> bool: return self.node.info.isdir() - def exists(self, **kwargs) -> bool: + def exists(self, **kwargs) -> bool: # noqa: ARG002 return self._node is not None # meh def iterdir(self) -> Generator[TarPath, None, None]: @@ -150,7 +156,7 @@ def _make_args(path: Path) -> tuple[TarFile, Nodes, Node]: paths.append(p) infos[m.name] = m - nodes: Dict[str, Node] = {} + nodes: dict[str, Node] = {} def get_node(p: str) -> Node: node = nodes.get(p) @@ -182,6 +188,9 @@ def get_node(p: str) -> Node: def test_tar_dir(tmp_path: Path) -> None: from . import CPath # avoid circular import + nonexistent = CPath(tmp_path / 'donotexist.tar.gz') + assert not nonexistent.exists() + structure_data: Path = Path(__file__).parent / 'tests/structure_data' target = structure_data / 'gdpr_export.tar.gz' diff --git a/src/kompress/tests/kompress.py b/src/kompress/tests/kompress.py index 2ba7636..ba552c2 100644 --- a/src/kompress/tests/kompress.py +++ b/src/kompress/tests/kompress.py @@ -7,29 +7,19 @@ import pytest -from .. import CPath, ZipPath, kexists, kopen +from .. 
import CPath, ZipPath structure_data: Path = Path(__file__).parent / "structure_data" -def test_kopen(tmp_path: Path) -> None: - "Plaintext handled transparently" - # fmt: off - assert kopen(tmp_path / 'file' ).read() == 'just plaintext' - assert kopen(tmp_path / 'file.xz').read() == 'compressed text' - # fmt: on - - def test_zip(tmp_path: Path) -> None: - # zips always contain a file inside it, so require a bit of a special handling - # e.g. need to pass a 'subpath' into kopen subpath = 'path/in/archive' if sys.version_info[:2] == (3, 8): # seems that zippath used to return bytes in 3.8 - assert kopen(tmp_path / 'file.zip', subpath).read() == b'data in zip' + assert (CPath(tmp_path / 'file.zip') / subpath).open().read() == b'data in zip' else: - assert kopen(tmp_path / 'file.zip', subpath).read() == 'data in zip' + assert (CPath(tmp_path / 'file.zip') / subpath).open().read() == 'data in zip' # CPath should dispatch zips to ZipPath cpath = CPath(tmp_path / 'file.zip') @@ -43,27 +33,28 @@ def test_zip(tmp_path: Path) -> None: assert isinstance(CPath(cpath), ZipPath) -def test_kexists(tmp_path: Path) -> None: - # TODO also test top level? - # fmt: off - assert kexists(str(tmp_path / 'file.zip'), 'path/in/archive') - assert not kexists(str(tmp_path / 'file.zip'), 'path/notin/archive') - # fmt: on +def test_cpath_zip(tmp_path: Path) -> None: + assert (CPath(tmp_path / 'file.zip') / 'path/in/archive').exists() + assert not (CPath(tmp_path / 'file.zip') / 'path/notin/archive').exists() - # TODO not sure about this? 
- assert not kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive') + assert not (CPath(tmp_path / 'nosuchzip.zip') / 'path/in/archive').exists() @pytest.mark.parametrize( - ('file', 'expected'), + ('filename', 'expected'), [ ('file', 'just plaintext'), ('file.xz', 'compressed text'), ], ) -def test_cpath(file: str, expected: str, tmp_path: Path) -> None: - # check different ways of constructing the path - path = tmp_path / file +def test_cpath_regular(filename: str, expected: str, tmp_path: Path) -> None: + """ + Check different ways of interacting with CPath + """ + path = tmp_path / filename + + assert CPath(path).open().read() == expected + for args in [ [str(path)], [path], @@ -74,21 +65,10 @@ def test_cpath(file: str, expected: str, tmp_path: Path) -> None: assert CPath(*args).read_text() == expected # type: ignore[misc] -@pytest.fixture(autouse=True) -def prepare(tmp_path: Path): - (tmp_path / 'file').write_text('just plaintext') - with (tmp_path / 'file.xz').open('wb') as f: - with lzma.open(f, 'w') as lzf: - lzf.write(b'compressed text') - with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as zf: - zf.writestr('path/in/archive', 'data in zip') - try: - yield None - finally: - pass - - def test_zippath(tmp_path: Path) -> None: + # TODO support later... 
+ # zz = ZipPath(tmp_path / 'doesntexist.zip') + zp = ZipPath(tmp_path / 'file.zip', 'path/in/archive') assert zp.read_text() == 'data in zip' @@ -126,6 +106,7 @@ def test_zippath(tmp_path: Path) -> None: assert zp.exists() assert (zp / 'gdpr_export').exists() assert (zp / 'gdpr_export' / 'comments').exists() + assert (zp / Path('gdpr_export', 'comments')).exists() ## NOTE: in pathlib.Path these work, however not in zipfile.Path ## for now we don't support them either, need to be really careful if we wanna diverge from zipfile.Path ## but in @@ -199,14 +180,14 @@ def test_gz(tmp_path: Path) -> None: # test against gzip magic number assert gzf.read_bytes()[:2] == b'\x1f\x8b' - with kopen(gzf) as f: + with CPath(gzf).open() as f: assert hasattr(f, 'read') assert hasattr(f, 'readable') assert f.readable() assert not f.writable() assert f.read() == 'compressed text' # if not specified, defaults to rt - with kopen(gzf, mode='rb') as f: + with CPath(gzf).open(mode='rb') as f: assert isinstance(f, gzip.GzipFile) assert f.read() == b'compressed text' @@ -222,3 +203,35 @@ def test_gz(tmp_path: Path) -> None: assert CPath(gzf).read_text() == 'compressed text' assert CPath(gzf).read_bytes() == b'compressed text' + + +def test_kopen_kexists(tmp_path: Path) -> None: + """ + Testing deprecations, can remove when we remove kexists/kopen + """ + from .. 
import kexists, kopen # type: ignore[attr-defined] + + path = Path(tmp_path / 'file.zip') + + read_res = kopen(path, 'path', 'in', 'archive').read() + if sys.version_info[:2] == (3, 8): + # seems that zippath used to return bytes in 3.8 + assert read_res == b'data in zip' + else: + assert read_res == 'data in zip' + assert kexists(path, 'path/in/archive') + assert not kexists(path, 'does/not/exist') + + +@pytest.fixture(autouse=True) +def prepare_data(tmp_path: Path): + (tmp_path / 'file').write_text('just plaintext') + with (tmp_path / 'file.xz').open('wb') as f: + with lzma.open(f, 'w') as lzf: + lzf.write(b'compressed text') + with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as zf: + zf.writestr('path/in/archive', 'data in zip') + try: + yield None + finally: + pass diff --git a/src/kompress/zip.py b/src/kompress/zip.py new file mode 100644 index 0000000..fd0c786 --- /dev/null +++ b/src/kompress/zip.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +import os +import zipfile +from datetime import datetime +from functools import total_ordering +from pathlib import Path +from typing import Iterator, Sequence + +from .utils import walk_paths + + +@total_ordering +class ZipPath(zipfile.Path): + # NOTE: is_dir/is_file might not behave as expected, the base class checks it only based on the slash in path + + _flavour = os.path # this is necessary for some pathlib operations (in particular python 3.12) + + # seems that root/at are not exposed in the docs, so might be an implementation detail + root: zipfile.CompleteDirs + at: str + + def __init__(self, root: str | Path | zipfile.ZipFile | ZipPath, at: str = "") -> None: + root_: str | Path | zipfile.ZipFile + if isinstance(root, ZipPath): + # hack to make sure ZipPath(ZipPath(...)) works + root_ = root.root + at_ = root.at + else: + root_ = root + at_ = at + + super().__init__(root_, at_) + + @property + def filepath(self) -> Path: + res = self.root.filename + assert res is not None # make mypy happy + assert 
isinstance(res, str) + return Path(res) + + @property + def subpath(self) -> Path: + return Path(self.at) + + def absolute(self) -> ZipPath: + return ZipPath(self.filepath.absolute(), self.at) + + def expanduser(self) -> ZipPath: + return ZipPath(self.filepath.expanduser(), self.at) + + def exists(self) -> bool: + if self.at == '': + # special case, the base class returns False in this case for some reason + return self.filepath.exists() + return super().exists() or self._as_dir().exists() + # TODO hmm seems that base class has special treatment for .at argument during construction, + # it actually checks if it's a file or a dir, and in case of dir, appends '/'? + # maybe use resolve_dir thing from base class?? + + def _as_dir(self) -> zipfile.Path: + # note: seems that zip always uses forward slash, regardless OS? + return zipfile.Path(self.root, self.at + '/') + + def rglob(self, glob: str) -> Iterator[ZipPath]: + # note: not 100% sure about the correctness, but seem fine? + # Path.match() matches from the right, so need to + rpaths = (p for p in self.root.namelist() if p.startswith(self.at)) + rpaths = (p for p in rpaths if Path(p).match(glob)) + return (ZipPath(self.root, p) for p in rpaths) + + # TODO remove unused-ignore after 3.8 + def relative_to(self, other: ZipPath, *extra: str | os.PathLike[str]) -> Path: # type: ignore[override,unused-ignore] + assert self.filepath == other.filepath, (self.filepath, other.filepath) + return self.subpath.relative_to(other.subpath, *extra) + + @property + def parts(self) -> Sequence[str]: + return self._parts + + @property + def _parts(self) -> Sequence[str]: + # a bit of an implementation detail, but sometimes it's used by pathlib + # messy, but might be ok.. 
+ return self.filepath.parts + self.subpath.parts + + @property + def _raw_paths(self) -> Sequence[str]: + # used in 3.12 for some operations + return self._parts + + def __truediv__(self, key) -> ZipPath: + # need to implement it so the return type is not zipfile.Path + if isinstance(key, Path): + # zipfile always uses / separator + key = '/'.join(key.parts) + tmp = zipfile.Path(self.root) / self.at / key + return ZipPath(self.root, tmp.at) + + def iterdir(self) -> Iterator[ZipPath]: + for s in self._as_dir().iterdir(): + yield ZipPath(s.root, s.at) # type: ignore[attr-defined] + + @property + def stem(self) -> str: + return self.subpath.stem + + @property # type: ignore[misc] + def __class__(self): + return Path + + def __eq__(self, other) -> bool: + # hmm, super class doesn't seem to treat as equals unless they are the same object + if not isinstance(other, ZipPath): + return False + return (self.filepath, self.subpath) == (other.filepath, other.subpath) + + def __lt__(self, other) -> bool: + if not isinstance(other, ZipPath): + return False + return (self.filepath, self.subpath) < (other.filepath, other.subpath) + + def __hash__(self) -> int: + return hash((self.filepath, self.subpath)) + + def stat(self) -> os.stat_result: + # NOTE: zip datetimes have no notion of time zone, usually they just keep local time? + # see https://en.wikipedia.org/wiki/ZIP_(file_format)#Structure + dt = datetime(*self.root.getinfo(self.at).date_time) + ts = int(dt.timestamp()) + params = { + 'st_mode': 0, + 'st_ino': 0, + 'st_dev': 0, + 'st_nlink': 1, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 0, # todo compute it properly? 
+ 'st_atime': ts, + 'st_mtime': ts, + 'st_ctime': ts, + } + return os.stat_result(tuple(params.values())) + + @property + def suffixes(self) -> list[str]: + return Path(self.parts[-1]).suffixes + + @property + def suffix(self) -> str: + return Path(self.parts[-1]).suffix + + def walk( + self, + *, + top_down: bool = True, + on_error=None, + follow_symlinks: bool = False, # noqa: ARG002 + ) -> Iterator[tuple[ZipPath, list[str], list[str]]]: + assert top_down, "specifying top_down isn't supported for zipfile.Path yet" + assert on_error is None, "on_error isn't supported for zipfile.Path yet" + + at = self.at + names = [] + for n in self.root.namelist(): + if not n.startswith(at): + continue + rest = n[len(at) :] + if rest != '': + # no need to append the subdir itself? + names.append(rest) + names.sort() + + # note: seems that zip always uses forward slash, regardless OS? + for r, dirs, files in walk_paths(names, separator='/'): + # make sure we don't construct ZipPath with at='.'... this behaves weird + rr = self if r == '.' else self / r + yield rr, dirs, files