Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle security audit findings #23

Merged
merged 5 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions dplib/actions/__spec__/test_metadata_convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pytest

from dplib.actions.metadata.convert import convert_metadata


@pytest.mark.parametrize(
"source,target",
(
("bad", "ckan"),
("ckan", "bad"),
),
)
def test_convert_metadata_bad_source_or_target_dp_012(source: str, target: str):
with pytest.raises(ValueError):
convert_metadata("resource.json", type="resource", source=source, target=target) # type: ignore
16 changes: 13 additions & 3 deletions dplib/actions/metadata/convert.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from importlib import import_module
from typing import Literal, Optional, Type, Union, cast
from typing import List, Literal, Optional, Type, cast, get_args

from ...models import Package, Resource
from ...system import Model
Expand All @@ -17,9 +17,17 @@ def convert_metadata(
) -> Model:
"""Convert metadata from one notation to another."""

# Validate source/target
if source and source not in NOTATIONS:
raise ValueError(f"Unknown source notation: {source}")
if target and target not in NOTATIONS:
raise ValueError(f"Unknown target notation: {target}")

# Source model
Source = Resource if type == "resource" else Package
if source:
if source not in NOTATIONS:
raise ValueError(f"Unknown source notation: {source}")
module = import_module(f"dplib.plugins.{source}.models")
Source: Type[Model] = getattr(module, f"{source.capitalize()}{type.capitalize()}")
model = Source.from_path(path, format=format)
Expand All @@ -35,5 +43,7 @@ def convert_metadata(
return model


IType = Union[Literal["package"], Literal["resource"]]
INotation = Union[Literal["ckan"], Literal["dcat"], Literal["github"], Literal["zenodo"]]
IType = Literal["package", "resource"]
INotation = Literal["ckan", "dcat", "github", "zenodo"]

NOTATIONS: List[INotation] = list(get_args(INotation))
24 changes: 24 additions & 0 deletions dplib/helpers/__spec__/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import platform
import stat
from pathlib import Path

import pytest

from dplib.helpers.file import write_file


@pytest.mark.skipif(platform.system() == "Windows", reason="Unix only")
def test_write_file_implicit_permissions_dp_002(tmpdir: Path):
path = str(tmpdir / "test.txt")
write_file(path, "Hello, World!")
permissions = oct(stat.S_IMODE(os.stat(path).st_mode))
assert permissions == "0o600"


@pytest.mark.skipif(platform.system() == "Windows", reason="Unix only")
def test_write_file_explicit_permissions_dp_003(tmpdir: Path):
path = str(tmpdir / "test.txt")
write_file(path, "Hello, World!", permissions=0o644)
permissions = oct(stat.S_IMODE(os.stat(path).st_mode))
assert permissions == "0o644"
16 changes: 16 additions & 0 deletions dplib/helpers/__spec__/test_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pytest

from dplib.helpers.path import assert_safe_path


@pytest.mark.parametrize(
"path",
(
"../data.csv",
"/etc/home/secret.json",
"http:test/../../secret.json",
),
)
def test_assert_safe_path_raises_dp_001(path: str):
with pytest.raises(Exception):
assert_safe_path(path)
16 changes: 11 additions & 5 deletions dplib/helpers/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,28 @@ def read_file(
raise Error(f'Cannot read file "{path}"')


def write_file(path: str, body: Any, *, mode: str = "wt", encoding: str = "utf-8"):
def write_file(
path: str,
body: Any,
*,
mode: str = "wt",
encoding: str = "utf-8",
permissions: int = 0o600,
):
try:
eff_enc = encoding if mode == "wt" else None
with tempfile.NamedTemporaryFile(mode, delete=False, encoding=eff_enc) as file:
file.write(body)
file.flush()
move_file(file.name, path, mode=0o644)
move_file(file.name, path, permissions=permissions)
except Exception:
raise Error(f'Cannot write file "{path}"')


def move_file(source: str, target: str, *, mode: Optional[int] = None):
def move_file(source: str, target: str, *, permissions: int = 0o600):
try:
Path(target).parent.mkdir(parents=True, exist_ok=True)
shutil.move(source, target)
if mode:
os.chmod(target, 0o644)
os.chmod(target, permissions)
except Exception:
raise Error(f'Cannot move file "{source}:{target}"')
22 changes: 14 additions & 8 deletions dplib/helpers/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse

import fsspec # type: ignore

from ..error import Error

Expand All @@ -21,7 +22,7 @@ def infer_format(path: str, *, raise_missing: bool = False):

def infer_basepath(path: str):
basepath = os.path.dirname(path)
if basepath and not is_url_path(basepath):
if basepath and is_file_protocol_path(basepath):
if not os.path.abspath(basepath):
basepath = os.path.relpath(basepath, start=os.getcwd())
return basepath
Expand All @@ -38,21 +39,26 @@ def ensure_basepath(path: str, basepath: Optional[str] = None) -> Tuple[str, str
def join_basepath(path: str, basepath: Optional[str] = None) -> str:
if not basepath:
return path
if is_url_path(path):
if not is_file_protocol_path(path):
return path
if is_url_path(basepath):
if not is_file_protocol_path(basepath):
return f"{basepath}/{path}"
return os.path.join(basepath, path)


def is_url_path(path: str) -> bool:
scheme = urlparse(path).scheme
return scheme in ["http", "https"]
def is_file_protocol_path(path: str) -> bool:
info = fsspec.utils.infer_storage_options(path) # type: ignore
return info.get("protocol") == "file" # type: ignore


def is_http_or_ftp_protocol_path(path: str) -> bool:
info = fsspec.utils.infer_storage_options(path) # type: ignore
return info.get("protocol") in ["http", "https", "ftp", "ftps"] # type: ignore


def assert_safe_path(path: str, *, basepath: Optional[str] = None):
"""Assert that the path (untrusted) is not outside the basepath (trusted)"""
if not is_url_path(path):
if is_file_protocol_path(path):
try:
root = Path(basepath or os.getcwd()).resolve()
item = root.joinpath(path).resolve()
Expand Down
4 changes: 2 additions & 2 deletions dplib/helpers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ..errors.metadata import MetadataError
from .dict import load_dict
from .file import read_file
from .path import is_url_path
from .path import is_http_or_ftp_protocol_path

# TODO: implement additional user-side profile caching

Expand Down Expand Up @@ -40,7 +40,7 @@ def read_profile(*, profile: str) -> types.IDict:
profile = os.path.join(settings.PROFILE_BASEDIR, version, filename)

# Ensure profile is URL
if not is_url_path(profile):
if not is_http_or_ftp_protocol_path(profile):
raise Error(f'Profile MUST be a URL: "{profile}"')

# Read jsonSchema
Expand Down
Loading