Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SslKeyLogFile aux module and PcapNg processing module #2312

Merged
merged 2 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions analyzer/windows/modules/auxiliary/sslkeylogfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
import os
import tempfile

from lib.common.abstracts import Auxiliary
from lib.common.results import upload_to_host

log = logging.getLogger(__name__)

SSLKEYLOGFILE = "SSLKEYLOGFILE"

class SslKeyLogFile(Auxiliary):
"""Collect SSLKEYLOGFILE logs from guests."""

def __init__(self, options, config):
Auxiliary.__init__(self, options, config)
self.enabled = config.sslkeylogfile
if self.enabled:
self.upload_prefix = "aux/sslkeylogfile"
self.upload_file = "sslkeys.log"
self.log_path = ""

def upload_sslkeylogfile(self):
"""Upload SSLKEYLOGFILE log to the host if present."""
try:
if self.log_path and os.path.isfile(self.log_path):
log.debug('Attemping to upload SSLKEYLOGFILE from "%s"', self.log_path)
upload_to_host(self.log_path, f"{self.upload_prefix}/{self.upload_file}")
log.debug('SSLKEYLOGFILE uploaded')
except Exception:
log.exception("SslKeyLogFile encountered an exception while uploading '%s'", self.log_path)
raise

def start(self):
if not self.enabled:
log.debug("SslKeyLogFile auxiliary module not enabled")
return
log.info("SslKeyLogFile auxiliary module enabled")
with tempfile.NamedTemporaryFile("w+", encoding="utf-8", delete=False) as keylog:
# Set SSLKEYLOGFILE system environment variable
log.info("Setting %s to %s", SSLKEYLOGFILE, keylog.name)
# Set system env
xcode = os.system("Setx {0} {1} /m".format(SSLKEYLOGFILE, keylog.name))
# Update local process env
os.environ[SSLKEYLOGFILE] = keylog.name

if xcode != 0:
log.info("Failed to set %s", SSLKEYLOGFILE)

self.log_path = keylog.name

def finish(self):
if self.enabled:
self.upload_sslkeylogfile()
1 change: 1 addition & 0 deletions conf/default/auxiliary.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ filecollector = yes
# This is only useful in case you use KVM's dnsmasq. You need to change your range inside of analyzer/windows/modules/auxiliary/disguise.py. Disguise must be enabled
windows_static_route = no
tracee_linux = no
sslkeylogfile = no

[AzSniffer]
# Enable or disable the use of Azure Network Watcher packet capture feature, disable standard sniffer if this is in use to not create concurrent .pcap files
Expand Down
3 changes: 3 additions & 0 deletions conf/default/processing.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ country_lookup = no
# For ipinfo use: Free IP to Country + IP to ASN
maxmind_database = data/GeoLite2-Country.mmdb

[pcapng]
enabled = no

[url_analysis]
enabled = yes
# Enable a WHOIS lookup for the target domain of a URL analyses
Expand Down
85 changes: 85 additions & 0 deletions modules/processing/pcapng.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
import os
import shutil
import subprocess
import tempfile

from lib.cuckoo.common.abstracts import Processing
from lib.cuckoo.common.objects import File
from lib.cuckoo.common.path_utils import path_exists
from utils.tls import tlslog_to_sslkeylogfile

EDITCAP = "editcap"
EDITCAP_TIMEOUT = 60

log = logging.getLogger(__name__)


class PcapNg(Processing):
"""Injects TLS keys into a .pcap, resulting in a .pcapng file.

Requires the `editcap` executable."""

key = "pcapng"

def set_path(self, analysis_path):
"""Set paths.
@param analysis_path: analysis folder path.
"""
super().set_path(analysis_path)
# The file CAPE Monitor logs TLS keys to
self.tlsdump_log = os.path.join(self.analysis_path, "tlsdump", "tlsdump.log")
# The file logged to by libraries that support the SSLKEYLOGFILE env var
self.sslkeys_log = os.path.join(self.analysis_path, "aux/sslkeylogfile", "sslkeys.log")
self.pcapng_path = self.pcap_path + "ng"

def run(self):
retval = {}

if not path_exists(self.pcap_path):
log.debug('pcap not found, nothing to do "%s"', self.pcap_path)
return retval

if os.path.getsize(self.pcap_path) == 0:
log.debug('pcap is empty, nothing to do "%s"', self.pcap_path)
return retval

if not shutil.which(EDITCAP):
log.error("%s not in path and is required", EDITCAP)
return retval

