Skip to content

Commit

Permalink
Adding Save As pcapng for ProtocolAnalyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
jpacov authored and PepsConti committed Oct 14, 2022
1 parent a4f8db4 commit ae4682f
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/urh/controller/CompareFrameController.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from PyQt5.QtCore import pyqtSlot, QTimer, Qt, pyqtSignal, QItemSelection, QItemSelectionModel, QLocale, \
QModelIndex
from PyQt5.QtGui import QContextMenuEvent, QIcon
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView, QInputDialog

from urh import settings
from urh.awre import AutoAssigner
Expand Down Expand Up @@ -864,6 +864,11 @@ def save_protocol(self):

if filename.endswith(".bin"):
self.proto_analyzer.to_binary(filename, use_decoded=True)
elif filename.endswith(".pcapng"):
data_link_type, ok = QInputDialog.getInt(self, "Link type",
"Interface Link Type to use (probably one between DLT_USER0-DLT_USER15 (147-162)):", 147, 0, 65535)
if ok:
self.proto_analyzer.to_pcapng(filename=filename, link_type=data_link_type)
else:
self.proto_analyzer.to_xml_file(filename=filename, decoders=self.decodings,
participants=self.project_manager.participants, write_bits=True)
Expand Down
107 changes: 107 additions & 0 deletions src/urh/dev/PCAPNG.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import os
import struct
import math

from urh.util.Logger import logger


# Refer to PCAPNG spec
# https://www.ietf.org/staging/draft-tuexen-opsawg-pcapng-02.html

def _build_pcapng_shb(shb_userappl: str = "", shb_hardware: str = "") -> bytes:
BLOCKTYPE = 0x0A0D0D0A
HEADERS_BLOCK_LENGTH = 28
MAGIC_NUMBER = 0x1A2B3C4D
VERSION_MAJOR, VERSION_MINOR = 1, 0
SECTIONLENGTH = 0xFFFFFFFFFFFFFFFF # -1 => Not specified

shb_userappl_padded_len = math.ceil(len(shb_userappl) / 4) * 4
shb_hardware_padded_len = math.ceil(len(shb_hardware) / 4) * 4

total_block_len = HEADERS_BLOCK_LENGTH
if shb_userappl_padded_len > 0:
total_block_len += shb_userappl_padded_len + 4

if shb_hardware_padded_len > 0:
total_block_len += shb_hardware_padded_len + 4

shb = struct.pack(">IIIHHQ",
BLOCKTYPE,
total_block_len,
MAGIC_NUMBER,
VERSION_MAJOR, VERSION_MINOR,
SECTIONLENGTH)

if shb_userappl != "":
SHB_USERAPPL = 4
strpad = shb_userappl.ljust(shb_userappl_padded_len, "\0")
shb += struct.pack(">HH", SHB_USERAPPL, shb_userappl_padded_len)
shb += bytes(strpad, 'ascii')

if shb_hardware != "":
SHB_HARDWARE = 2
strpad = shb_hardware.ljust(shb_hardware_padded_len, "\0")
shb += struct.pack(">HH", SHB_HARDWARE, shb_hardware_padded_len)
shb += bytes(strpad, 'ascii')

shb += struct.pack(">I", total_block_len)
return shb

def _build_pcapng_idb(link_type) -> bytes:
BLOCKTYPE = 0x00000001
BLOCKLENGTH = 20
SNAP_LEN = 0

return struct.pack(">IIHHII",
BLOCKTYPE,
BLOCKLENGTH,
link_type, 0,
SNAP_LEN,
BLOCKLENGTH)

def _build_pcapng_epb(packet: bytes, timestamp: float) -> bytes:
BLOCKTYPE = 0x00000006
BLOCKHEADERLEN = 32
INTERFACE_ID = 0

captured_packet_len = len(packet)
original_packet_len = captured_packet_len
padded_packet_len = math.ceil(captured_packet_len / 4) * 4
padding_len = padded_packet_len - original_packet_len
padded_packet = packet + bytearray(padding_len)
block_total_length = BLOCKHEADERLEN + padded_packet_len
timestamp_int = int(timestamp * 1e6) # Set the proper resolution
timestamp_high = timestamp_int >> 32
timestamp_low = timestamp_int & 0x00000000FFFFFFFF

epb = struct.pack(">IIIIIII",
BLOCKTYPE,
block_total_length,
INTERFACE_ID,
timestamp_high,
timestamp_low,
captured_packet_len,
original_packet_len)
epb += padded_packet
epb += struct.pack(">I", block_total_length)
return epb

def create_pcapng_file(filename: str, shb_userappl: str = "", shb_hardware: str = "",
link_type: int = 147) -> bytes:
if filename == "":
return

shb_bytes = _build_pcapng_shb(shb_userappl, shb_hardware)
idb_bytes = _build_pcapng_idb(link_type)

if os.path.isfile(filename):
logger.warning("{0} already exists. Overwriting it".format(filename))

with open(filename, "wb") as f:
f.write(shb_bytes)
f.write(idb_bytes)

def append_packets_to_pcapng(filename: str, packets: list, timestamps: list):
with open(filename, "ab") as f:
for packet, timestamp in zip(packets, timestamps):
f.write(_build_pcapng_epb(packet, timestamp))
4 changes: 4 additions & 0 deletions src/urh/signalprocessing/Message.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ def decoded_ascii_array(self) -> array.array:
def decoded_ascii_str(self) -> str:
return "".join(map(chr, self.decoded_ascii_array))

