diff --git a/peru/resources/plugins/curl/curl_plugin.py b/peru/resources/plugins/curl/curl_plugin.py index 65ea3cd8..668428d2 100755 --- a/peru/resources/plugins/curl/curl_plugin.py +++ b/peru/resources/plugins/curl/curl_plugin.py @@ -126,13 +126,17 @@ def plugin_sync(url, sha1): def extract_tar(archive_path, dest): with tarfile.open(archive_path) as t: - validate_filenames(info.path for info in t.getmembers()) + for info in t.getmembers(): + validate_filename(info.path) + if info.issym(): + validate_symlink(info.path, info.linkname) t.extractall(dest) def extract_zip(archive_path, dest): with zipfile.ZipFile(archive_path) as z: - validate_filenames(z.namelist()) + for name in z.namelist(): + validate_filename(name) z.extractall(dest) # Set file permissions. Tar does this by default, but with zip we need # to do it ourselves. @@ -153,11 +157,30 @@ def extract_zip(archive_path, dest): os.chmod(os.path.join(dest, info.filename), 0o755) -def validate_filenames(names): - for name in names: - path = pathlib.PurePosixPath(name) - if path.is_absolute() or '..' in path.parts: - raise EvilArchiveError('Illegal path in archive: ' + name) +def validate_filename(name): + path = pathlib.PurePosixPath(name) + if path.is_absolute() or ".." in path.parts: + raise EvilArchiveError("Illegal path in archive: " + name) + + +def validate_symlink(name, target): + # We might do this twice but that's fine. + validate_filename(name) + + allowed_parent_parts = len(pathlib.PurePosixPath(name).parts) - 1 + + target_path = pathlib.PurePosixPath(target) + if target_path.is_absolute(): + raise EvilArchiveError("Illegal symlink target in archive: " + target) + leading_parent_parts = 0 + for part in target_path.parts: + if part != "..": + break + leading_parent_parts += 1 + if leading_parent_parts > allowed_parent_parts: + raise EvilArchiveError("Illegal symlink target in archive: " + target) + if ".." in target_path.parts[leading_parent_parts:]: + raise EvilArchiveError("Illegal symlink target in archive: " + target) class EvilArchiveError(RuntimeError): diff --git a/tests/resources/illegal_symlink_absolute.tar b/tests/resources/illegal_symlink_absolute.tar new file mode 100644 index 00000000..24416957 Binary files /dev/null and b/tests/resources/illegal_symlink_absolute.tar differ diff --git a/tests/resources/illegal_symlink_dots.tar b/tests/resources/illegal_symlink_dots.tar new file mode 100644 index 00000000..679545b9 Binary files /dev/null and b/tests/resources/illegal_symlink_dots.tar differ diff --git a/tests/resources/legal_symlink_dots.tar b/tests/resources/legal_symlink_dots.tar new file mode 100644 index 00000000..7b881393 Binary files /dev/null and b/tests/resources/legal_symlink_dots.tar differ diff --git a/tests/test_curl_plugin.py b/tests/test_curl_plugin.py index 8ab476fb..bad894f3 100644 --- a/tests/test_curl_plugin.py +++ b/tests/test_curl_plugin.py @@ -104,6 +104,21 @@ def test_evil_archives(self): with self.assertRaises(curl_plugin.EvilArchiveError): curl_plugin.extract_tar(str(tar_archive), dest) + def test_evil_symlink_archives(self): + """Even worse than archives containing bad paths, an archive could + contain a *symlink* pointing to a bad path. Then a subsequent entry in + the *same* archive could write through the symlink.""" + dest = shared.create_dir() + for case in ["illegal_symlink_dots", "illegal_symlink_absolute"]: + tar_archive = shared.test_resources / (case + ".tar") + with self.assertRaises(curl_plugin.EvilArchiveError): + curl_plugin.extract_tar(str(tar_archive), dest) + # But leading dots should be allowed in symlinks, as long as they don't + # escape the root of the archive. + for case in ["legal_symlink_dots"]: + tar_archive = shared.test_resources / (case + ".tar") + curl_plugin.extract_tar(str(tar_archive), dest) + def test_request_has_user_agent_header(self): actual = curl_plugin.build_request("http://example.test") self.assertTrue(actual.has_header("User-agent"))