From ae4682fd92ed5ca394367e0c2fa9e7618df6cf47 Mon Sep 17 00:00:00 2001 From: Jose Velazquez Date: Wed, 16 Mar 2022 16:45:39 -0600 Subject: [PATCH] Adding Save As pcapng for ProtocolAnalyzer --- src/urh/controller/CompareFrameController.py | 7 +- src/urh/dev/PCAPNG.py | 107 +++++++++++++++++++ src/urh/signalprocessing/Message.py | 4 + src/urh/signalprocessing/ProtocolAnalyzer.py | 8 ++ src/urh/util/FileOperator.py | 3 +- tests/test_util.py | 38 +++++++ 6 files changed, 165 insertions(+), 2 deletions(-) create mode 100644 src/urh/dev/PCAPNG.py diff --git a/src/urh/controller/CompareFrameController.py b/src/urh/controller/CompareFrameController.py index 32777d3182..54c85acb5c 100644 --- a/src/urh/controller/CompareFrameController.py +++ b/src/urh/controller/CompareFrameController.py @@ -8,7 +8,7 @@ from PyQt5.QtCore import pyqtSlot, QTimer, Qt, pyqtSignal, QItemSelection, QItemSelectionModel, QLocale, \ QModelIndex from PyQt5.QtGui import QContextMenuEvent, QIcon -from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView +from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView, QInputDialog from urh import settings from urh.awre import AutoAssigner @@ -864,6 +864,11 @@ def save_protocol(self): if filename.endswith(".bin"): self.proto_analyzer.to_binary(filename, use_decoded=True) + elif filename.endswith(".pcapng"): + data_link_type, ok = QInputDialog.getInt(self, "Link type", + "Interface Link Type to use (probably one between DLT_USER0-DLT_USER15 (147-162)):", 147, 0, 65535) + if ok: + self.proto_analyzer.to_pcapng(filename=filename, link_type=data_link_type) else: self.proto_analyzer.to_xml_file(filename=filename, decoders=self.decodings, participants=self.project_manager.participants, write_bits=True) diff --git a/src/urh/dev/PCAPNG.py b/src/urh/dev/PCAPNG.py new file mode 100644 index 0000000000..5011137780 --- /dev/null +++ b/src/urh/dev/PCAPNG.py @@ -0,0 +1,107 @@ +import os +import struct +import math + +from urh.util.Logger import logger + + +# Refer to PCAPNG spec +# https://www.ietf.org/staging/draft-tuexen-opsawg-pcapng-02.html + +def _build_pcapng_shb(shb_userappl: str = "", shb_hardware: str = "") -> bytes: + BLOCKTYPE = 0x0A0D0D0A + HEADERS_BLOCK_LENGTH = 28 + MAGIC_NUMBER = 0x1A2B3C4D + VERSION_MAJOR, VERSION_MINOR = 1, 0 + SECTIONLENGTH = 0xFFFFFFFFFFFFFFFF # -1 => Not specified + + shb_userappl_padded_len = math.ceil(len(shb_userappl) / 4) * 4 + shb_hardware_padded_len = math.ceil(len(shb_hardware) / 4) * 4 + + total_block_len = HEADERS_BLOCK_LENGTH + if shb_userappl_padded_len > 0: + total_block_len += shb_userappl_padded_len + 4 + + if shb_hardware_padded_len > 0: + total_block_len += shb_hardware_padded_len + 4 + + shb = struct.pack(">IIIHHQ", + BLOCKTYPE, + total_block_len, + MAGIC_NUMBER, + VERSION_MAJOR, VERSION_MINOR, + SECTIONLENGTH) + + if shb_userappl != "": + SHB_USERAPPL = 4 + strpad = shb_userappl.ljust(shb_userappl_padded_len, "\0") + shb += struct.pack(">HH", SHB_USERAPPL, shb_userappl_padded_len) + shb += bytes(strpad, 'ascii') + + if shb_hardware != "": + SHB_HARDWARE = 2 + strpad = shb_hardware.ljust(shb_hardware_padded_len, "\0") + shb += struct.pack(">HH", SHB_HARDWARE, shb_hardware_padded_len) + shb += bytes(strpad, 'ascii') + + shb += struct.pack(">I", total_block_len) + return shb + +def _build_pcapng_idb(link_type) -> bytes: + BLOCKTYPE = 0x00000001 + BLOCKLENGTH = 20 + SNAP_LEN = 0 + + return struct.pack(">IIHHII", + BLOCKTYPE, + BLOCKLENGTH, + link_type, 0, + SNAP_LEN, + BLOCKLENGTH) + +def _build_pcapng_epb(packet: bytes, timestamp: float) -> bytes: + BLOCKTYPE = 0x00000006 + BLOCKHEADERLEN = 32 + INTERFACE_ID = 0 + + captured_packet_len = len(packet) + original_packet_len = captured_packet_len + padded_packet_len = math.ceil(captured_packet_len / 4) * 4 + padding_len = padded_packet_len - original_packet_len + padded_packet = packet + bytearray(padding_len) + block_total_length = BLOCKHEADERLEN + padded_packet_len + timestamp_int = int(timestamp * 1e6) # Set the proper resolution + timestamp_high = timestamp_int >> 32 + timestamp_low = timestamp_int & 0x00000000FFFFFFFF + + epb = struct.pack(">IIIIIII", + BLOCKTYPE, + block_total_length, + INTERFACE_ID, + timestamp_high, + timestamp_low, + captured_packet_len, + original_packet_len) + epb += padded_packet + epb += struct.pack(">I", block_total_length) + return epb + +def create_pcapng_file(filename: str, shb_userappl: str = "", shb_hardware: str = "", + link_type: int = 147) -> bytes: + if filename == "": + return + + shb_bytes = _build_pcapng_shb(shb_userappl, shb_hardware) + idb_bytes = _build_pcapng_idb(link_type) + + if os.path.isfile(filename): + logger.warning("{0} already exists. Overwriting it".format(filename)) + + with open(filename, "wb") as f: + f.write(shb_bytes) + f.write(idb_bytes) + +def append_packets_to_pcapng(filename: str, packets: list, timestamps: list): + with open(filename, "ab") as f: + for packet, timestamp in zip(packets, timestamps): + f.write(_build_pcapng_epb(packet, timestamp)) diff --git a/src/urh/signalprocessing/Message.py b/src/urh/signalprocessing/Message.py index b84df419cc..3acd194a0f 100644 --- a/src/urh/signalprocessing/Message.py +++ b/src/urh/signalprocessing/Message.py @@ -302,6 +302,10 @@ def decoded_ascii_array(self) -> array.array: def decoded_ascii_str(self) -> str: return "".join(map(chr, self.decoded_ascii_array)) + @property + def decoded_ascii_buffer(self) -> bytes: + return self.decoded_ascii_array.tobytes() + def __get_bit_range_from_hex_or_ascii_index(self, from_index: int, decoded: bool, is_hex: bool) -> tuple: bits = self.decoded_bits if decoded else self.plain_bits factor = 4 if is_hex else 8 diff --git a/src/urh/signalprocessing/ProtocolAnalyzer.py b/src/urh/signalprocessing/ProtocolAnalyzer.py index 238eaa226e..dcb1510767 100644 --- a/src/urh/signalprocessing/ProtocolAnalyzer.py +++ b/src/urh/signalprocessing/ProtocolAnalyzer.py @@ -8,6 +8,7 @@ from urh import settings from urh.cythonext import signal_functions +import urh.dev.PCAPNG as PCAPNG from urh.signalprocessing.Encoding import Encoding from urh.signalprocessing.Message import Message from urh.signalprocessing.MessageType import MessageType @@ -645,6 +646,13 @@ def from_xml_file(self, filename: str, read_bits=False): root = tree.getroot() self.from_xml_tag(root, read_bits=read_bits) + def to_pcapng(self, filename : str, hardware_desc_name: str = "", link_type: int = 147): + PCAPNG.create_pcapng_file(filename=filename, shb_userappl="Universal Radio Hacker", shb_hardware=hardware_desc_name, link_type=link_type) + PCAPNG.append_packets_to_pcapng( + filename=filename, + packets=(msg.decoded_ascii_buffer for msg in self.messages), + timestamps=(msg.timestamp for msg in self.messages)) + def eliminate(self): self.message_types = None self.messages = None diff --git a/src/urh/util/FileOperator.py b/src/urh/util/FileOperator.py index e89be4dddb..a3da9d577d 100644 --- a/src/urh/util/FileOperator.py +++ b/src/urh/util/FileOperator.py @@ -42,6 +42,7 @@ WAV_FILE_FILTER = "Waveform Audio File Format (*.wav *.wave)" PROTOCOL_FILE_FILTER = "Protocol (*.proto.xml *.proto)" BINARY_PROTOCOL_FILE_FILTER = "Binary Protocol (*.bin)" +WIRESHARK_FILE_FILTER = "Wireshark File (*.pcapng)" PLAIN_BITS_FILE_FILTER = "Plain Bits (*.txt)" FUZZING_FILE_FILTER = "Fuzzing Profile (*.fuzz.xml *.fuzz)" SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)" @@ -93,7 +94,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f elif caption == "Export spectrogram": name_filter = "Frequency Time (*.ft);;Frequency Time Amplitude (*.fta)" elif caption == "Save protocol": - name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER]) + name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER, WIRESHARK_FILE_FILTER]) elif caption == "Export demodulated": name_filter = ";;".join([WAV_FILE_FILTER, SUB_FILE_FILTER]) else: diff --git a/tests/test_util.py b/tests/test_util.py index e5adaa40e2..bcd2d34ddd 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -9,6 +9,7 @@ from tests.utils_testing import get_path_for_data_file from urh import settings from urh.dev.PCAP import PCAP +import urh.dev.PCAPNG as PCAPNG from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer from urh.signalprocessing.Signal import Signal from urh.util import util @@ -71,6 +72,43 @@ def test_write_pcap(self): pcap = PCAP() pcap.write_packets(proto_analyzer.messages, os.path.join(tempfile.gettempdir(), "test.pcap"), 1e6) + def test_write_pcapng(self): + signal = Signal(get_path_for_data_file("ask.complex"), "ASK-Test") + signal.modulation_type = "ASK" + signal.samples_per_symbol = 295 + signal.center = -0.1667 + self.assertEqual(signal.num_samples, 13710) + + proto_analyzer = ProtocolAnalyzer(signal) + proto_analyzer.get_protocol_from_signal() + self.assertEqual(proto_analyzer.decoded_hex_str[0], "b25b6db6c80") + + proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0])) + proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0])) + proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0])) + + filepath = os.path.join(tempfile.gettempdir(), "test.pcapng") + PCAPNG.create_pcapng_file(filepath, "Universal Radio Hacker Test", "TestHW", 147) + PCAPNG.append_packets_to_pcapng( + filename=filepath, + packets=(msg.decoded_ascii_buffer for msg in proto_analyzer.messages), + timestamps=(msg.timestamp for msg in proto_analyzer.messages)) + + # As we don't have PCAPNG importers, we'll verify output just by checking file size, PCAPNG SHB type number + # and that all msg bytes were written somewhere inside output file + filechecks = False + if os.path.isfile(filepath): # ok, file exist + with open(filepath, "rb") as f: + filecontents = f.read() + # min file len= SHB + IDB + 4 EPB msgs + minfilelen = 28 + 20 + (4 * (32 + len(proto_analyzer.messages[0].decoded_ascii_buffer))) + if len(filecontents) >= minfilelen: # ok, min file length passed + if filecontents.find(b'\x0A\x0D\x0D\x0A') >= 0: # ok, seems that SHB was written + if filecontents.find(proto_analyzer.messages[0].decoded_ascii_buffer) >= 0: # ok, msg bytes written + filechecks = True + + self.assertTrue(filechecks) + def test_de_bruijn_fuzzing(self): self.assertEqual(c_util.de_bruijn(3), array.array("B", [0, 0, 0, 1, 0, 1, 1, 1])) self.assertEqual(c_util.de_bruijn(4), array.array("B", [0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1]))