try:
failmsg = "failed to generate .pcapng"
tls_dir = os.path.dirname(self.tlsdump_log)
# Combine all TLS logs into a single file in a format that can be read by editcap
with tempfile.NamedTemporaryFile("w", dir=tls_dir, encoding="utf-8") as dest_ssl_key_log:
# Write CAPEMON keys
if self.file_exists_not_empty(self.tlsdump_log):
log.debug("writing tlsdump.log to temp key log file")
tlslog_to_sslkeylogfile(self.tlsdump_log, dest_ssl_key_log.name)
# Write SSLKEYLOGFILE keys
if self.file_exists_not_empty(self.sslkeys_log):
log.debug("writing SSLKEYLOGFILE to temp key log file")
self.append_file_contents_to_file(self.sslkeys_log, dest_ssl_key_log.name)
self.generate_pcapng(dest_ssl_key_log.name)
retval = {"sha256": File(self.pcapng_path).get_sha256()}
except subprocess.CalledProcessError as exc:
log.error("%s: editcap exited with code: %d", failmsg, exc.returncode)
except subprocess.TimeoutExpired:
log.error("%s: editcap reached timeout", failmsg)
except OSError as exc:
log.error("%s: %s", failmsg, exc)

return retval

def file_exists_not_empty(self, path):
return bool(path_exists(path) and os.path.getsize(path) > 0)

def append_file_contents_to_file(self, file_with_contents, append_to_file):
with open(file_with_contents, "r") as src, open(append_to_file, "a+") as dst:
dst.write(src.read())

