Skip to content

Commit

Permalink
Better handling of unsafe filenames
Browse files Browse the repository at this point in the history
See #346,

The handling of unsafe Filenames is a bit weird as the safety is checked
a bit too late in my opinion, and checked before the path is fully
resolved.

This tries to change this a bit by eing stricter on checking for
safeness, and refactor the caching of import db to use Pathlib (which
did not exists when FIlename was crreated).

This is still a bit more complicated than it should and might need some
cleanup later on.

There is also a buch of "deprecated" code in a if True in this area,
that would need propoer deprecation.
  • Loading branch information
Carreau committed Jul 16, 2024
1 parent 3611e27 commit 2219183
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 36 deletions.
4 changes: 2 additions & 2 deletions lib/python/pyflyby/_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __new__(cls, arg):
def _from_filename(cls, filename: str):
if not isinstance(filename, str):
raise TypeError
filename = str(filename)
filename = os.path.abspath(filename)
if not filename:
raise UnsafeFilenameError("(empty string)")
# we only allow filename with given character set
Expand All @@ -58,7 +58,7 @@ def _from_filename(cls, filename: str):
if re.search("(^|/)~", filename):
raise UnsafeFilenameError(filename)
self = object.__new__(cls)
self._filename = os.path.abspath(filename)
self._filename = filename
return self

def __str__(self):
Expand Down
99 changes: 65 additions & 34 deletions lib/python/pyflyby/_importdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import re
import sys

from pathlib import Path

from typing import Any, Dict, Tuple, Union, List

from pyflyby._file import (Filename, UnsafeFilenameError,
Expand Down Expand Up @@ -201,6 +203,8 @@ class ImportDB:
mandatory_imports: ImportSet
canonical_imports: ImportMap

_default_cache: Dict[Any, Any] = {}

def __new__(cls, *args):
if len(args) != 1:
raise TypeError
Expand All @@ -213,7 +217,6 @@ def __new__(cls, *args):



_default_cache: Dict[Any, Any] = {}

@classmethod
def clear_default_cache(cls):
Expand All @@ -224,24 +227,22 @@ def clear_default_cache(cls):
cached results. Existing ImportDB instances are not affected by this
call.
"""
if cls._default_cache:
if logger.debug_enabled:
allpyfiles = set()
for tup in cls._default_cache:
if tup[0] != 2:
continue
for tup2 in tup[1:]:
for f in tup2:
assert isinstance(f, Filename)
if f.ext == '.py':
allpyfiles.add(f)
nfiles = len(allpyfiles)
logger.debug("ImportDB: Clearing default cache of %d files",
nfiles)
cls._default_cache.clear()
if logger.debug_enabled:
allpyfiles = set()
for tup in cls._default_cache:
if tup[0] != 2:
continue
for tup2 in tup[1:]:
for f in tup2:
assert isinstance(f, Filename)
if f.ext == ".py":
allpyfiles.add(f)
nfiles = len(allpyfiles)
logger.debug("ImportDB: Clearing default cache of %d files", nfiles)
cls._default_cache.clear()

@classmethod
def get_default(cls, target_filename: Union[Filename, str]):
def get_default(cls, target_filename: Union[Filename, str], /):
"""
Return the default import library for the given target filename.
Expand All @@ -258,7 +259,7 @@ def get_default(cls, target_filename: Union[Filename, str]):
:rtype:
`ImportDB`
"""
# We're going to canonicalize target_filenames in a number of steps.
# We're going to canonicalize target_filename in a number of steps.
# At each step, see if we've seen the input so far. We do the cache
# checking incrementally since the steps involve syscalls. Since this
# is going to potentially be executed inside the IPython interactive
Expand All @@ -267,25 +268,55 @@ def get_default(cls, target_filename: Union[Filename, str]):
# been touched, and if so, return new data. Check file timestamps at
# most once every 60 seconds.
cache_keys:List[Tuple[Any,...]] = []
if target_filename:
if isinstance(target_filename, str):
target_filename = Filename(target_filename)
target_filename = target_filename or Filename(".")
assert isinstance(target_filename, Filename)
if target_filename is None:
target_filename = "."

if isinstance(target_filename, Filename):
target_filename = str(target_filename)

assert isinstance(target_filename, str), (
target_filename,
type(target_filename),
)

target_path = Path(target_filename).resolve()

parents: List[Path]
if not target_path.is_dir():
parents = [target_path]
else:
parents = []

# filter safe parents
safe_parent = None
for p in parents + list(target_path.parents):
try:
safe_parent = Filename(str(p))
break
except UnsafeFilenameError:
pass
if safe_parent is None:
raise ValueError("No know path are safe")

target_dirname = safe_parent

if target_filename.startswith("/dev"):
target_filename = Filename(".")
target_dirname:Filename = target_filename
# TODO: with StatCache
while True:
cache_keys.append((1,
target_dirname,
os.getenv("PYFLYBY_PATH"),
os.getenv("PYFLYBY_KNOWN_IMPORTS_PATH"),
os.getenv("PYFLYBY_MANDATORY_IMPORTS_PATH")))
try:
return cls._default_cache[cache_keys[-1]]
except KeyError:
target_dirname = Filename(".")
except UnsafeFilenameError:
pass
# TODO: with StatCache
while True:
key = (
1,
target_dirname,
os.getenv("PYFLYBY_PATH"),
os.getenv("PYFLYBY_KNOWN_IMPORTS_PATH"),
os.getenv("PYFLYBY_MANDATORY_IMPORTS_PATH"),
)
cache_keys.append(key)
if key in cls._default_cache:
return cls._default_cache[key]
if target_dirname.isdir:
break
target_dirname = target_dirname.dir
Expand Down

0 comments on commit 2219183

Please sign in to comment.