Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A bunch of fixes for a bunch of tests #1964

Merged
merged 6 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions lib/cuckoo/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ def __init__(self, fname_base: str = "cuckoo"):
self._read_files(files)

def _get_files_to_read(self, fname_base):
files = [
os.path.join(CUCKOO_ROOT, "conf", f"{fname_base}.conf.default"),
os.path.join(CUCKOO_ROOT, "conf", f"{fname_base}.conf"),
os.path.join(CUSTOM_CONF_DIR, f"{fname_base}.conf"),
]
files.extend(sorted(glob.glob(os.path.join(CUCKOO_ROOT, "conf", f"{fname_base}.conf.d", "*.conf"))))
# Allows test workflows to ignore custom root configs
include_root_configs = "CAPE_DISABLE_ROOT_CONFIGS" not in os.environ
files = [os.path.join(CUCKOO_ROOT, "conf", f"{fname_base}.conf.default")]
if include_root_configs:
files.append(os.path.join(CUCKOO_ROOT, "conf", f"{fname_base}.conf"))
files.extend(sorted(glob.glob(os.path.join(CUCKOO_ROOT, "conf", f"{fname_base}.conf.d", "*.conf"))))
files.append(os.path.join(CUSTOM_CONF_DIR, f"{fname_base}.conf"))
files.extend(sorted(glob.glob(os.path.join(CUSTOM_CONF_DIR, f"{fname_base}.conf.d", "*.conf"))))
return files

Expand Down
28 changes: 14 additions & 14 deletions lib/cuckoo/common/integrations/file_extra_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@

DuplicatesType = DefaultDict[str, Set[str]]


cfg = Config()
processing_conf = Config("processing")
selfextract_conf = Config("selfextract")

Expand Down Expand Up @@ -133,7 +133,7 @@

exclude_startswith = ("parti_",)
excluded_extensions = (".parti",)

tools_folder = os.path.join(cfg.cuckoo.get("tmppath", "/tmp"), "cape-external")