@property
def decoded_ascii_buffer(self) -> bytes:
return self.decoded_ascii_array.tobytes()

def __get_bit_range_from_hex_or_ascii_index(self, from_index: int, decoded: bool, is_hex: bool) -> tuple:
bits = self.decoded_bits if decoded else self.plain_bits
factor = 4 if is_hex else 8
Expand Down
8 changes: 8 additions & 0 deletions src/urh/signalprocessing/ProtocolAnalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from urh import settings
from urh.cythonext import signal_functions
import urh.dev.PCAPNG as PCAPNG
from urh.signalprocessing.Encoding import Encoding
from urh.signalprocessing.Message import Message
from urh.signalprocessing.MessageType import MessageType
Expand Down Expand Up @@ -645,6 +646,13 @@ def from_xml_file(self, filename: str, read_bits=False):
root = tree.getroot()
self.from_xml_tag(root, read_bits=read_bits)

def to_pcapng(self, filename : str, hardware_desc_name: str = "", link_type: int = 147):
PCAPNG.create_pcapng_file(filename=filename, shb_userappl="Universal Radio Hacker", shb_hardware=hardware_desc_name, link_type=link_type)
PCAPNG.append_packets_to_pcapng(
filename=filename,
packets=(msg.decoded_ascii_buffer for msg in self.messages),
timestamps=(msg.timestamp for msg in self.messages))

def eliminate(self):
self.message_types = None
self.messages = None
Expand Down
3 changes: 2 additions & 1 deletion src/urh/util/FileOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
WAV_FILE_FILTER = "Waveform Audio File Format (*.wav *.wave)"
PROTOCOL_FILE_FILTER = "Protocol (*.proto.xml *.proto)"
BINARY_PROTOCOL_FILE_FILTER = "Binary Protocol (*.bin)"
WIRESHARK_FILE_FILTER = "Wireshark File (*.pcapng)"
PLAIN_BITS_FILE_FILTER = "Plain Bits (*.txt)"
FUZZING_FILE_FILTER = "Fuzzing Profile (*.fuzz.xml *.fuzz)"
SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)"
Expand Down Expand Up @@ -93,7 +94,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f
elif caption == "Export spectrogram":
name_filter = "Frequency Time (*.ft);;Frequency Time Amplitude (*.fta)"
elif caption == "Save protocol":
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER])
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER, WIRESHARK_FILE_FILTER])
elif caption == "Export demodulated":
name_filter = ";;".join([WAV_FILE_FILTER, SUB_FILE_FILTER])
else:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tests.utils_testing import get_path_for_data_file
from urh import settings
from urh.dev.PCAP import PCAP
import urh.dev.PCAPNG as PCAPNG
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
from urh.signalprocessing.Signal import Signal
from urh.util import util
Expand Down Expand Up @@ -71,6 +72,43 @@ def test_write_pcap(self):
pcap = PCAP()
pcap.write_packets(proto_analyzer.messages, os.path.join(tempfile.gettempdir(), "test.pcap"), 1e6)

def test_write_pcapng(self):
signal = Signal(get_path_for_data_file("ask.complex"), "ASK-Test")
signal.modulation_type = "ASK"
signal.samples_per_symbol = 295
signal.center = -0.1667
self.assertEqual(signal.num_samples, 13710)

proto_analyzer = ProtocolAnalyzer(signal)
proto_analyzer.get_protocol_from_signal()
self.assertEqual(proto_analyzer.decoded_hex_str[0], "b25b6db6c80")

proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))

filepath = os.path.join(tempfile.gettempdir(), "test.pcapng")
PCAPNG.create_pcapng_file(filepath, "Universal Radio Hacker Test", "TestHW", 147)
PCAPNG.append_packets_to_pcapng(
filename=filepath,
packets=(msg.decoded_ascii_buffer for msg in proto_analyzer.messages),
timestamps=(msg.timestamp for msg in proto_analyzer.messages))

# As we don't have PCAPNG importers, we'll verify output just by checking file size, PCAPNG SHB type number
# and that all msg bytes were written somewhere inside output file
filechecks = False
if os.path.isfile(filepath): # ok, file exist
with open(filepath, "rb") as f:
filecontents = f.read()
# min file len= SHB + IDB + 4 EPB msgs
minfilelen = 28 + 20 + (4 * (32 + len(proto_analyzer.messages[0].decoded_ascii_buffer)))
if len(filecontents) >= minfilelen: # ok, min file length passed
if filecontents.find(b'\x0A\x0D\x0D\x0A') >= 0: # ok, seems that SHB was written
if filecontents.find(proto_analyzer.messages[0].decoded_ascii_buffer) >= 0: # ok, msg bytes written
filechecks = True

self.assertTrue(filechecks)

def test_de_bruijn_fuzzing(self):
self.assertEqual(c_util.de_bruijn(3), array.array("B", [0, 0, 0, 1, 0, 1, 1, 1]))
self.assertEqual(c_util.de_bruijn(4), array.array("B", [0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1]))
Expand Down

0 comments on commit ae4682f

Please sign in to comment.