Skip to content

Commit

Permalink
tar: support glob/rglob
Browse files Browse the repository at this point in the history
  • Loading branch information
karlicoss committed Sep 18, 2024
1 parent 3448963 commit 752b3c0
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/kompress/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class CPath(Path):
_flavour = pathlib._windows_flavour if os.name == 'nt' else pathlib._posix_flavour # type: ignore[attr-defined]

def __new__(cls, *args, **kwargs):
# TODO shortcut if args[0] is already Cpath?

path = Path(*args)
if path.name.endswith(Ext.zip):
if path.exists():
Expand Down
61 changes: 58 additions & 3 deletions src/kompress/tar.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import fnmatch
import io
import os
import pathlib
Expand All @@ -8,7 +9,7 @@
from dataclasses import dataclass
from pathlib import Path
from tarfile import TarFile, TarInfo
from typing import Dict, Generator
from typing import Dict, Generator, Iterator

from typing_extensions import Self

Expand Down Expand Up @@ -126,6 +127,20 @@ def iterdir(self) -> Generator[TarPath, None, None]:
rpath = self._rpath / entry.name
yield TarPath(tar=self.tar, _nodes=self._nodes, _rpath=rpath, _node=entry)

def glob(self, pattern: str, **kwargs) -> Iterator[TarPath]: # type: ignore[override] # noqa: ARG002
parts = self._rpath.parts
prefix = '' if len(parts) == 0 else ('/'.join(parts) + '/')
full_pattern = prefix + pattern
for p, node in self._nodes.items():
if not fnmatch.fnmatch(p, full_pattern):
continue
rpath = Path(*p.split('/'))
yield TarPath(tar=self.tar, _nodes=self._nodes, _rpath=rpath, _node=node)

def rglob(self, pattern: str, **kwargs) -> Iterator[TarPath]: # type: ignore[override] # noqa: ARG002
# TODO ugh.. not necessarily consistent with pathlib behaviour... need to double check later
return self.glob('*' + pattern)

def __repr__(self) -> str:
return f'{self.tar=} {self._rpath=} {self._node=}'

Expand All @@ -152,9 +167,18 @@ def _make_args(path: Path) -> tuple[TarFile, Nodes, Node]:
infos = {}
for m in members:
is_dir = m.isdir()
p = m.name + (sep if is_dir else '')

norm_name = m.name
if norm_name == '.':
# sometimes root is included? we don't need it for walk_paths
continue
if norm_name[:2] == './':
# sometimes archive is created against current dir (.), this ends up with awkward dots in the index...
norm_name = norm_name[2:]

p = norm_name + (sep if is_dir else '')
paths.append(p)
infos[m.name] = m
infos[norm_name] = m

nodes: dict[str, Node] = {}

Expand Down Expand Up @@ -256,3 +280,34 @@ def test_tar_dir(tmp_path: Path) -> None:

with index.open() as fo:
assert fo.read() == 'test message\n'


def test_tar_dir_leading_dot() -> None:
"""
Test for 'flat' tar file, when it has leading dots in paths
(can happen if you did smth like tar -czvf ../archive.tar.gz .)
"""
from . import CPath # avoid circular import

structure_data: Path = Path(__file__).parent / 'tests/structure_data'
target = structure_data / 'with_leading_dot.tar.gz'

assert target.exists(), target

cpath = CPath(target)

assert not (cpath / 'whatever').exists()
assert (cpath / Path('c', 'hello')).exists()
assert (cpath / 'b.txt').read_text() == 'contents\n'

assert sorted(cpath.iterdir()) == [cpath / 'a', cpath / 'b.txt', cpath / 'c']

assert list(cpath.glob('h*')) == []
assert list(cpath.glob('b.txt')) == [cpath / 'b.txt']
assert list((cpath / 'b.txt').glob('*')) == []
assert list((cpath / 'c').glob('h*')) == [cpath / 'c' / 'hello']

assert list(cpath.rglob('b.txt')) == [cpath / 'b.txt']
assert list(cpath.rglob('h*')) == [cpath / 'c/hello']
assert list(cpath.rglob('c')) == [cpath / 'c']
assert list((cpath / 'c').rglob('h*')) == [cpath / 'c' / 'hello']
Binary file not shown.

0 comments on commit 752b3c0

Please sign in to comment.