def static_file_info(
data_dictionary: dict,
Expand Down Expand Up @@ -515,7 +515,7 @@ def generic_file_extractors(


def _generic_post_extraction_process(file: str, tool_name: str, decoded: str) -> SuccessfulExtractionReturnType:
with extractor_ctx(file, tool_name) as ctx:
with extractor_ctx(file, tool_name, folder=tools_folder) as ctx:
basename = f"{os.path.basename(file)}_decoded"
decoded_file_path = os.path.join(ctx["tempdir"], basename)
_ = path_write_file(decoded_file_path, decoded, mode="text")
Expand Down Expand Up @@ -595,7 +595,7 @@ def eziriz_deobfuscate(file: str, *, data_dictionary: dict, **_) -> ExtractorRet
log.error("You need to add execution permissions: chmod a+x data/NETReactorSlayer.CLI")
return

with extractor_ctx(file, "eziriz", prefix="eziriz_") as ctx:
with extractor_ctx(file, "eziriz", prefix="eziriz_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
dest_path = os.path.join(tempdir, os.path.basename(file))
_ = run_tool(
Expand Down Expand Up @@ -628,7 +628,7 @@ def de4dot_deobfuscate(file: str, *, filetype: str, **_) -> ExtractorReturnType:
log.error("Missed dependency: sudo apt install de4dot")
return

with extractor_ctx(file, "de4dot", prefix="de4dot_") as ctx:
with extractor_ctx(file, "de4dot", prefix="de4dot_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
dest_path = os.path.join(tempdir, os.path.basename(file))
_ = run_tool(
Expand Down Expand Up @@ -657,7 +657,7 @@ def msi_extract(file: str, *, filetype: str, **kwargs) -> ExtractorReturnType:

extracted_files = []
# sudo apt install msitools or 7z
with extractor_ctx(file, "MsiExtract", prefix="msidump_") as ctx:
with extractor_ctx(file, "MsiExtract", prefix="msidump_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
output = False
if not kwargs.get("tests"):
Expand Down Expand Up @@ -705,7 +705,7 @@ def Inno_extract(file: str, *, data_dictionary: dict, **_) -> ExtractorReturnTyp
log.error("Missed dependency: sudo apt install innoextract")
return

with extractor_ctx(file, "InnoExtract", prefix="innoextract_") as ctx:
with extractor_ctx(file, "InnoExtract", prefix="innoextract_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
run_tool(
[selfextract_conf.Inno_extract.binary, file, "--output-dir", tempdir],
Expand All @@ -731,7 +731,7 @@ def kixtart_extract(file: str, **_) -> ExtractorReturnType:
if not data.startswith(b"\x1a\xaf\x06\x00\x00\x10"):
return

with extractor_ctx(file, "Kixtart", prefix="kixtart_") as ctx:
with extractor_ctx(file, "Kixtart", prefix="kixtart_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
kix = Kixtart(file, dump_dir=tempdir)
kix.decrypt()
Expand All @@ -756,7 +756,7 @@ def UnAutoIt_extract(file: str, *, data_dictionary: dict, **_) -> ExtractorRetur
UN_AUTOIT_NOTIF = True
return

with extractor_ctx(file, "UnAutoIt", prefix="unautoit_") as ctx:
with extractor_ctx(file, "UnAutoIt", prefix="unautoit_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
output = run_tool(
[unautoit_binary, "extract-all", "--output-dir", tempdir, file],
Expand All @@ -778,7 +778,7 @@ def UPX_unpack(file: str, *, filetype: str, data_dictionary: dict, **_) -> Extra
):
return

with extractor_ctx(file, "UnUPX", prefix="unupx_") as ctx:
with extractor_ctx(file, "UnUPX", prefix="unupx_", folder=tools_folder) as ctx:
basename = f"{os.path.basename(file)}_unpacked"
dest_path = os.path.join(ctx["tempdir"], basename)
output = run_tool(
Expand Down Expand Up @@ -852,7 +852,7 @@ def SevenZip_unpack(file: str, *, filetype: str, data_dictionary: dict, options:
else:
return

with extractor_ctx(file, tool, prefix=prefix) as ctx:
with extractor_ctx(file, tool, prefix=prefix, folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
HAVE_SFLOCK = False
if HAVE_SFLOCK:
Expand Down Expand Up @@ -892,7 +892,7 @@ def RarSFX_extract(file, *, data_dictionary, options: dict, **_) -> ExtractorRet
log.warning("Missed UnRar binary: /usr/bin/unrar. sudo apt install unrar")
return

with extractor_ctx(file, "UnRarSFX", prefix="unrar_") as ctx:
with extractor_ctx(file, "UnRarSFX", prefix="unrar_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
password = options.get("password", "infected")
output = run_tool(
Expand All @@ -914,7 +914,7 @@ def office_one(file, **_) -> ExtractorReturnType:
):
return

with extractor_ctx(file, "OfficeOne", prefix="office_one") as ctx:
with extractor_ctx(file, "OfficeOne", prefix="office_one", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
try:
document = OneNoteExtractor(path_read_file(file))
Expand All @@ -937,7 +937,7 @@ def msix_extract(file: str, *, data_dictionary: dict, **_) -> ExtractorReturnTyp
):
return

with extractor_ctx(file, "MSIX", prefix="msixdump_") as ctx:
with extractor_ctx(file, "MSIX", prefix="msixdump_", folder=tools_folder) as ctx:
tempdir = ctx["tempdir"]
_ = run_tool(
["unzip", file, "-d", tempdir],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import timeit
from typing import List, Optional, TypedDict

from lib.cuckoo.common.config import Config
from lib.cuckoo.common.path_utils import path_mkdir, path_object

cfg = Config()
log = logging.getLogger(__name__)


Expand Down Expand Up @@ -49,8 +47,7 @@ def wrapped(*args, **kwargs):


@contextlib.contextmanager
def extractor_ctx(filepath, tool_name, prefix=None):
folder = os.path.join(cfg.cuckoo.get("tmppath", "/tmp"), "cape-external")
def extractor_ctx(filepath, tool_name, prefix=None, folder="/tmp/cape-external"):
path_mkdir(folder, exist_ok=True)

tempdir = tempfile.mkdtemp(prefix=prefix, dir=folder)
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
import pathlib
import pytest
from lib.cuckoo.common.config import ConfigMeta

import lib.cuckoo.common.config
import lib.cuckoo.core.database


@pytest.fixture
def tmp_cuckoo_root(monkeypatch, tmp_path):
monkeypatch.setattr(lib.cuckoo.core.database, "CUCKOO_ROOT", str(tmp_path))
yield tmp_path


@pytest.fixture(autouse=True)
def custom_conf_path(request, monkeypatch, tmp_cuckoo_root):
ConfigMeta.reset()
monkeypatch.setenv("CAPE_DISABLE_ROOT_CONFIGS", "1")
path: pathlib.Path = tmp_cuckoo_root / "custom" / "conf"
path.mkdir(mode=0o755, parents=True)
monkeypatch.setattr(lib.cuckoo.common.config, "CUSTOM_CONF_DIR", str(path))
yield path
103 changes: 30 additions & 73 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

import pytest

import lib.cuckoo.common.config
from lib.cuckoo.common.config import AnalysisConfig, Config, ConfigMeta
from lib.cuckoo.common.config import AnalysisConfig, Config
from lib.cuckoo.common.exceptions import CuckooOperationalError
from lib.cuckoo.common.path_utils import path_write_file

Expand All @@ -31,11 +30,6 @@ def analysis_config(tmp_path):
yield AnalysisConfig(path)


@pytest.fixture(autouse=True)
def reset():
ConfigMeta.reset()


class TestAnalysisConfig:
def test_get_option_exist(self, analysis_config):
"""Fetch an option of each type from default config file."""
Expand Down Expand Up @@ -67,68 +61,42 @@ def write_config(path, content):
path_write_file(path, textwrap.dedent(content), mode="text")


@pytest.fixture
def custom_conf_dir(monkeypatch, tmp_path):
custom_dir = tmp_path / "custom"
custom_dir.mkdir()
monkeypatch.setattr(lib.cuckoo.common.config, "CUSTOM_CONF_DIR", str(custom_dir))
yield custom_dir


@pytest.fixture
def default_config(monkeypatch, tmp_path):
default_dir = tmp_path / "default"
default_dir.mkdir()
default_conf = default_dir / "conf" / "cuckoo.conf"
default_conf.parent.mkdir()
write_config(
default_conf,
"""
[cuckoo]
debug = false
analysis_timeout = 120
""",
)
monkeypatch.setattr(lib.cuckoo.common.config, "CUCKOO_ROOT", str(default_dir))
yield default_conf


class TestConfig:
def test_option_override(self, custom_conf_dir, default_config):
def test_option_override(self, custom_conf_path):
"""Fetch an option of each type from default config file."""
custom_conf = custom_conf_dir / "cuckoo.conf"
custom_conf = custom_conf_path / "cuckoo.conf"
write_config(
custom_conf,
"""
[cuckoo]
debug = true
machinery_screenshots = true
""",
)
config = Config("cuckoo")

# This was overridden in the custom config.
assert config.get("cuckoo")["debug"] is True
assert config.get("cuckoo")["machinery_screenshots"] is True
# This was inherited from the default config.
assert config.get("cuckoo")["analysis_timeout"] == 120
assert config.get("cuckoo")["max_analysis_count"] == 0

def test_nonexistent_custom_file(self, custom_conf_dir, default_config):
def test_nonexistent_custom_file(self):
"""Verify that there are no problems when the file to be processed does not
exist in the custom config dir.
"""
config = Config("cuckoo")
assert config.get("cuckoo")["debug"] is False
assert config.get("cuckoo")["analysis_timeout"] == 120
assert config.get("cuckoo")["machinery_screenshots"] is False
assert config.get("cuckoo")["max_analysis_count"] == 0

def test_subdirs(self, custom_conf_dir):
api_conf = custom_conf_dir / "api.conf"
def test_subdirs(self, custom_conf_path):
api_conf = custom_conf_path / "api.conf"
write_config(
api_conf,
"""
[api]
ratelimit = no
""",
)
api_conf_d_dir = custom_conf_dir / "api.conf.d"
api_conf_d_dir = custom_conf_path / "api.conf.d"
api_conf_d_dir.mkdir()
host_specific_conf = api_conf_d_dir / "01_host_specific.conf"
url = "https://somehost.example.com"
Expand All @@ -149,62 +117,51 @@ def test_subdirs(self, custom_conf_dir):
# api.conf.d/01_host_specific.conf.
assert config.get("api")["url"] == url

def test_singleton_configs(self, default_config):
def test_singleton_configs(self):
"""Verify that Config objects that were passed the same "file_name" argument
are reused.
"""
config1 = Config("cuckoo")
config2 = Config("cuckoo")
assert config1 is config2

def test_environment_interpolation(self, default_config, custom_conf_dir, monkeypatch):
def test_environment_interpolation(self, custom_conf_path, monkeypatch):
"""Verify that environment variables are able to be referenced in config
files.
"""
default_dir = default_config.parent
aux_conf = default_dir / "auxiliary.conf"
write_config(
aux_conf,
"""
[virustotaldl]
enabled = no
#dlintelkey = SomeKeyWithDLAccess
dlpath = /tmp/
""",
)
custom_conf = custom_conf_dir / "auxiliary.conf"
custom_conf = custom_conf_path / "processing.conf"
write_config(
custom_conf,
"""
[virustotaldl]
enabled = yes
dlintelkey = %(ENV:DLINTELKEY)s
[virustotal]
on_demand = yes
key = %(ENV:DLINTELKEY)s
""",
)

custom_secret = "MyReallySecretKeyWithAPercent(%)InIt"
monkeypatch.setenv("DLINTELKEY", custom_secret)
config = Config("auxiliary")
section = config.get("virustotaldl")
config = Config("processing")
section = config.get("virustotal")
# Inherited from default config
assert section.dlpath == "/tmp/"
# Overridden from custom config
assert section.enabled is True
# Overridden from custom config
assert section.on_demand is True
# Overridden from custom config and uses environment variable
assert section.dlintelkey == custom_secret
assert section.key == custom_secret

def test_missing_environment_interpolation(self, default_config, custom_conf_dir, monkeypatch):
def test_missing_environment_interpolation(self, custom_conf_path, monkeypatch):
"""Verify that an exception is raised if an ENV variable is to be used in a
config file, but that variable is not present in the environment.
"""
default_dir = default_config.parent
aux_conf = default_dir / "auxiliary.conf"
custom_conf = custom_conf_path / "processing.conf"
write_config(
aux_conf,
custom_conf,
"""
[foo]
bar = %(ENV:IDONTEXIST)s
[virustotal]
key = %(ENV:IDONTEXIST)s
""",
)

with pytest.raises(configparser.InterpolationMissingOptionError):
_ = Config("auxiliary")
_ = Config("processing")
4 changes: 3 additions & 1 deletion tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from lib.cuckoo.common.exceptions import CuckooOperationalError
from lib.cuckoo.common.path_utils import path_mkdir
from lib.cuckoo.common.utils import store_temp_file
from lib.cuckoo.core.database import Database, Machine, Tag, Task, machines_tags, tasks_tags
from lib.cuckoo.core.database import Database, Error, Machine, Tag, Task, machines_tags, tasks_tags


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -57,11 +57,13 @@ def setup_method(self, method):
stmt3 = delete(machines_tags)
stmt4 = delete(tasks_tags)
stmt5 = delete(Tag)
stmt6 = delete(Error)
self.session.execute(stmt)
self.session.execute(stmt2)
self.session.execute(stmt3)
self.session.execute(stmt4)
self.session.execute(stmt5)
self.session.execute(stmt6)
self.session.commit()

def teardown_method(self):
Expand Down
Loading
Loading