def generate_pcapng(self, sslkeylogfile_path):
cmd = [EDITCAP, "--inject-secrets", "tls," + sslkeylogfile_path, self.pcap_path, self.pcapng_path]
log.debug("generating pcapng with command '%s", cmd)
subprocess.check_call(cmd, timeout=EDITCAP_TIMEOUT)
2 changes: 2 additions & 0 deletions tests/test_analysis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ def test_build_options(
"recentfiles": False,
"screenshots_linux": True,
"screenshots_windows": True,
"sslkeylogfile": False,
"sysmon_linux": False,
"sysmon_windows": False,
"target": str(tmp_path / "sample.py"),
Expand Down Expand Up @@ -397,6 +398,7 @@ def test_build_options_pe(
"recentfiles": False,
"screenshots_linux": True,
"screenshots_windows": True,
"sslkeylogfile": False,
"sysmon_linux": False,
"sysmon_windows": False,
"target": str(sample_location),
Expand Down
36 changes: 36 additions & 0 deletions tests/test_tls_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.

from utils.tls import TLS12KeyLog, tlslog_to_sslkeylogfile
import unittest.mock as mock

TLSDUMP_LOGS = ["client_random: 66d85f5e14959be90acdf46867ac60b1380a7c0fd4b0e8f18f66438d7840f0bb, server_random: 66d85f6626997dd836d354b522bab8426a6b9ee3daac68467d9d8f1fcdea07c0, master_secret: 70384eb96e90d023e3cc117d9f0f5b703b5cbb88897783e08286656aa40444ab1433c850bf556737d2c09b2d4c67094d"]
SSLKEYLOGS = ["CLIENT_RANDOM 66d85f5e14959be90acdf46867ac60b1380a7c0fd4b0e8f18f66438d7840f0bb 70384eb96e90d023e3cc117d9f0f5b703b5cbb88897783e08286656aa40444ab1433c850bf556737d2c09b2d4c67094d"]

class TestTlsUtils:

def test_tlslog_to_sslkeylogfile(self, tmpdir):
input_log = f"{tmpdir}/tlsdump.log"
dest_log = f"{tmpdir}/sslkeys.log"
with open(input_log, "w+") as tlsdump_log:
tlsdump_log.writelines(TLSDUMP_LOGS)
tlslog_to_sslkeylogfile(input_log, dest_log)
with open(dest_log, "r") as sslkeylogfile:
actual = sslkeylogfile.read().strip()
assert actual == SSLKEYLOGS[0]

@mock.patch("builtins.open")
def test_tlslog_to_sslkeylogfile_path_not_exist(self, mock_open, tmpdir):
mock_open.side_effect = mock.mock_open
input_log = f"{tmpdir}/tlsdump.log"
dest_log = f"{tmpdir}/sslkeys.log"
tlslog_to_sslkeylogfile(input_log, dest_log)
mock_open.assert_not_called()

def test_tls12keylog_from_cape_log(self):
actual = TLS12KeyLog.from_cape_log(TLSDUMP_LOGS[0])
assert actual.client_random == "66d85f5e14959be90acdf46867ac60b1380a7c0fd4b0e8f18f66438d7840f0bb"
assert actual.server_random == "66d85f6626997dd836d354b522bab8426a6b9ee3daac68467d9d8f1fcdea07c0"
assert actual.master_secret == "70384eb96e90d023e3cc117d9f0f5b703b5cbb88897783e08286656aa40444ab1433c850bf556737d2c09b2d4c67094d"
assert str(actual) == SSLKEYLOGS[0]
64 changes: 64 additions & 0 deletions utils/tls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os

from dataclasses import dataclass
from typing import ClassVar

try:
import re2 as re
except ImportError:
import re

@dataclass()
class TLS12KeyLog:
"""TLS 1.2 key log."""

# The type of secret that is being conveyed. TLS 1.2 and earlier
# uses the label "CLIENT_RANDOM" to identify the "master" secret
# for the connection.
LOG_LABEL: ClassVar[str] = "CLIENT_RANDOM"
# The 32-byte value of the Random field from the
# 'client hello' message sent during the TLS handshake.
client_random: str
# The 32-byte value of the Random field from the
# 'server hello' message sent during the TLS handshake.
server_random: str
# The value of the identified secret for the identified
# connection.
master_secret: str

@classmethod
def from_cape_log(cls, log):
TLS12_PATTERN = r"client_random:\s*(?P<client_random>[a-f0-9]+)\s*,\s*server_random:\s*(?P<server_random>[a-f0-9]+)\s*,\s*master_secret:\s*(?P<master_secret>[a-f0-9]+)\s*"
retval = None
match = re.match(TLS12_PATTERN, log)
params = match.groupdict() if match else {}
if len(params) == 3:
retval = cls(**params)
return retval

def __str__(self):
"""Return a string that adheres to the SSLKEYLOGFILE standard
for TLS 1.2 (and earlier).
"""
return f"{self.LOG_LABEL} {self.client_random} {self.master_secret}"

def tlslog_to_sslkeylogfile(tls_log_path, sslkeylogfile_path):
"""Convert Cape's TLS log file (tlsdump.log) into a format that is
readable by WireShark (SSLKEYLOGFILE).

The SSLKEYLOGFILE format is defined by the IETF TLS working group.
The draft standard is published here:
https://datatracker.ietf.org/doc/draft-ietf-tls-keylogfile/
"""
if not os.path.exists(tls_log_path):
return

# SSLKEYLOGFILE should be encoded using utf-8, even though the
# content only includes ASCII characters. Though Unicode is
# permitted in comments, the file MUST NOT contain a Unicode byte
# order mark.
with open(tls_log_path, "r") as ifile, open(sslkeylogfile_path, "w+", encoding="utf-8") as ofile:
for line in ifile:
tlslog = TLS12KeyLog.from_cape_log(line)
if tlslog:
ofile.write(f"{tlslog}\n")
7 changes: 6 additions & 1 deletion web/analysis/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ def load_files(request, task_id, category):
]
elif category == "network":
data = mongo_find_one(
"analysis", {"info.id": int(task_id)}, {category: 1, "info.tlp": 1, "cif": 1, "suricata": 1, "_id": 0}
"analysis", {"info.id": int(task_id)}, {category: 1, "info.tlp": 1, "cif": 1, "suricata": 1, "pcapng": 1, "_id": 0}
)
else:
data = mongo_find_one("analysis", {"info.id": int(task_id)}, {category: 1, "info.tlp": 1, "_id": 0})
Expand Down Expand Up @@ -749,6 +749,7 @@ def load_files(request, task_id, category):
ajax_response["domainlookups"] = {i["domain"]: i["ip"] for i in ajax_response.get("network", {}).get("domains", {})}
ajax_response["suricata"] = data.get("suricata", {})
ajax_response["cif"] = data.get("cif", [])
ajax_response["pcapng"] = data.get("pcapng", {})
tls_path = os.path.join(ANALYSIS_BASE_PATH, "analyses", str(task_id), "tlsdump", "tlsdump.log")
if _path_safe(tls_path):
ajax_response["tlskeys_exists"] = _path_safe(tls_path)
Expand Down Expand Up @@ -1876,6 +1877,10 @@ def file(request, category, task_id, dlfile):
file_name += ".pcap"
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcap")
cd = "application/vnd.tcpdump.pcap"
elif category == "pcapng":
file_name += ".pcapng"
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcapng")
cd = "application/vnd.tcpdump.pcap"
elif category == "debugger_log":
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "debugger", str(dlfile) + ".log")
elif category == "rtf":
Expand Down
3 changes: 3 additions & 0 deletions web/templates/analysis/network/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

<div class="alert alert-primary center">
<a class="btn btn-secondary btn-sm" href="{% url "file" "pcap" id network.pcap_sha256 %}"><span class="fas fa-download"></span> PCAP</a>
{% if pcapng.sha256 %}
<a class="btn btn-secondary btn-sm" title="PCAP with embedded TLS keys for use in WireShark." href="{% url "file" "pcapng" id pcapng.sha256 %}"><span class="fas fa-download"></span> PCAP-NG</a>
{% endif %}
<a class="btn btn-secondary btn-sm" href="{% url "file" "pcapzip" id network.pcap_sha256 %}"><span class="fas fa-file-archive"></span><span class="fas fa-download"></span> PCAP</a>
{% if tlskeys_exists %}
<a class="btn btn-secondary btn-sm" href="{% url "file" "tlskeys" id network.pcap_sha256 %}"><span class="fas fa-download"></span> TLS keys</a>
Expand Down