From 4ba65ece33c78e9a13caa1adfad0bc354529dbd9 Mon Sep 17 00:00:00 2001 From: Jose Velazquez Date: Wed, 16 Mar 2022 16:45:39 -0600 Subject: [PATCH] Adding Save As pcapng for ProtocolAnalyzer --- src/urh/controller/CompareFrameController.py | 7 +- src/urh/dev/PCAPNG.py | 116 +++++++++++++++++++ src/urh/signalprocessing/Message.py | 4 + src/urh/signalprocessing/ProtocolAnalyzer.py | 7 ++ src/urh/util/FileOperator.py | 3 +- tests/test_util.py | 37 ++++++ 6 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 src/urh/dev/PCAPNG.py diff --git a/src/urh/controller/CompareFrameController.py b/src/urh/controller/CompareFrameController.py index 32777d3182..54c85acb5c 100644 --- a/src/urh/controller/CompareFrameController.py +++ b/src/urh/controller/CompareFrameController.py @@ -8,7 +8,7 @@ from PyQt5.QtCore import pyqtSlot, QTimer, Qt, pyqtSignal, QItemSelection, QItemSelectionModel, QLocale, \ QModelIndex from PyQt5.QtGui import QContextMenuEvent, QIcon -from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView +from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView, QInputDialog from urh import settings from urh.awre import AutoAssigner @@ -864,6 +864,11 @@ def save_protocol(self): if filename.endswith(".bin"): self.proto_analyzer.to_binary(filename, use_decoded=True) + elif filename.endswith(".pcapng"): + data_link_type, ok = QInputDialog.getInt(self, "Link type", + "Interface Link Type to use (probably one between DLT_USER0-DLT_USER15 (147-162)):", 147, 0, 65535) + if ok: + self.proto_analyzer.to_pcapng(filename=filename, link_type=data_link_type) else: self.proto_analyzer.to_xml_file(filename=filename, decoders=self.decodings, participants=self.project_manager.participants, write_bits=True) diff --git a/src/urh/dev/PCAPNG.py b/src/urh/dev/PCAPNG.py new file mode 100644 index 0000000000..6ae268717e --- /dev/null +++ b/src/urh/dev/PCAPNG.py @@ -0,0 +1,116 @@ +import os +import struct +import time +import math + +from urh.util.Logger import logger + +from urh.signalprocessing.Message import Message + +# Refer to PCAPNG spec +# https://www.ietf.org/staging/draft-tuexen-opsawg-pcapng-02.html + +class PCAPNG(object): + def __init__(self): + self.__messages = [] + self.__messages_timestamps = [] + self.__filename = "" + + def __build_pcapng_shb(self, shb_userappl : str = "", shb_hardware : str = "") -> bytes: + BLOCKTYPE = 0x0A0D0D0A + HEADERS_BLOCK_LENGTH = 28 + MAGIC_NUMBER = 0x1A2B3C4D + VERSION_MAJOR, VERSION_MINOR = 1,0 + SECTIONLENGTH = 0xFFFFFFFFFFFFFFFF # -1 => Not specified + + shb_userappl_padded_len = math.ceil(len(shb_userappl)/4) * 4 + shb_hardware_padded_len = math.ceil(len(shb_hardware)/4) * 4 + + total_block_len = HEADERS_BLOCK_LENGTH + if shb_userappl_padded_len > 0: + total_block_len += shb_userappl_padded_len + 4 + + if shb_hardware_padded_len > 0: + total_block_len += shb_hardware_padded_len + 4 + + shb = struct.pack(">IIIHHQ", + BLOCKTYPE, + total_block_len, + MAGIC_NUMBER, + VERSION_MAJOR, VERSION_MINOR, + SECTIONLENGTH) + + if shb_userappl != "": + SHB_USERAPPL = 4 + strpad = shb_userappl.ljust(shb_userappl_padded_len, "\0") + shb += struct.pack(">HH", SHB_USERAPPL, shb_userappl_padded_len) + shb += bytes(strpad, 'ascii') + + if shb_hardware != "": + SHB_HARDWARE = 2 + strpad = shb_hardware.ljust(shb_hardware_padded_len, "\0") + shb += struct.pack(">HH", SHB_HARDWARE, shb_hardware_padded_len) + shb += bytes(strpad, 'ascii') + + shb += struct.pack(">I",total_block_len) + return shb + + def __build_pcapng_idb(self, link_type) -> bytes: + BLOCKTYPE = 0x00000001 + BLOCKLENGTH = 20 + SNAP_LEN = 0 + + return struct.pack(">IIHHII", + BLOCKTYPE, + BLOCKLENGTH, + link_type, 0, + SNAP_LEN, + BLOCKLENGTH) + + def __build_pcapng_epb(self, packet : bytes, timestamp : float) -> bytes: + BLOCKTYPE = 0x00000006 + BLOCKHEADERLEN = 32 + INTERFACE_ID = 0 + + captured_packet_len = len(packet) + original_packet_len = captured_packet_len + padded_packet_len = math.ceil(captured_packet_len/4) * 4 + padding_len = padded_packet_len - original_packet_len + padded_packet = packet + bytearray(padding_len) + block_total_length = BLOCKHEADERLEN + padded_packet_len + timestamp_int = int(timestamp *1e6) # Set the proper resolution + timestamp_high = timestamp_int >> 32 + timestamp_low = timestamp_int & 0x00000000FFFFFFFF + + epb = struct.pack(">IIIIIII", + BLOCKTYPE, + block_total_length, + INTERFACE_ID, + timestamp_high, + timestamp_low, + captured_packet_len, + original_packet_len) + epb += padded_packet + epb += struct.pack(">I", block_total_length) + return epb + + ############################################################################################################ + # PUBLIC INTERFACES + def create_pcapng_file(self, filename : str, shb_userappl : str = "", shb_hardware : str = "", link_type : int =147) -> bytes: + if (filename == ""): + return + self.__filename = filename + + shb_bytes = self.__build_pcapng_shb(shb_userappl,shb_hardware) + idb_bytes = self.__build_pcapng_idb(link_type) + + if os.path.isfile(self.__filename): + logger.warning("{0} already exists. Overwriting it".format(filename)) + + with open(self.__filename, "wb") as f: + f.write(shb_bytes) + f.write(idb_bytes) + + def pcapng_file_append_packet(self, packet : bytes, timestamp: float): + with open(self.__filename, "ab") as f: + f.write(self.__build_pcapng_epb(packet, timestamp)) diff --git a/src/urh/signalprocessing/Message.py b/src/urh/signalprocessing/Message.py index b84df419cc..3acd194a0f 100644 --- a/src/urh/signalprocessing/Message.py +++ b/src/urh/signalprocessing/Message.py @@ -302,6 +302,10 @@ def decoded_ascii_array(self) -> array.array: def decoded_ascii_str(self) -> str: return "".join(map(chr, self.decoded_ascii_array)) + @property + def decoded_ascii_buffer(self) -> bytes: + return self.decoded_ascii_array.tobytes() + def __get_bit_range_from_hex_or_ascii_index(self, from_index: int, decoded: bool, is_hex: bool) -> tuple: bits = self.decoded_bits if decoded else self.plain_bits factor = 4 if is_hex else 8 diff --git a/src/urh/signalprocessing/ProtocolAnalyzer.py b/src/urh/signalprocessing/ProtocolAnalyzer.py index 238eaa226e..ffafda4050 100644 --- a/src/urh/signalprocessing/ProtocolAnalyzer.py +++ b/src/urh/signalprocessing/ProtocolAnalyzer.py @@ -8,6 +8,7 @@ from urh import settings from urh.cythonext import signal_functions +from urh.dev.PCAPNG import PCAPNG from urh.signalprocessing.Encoding import Encoding from urh.signalprocessing.Message import Message from urh.signalprocessing.MessageType import MessageType @@ -645,6 +646,12 @@ def from_xml_file(self, filename: str, read_bits=False): root = tree.getroot() self.from_xml_tag(root, read_bits=read_bits) + def to_pcapng(self, filename : str, hardware_desc_name: str = "", link_type: int = 147): + pcapng = PCAPNG() + pcapng.create_pcapng_file(filename=filename, shb_userappl="Universal Radio Hacker", shb_hardware=hardware_desc_name, link_type=link_type) + for msg in self.messages: + pcapng.pcapng_file_append_packet(msg.decoded_ascii_buffer, msg.timestamp) + def eliminate(self): self.message_types = None self.messages = None diff --git a/src/urh/util/FileOperator.py b/src/urh/util/FileOperator.py index e91b0b6f6b..19c2fda97c 100644 --- a/src/urh/util/FileOperator.py +++ b/src/urh/util/FileOperator.py @@ -42,6 +42,7 @@ WAV_FILE_FILTER = "Waveform Audio File Format (*.wav *.wave)" PROTOCOL_FILE_FILTER = "Protocol (*.proto.xml *.proto)" BINARY_PROTOCOL_FILE_FILTER = "Binary Protocol (*.bin)" +WIRESHARK_FILE_FILTER = "Wireshark File (*.pcapng)" PLAIN_BITS_FILE_FILTER = "Plain Bits (*.txt)" FUZZING_FILE_FILTER = "Fuzzing Profile (*.fuzz.xml *.fuzz)" SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)" @@ -93,7 +94,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f elif caption == "Export spectrogram": name_filter = "Frequency Time (*.ft);;Frequency Time Amplitude (*.fta)" elif caption == "Save protocol": - name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER]) + name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER, WIRESHARK_FILE_FILTER]) elif caption == "Export demodulated": name_filter = WAV_FILE_FILTER else: diff --git a/tests/test_util.py b/tests/test_util.py index e5adaa40e2..b091250718 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -9,6 +9,7 @@ from tests.utils_testing import get_path_for_data_file from urh import settings from urh.dev.PCAP import PCAP +from urh.dev.PCAPNG import PCAPNG from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer from urh.signalprocessing.Signal import Signal from urh.util import util @@ -71,6 +72,42 @@ def test_write_pcap(self): pcap = PCAP() pcap.write_packets(proto_analyzer.messages, os.path.join(tempfile.gettempdir(), "test.pcap"), 1e6) + def test_write_pcapng(self): + signal = Signal(get_path_for_data_file("ask.complex"), "ASK-Test") + signal.modulation_type = "ASK" + signal.samples_per_symbol = 295 + signal.center = -0.1667 + self.assertEqual(signal.num_samples, 13710) + + proto_analyzer = ProtocolAnalyzer(signal) + proto_analyzer.get_protocol_from_signal() + self.assertEqual(proto_analyzer.decoded_hex_str[0], "b25b6db6c80") + + proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0])) + proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0])) + proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0])) + + pcap = PCAPNG() + pcap.create_pcapng_file(os.path.join(tempfile.gettempdir(), "test.pcapng"), "Universal Radio Hacker Test", "TestHW", 147) + for msg in proto_analyzer.messages: + pcap.pcapng_file_append_packet(msg.decoded_ascii_buffer, 1e6) + + # As we don't have PCAPNG importers, we'll verify output just by checking file size, PCAPNG SHB type number + # and that all msg bytes were written somewhere inside output file + filepath = os.path.join(tempfile.gettempdir(), "test.pcapng") + filechecks = False + if os.path.isfile(filepath): # ok, file exist + with open(filepath, "rb") as f: + filecontents = f.read() + # min file len= SHB + IDB + 4 EPB msgs + minfilelen = 28 + 20 + (4 * (32 + len(proto_analyzer.messages[0].decoded_ascii_buffer))) + if len(filecontents) >= minfilelen: # ok, min file length passed + if filecontents.find(b'\x0A\x0D\x0D\x0A') >= 0: # ok, seems that SHB was written + if filecontents.find(msg.decoded_ascii_buffer) >= 0: # ok, msg bytes written + filechecks = True + + self.assertTrue(filechecks) + def test_de_bruijn_fuzzing(self): self.assertEqual(c_util.de_bruijn(3), array.array("B", [0, 0, 0, 1, 0, 1, 1, 1])) self.assertEqual(c_util.de_bruijn(4), array.array("B", [0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1]))