Skip to content

Commit

Permalink
Adding Save As pcapng for ProtocolAnalyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
jpacov committed Mar 28, 2022
1 parent 7ddf54b commit 4ba65ec
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/urh/controller/CompareFrameController.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from PyQt5.QtCore import pyqtSlot, QTimer, Qt, pyqtSignal, QItemSelection, QItemSelectionModel, QLocale, \
QModelIndex
from PyQt5.QtGui import QContextMenuEvent, QIcon
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView, QInputDialog

from urh import settings
from urh.awre import AutoAssigner
Expand Down Expand Up @@ -864,6 +864,11 @@ def save_protocol(self):

if filename.endswith(".bin"):
self.proto_analyzer.to_binary(filename, use_decoded=True)
elif filename.endswith(".pcapng"):
data_link_type, ok = QInputDialog.getInt(self, "Link type",
"Interface Link Type to use (probably one between DLT_USER0-DLT_USER15 (147-162)):", 147, 0, 65535)
if ok:
self.proto_analyzer.to_pcapng(filename=filename, link_type=data_link_type)
else:
self.proto_analyzer.to_xml_file(filename=filename, decoders=self.decodings,
participants=self.project_manager.participants, write_bits=True)
Expand Down
116 changes: 116 additions & 0 deletions src/urh/dev/PCAPNG.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import os
import struct
import time
import math

from urh.util.Logger import logger

from urh.signalprocessing.Message import Message

# Refer to PCAPNG spec
# https://www.ietf.org/staging/draft-tuexen-opsawg-pcapng-02.html

class PCAPNG(object):
def __init__(self):
self.__messages = []
self.__messages_timestamps = []
self.__filename = ""

def __build_pcapng_shb(self, shb_userappl : str = "", shb_hardware : str = "") -> bytes:
BLOCKTYPE = 0x0A0D0D0A
HEADERS_BLOCK_LENGTH = 28
MAGIC_NUMBER = 0x1A2B3C4D
VERSION_MAJOR, VERSION_MINOR = 1,0
SECTIONLENGTH = 0xFFFFFFFFFFFFFFFF # -1 => Not specified

shb_userappl_padded_len = math.ceil(len(shb_userappl)/4) * 4
shb_hardware_padded_len = math.ceil(len(shb_hardware)/4) * 4

total_block_len = HEADERS_BLOCK_LENGTH
if shb_userappl_padded_len > 0:
total_block_len += shb_userappl_padded_len + 4

if shb_hardware_padded_len > 0:
total_block_len += shb_hardware_padded_len + 4

shb = struct.pack(">IIIHHQ",
BLOCKTYPE,
total_block_len,
MAGIC_NUMBER,
VERSION_MAJOR, VERSION_MINOR,
SECTIONLENGTH)

if shb_userappl != "":
SHB_USERAPPL = 4
strpad = shb_userappl.ljust(shb_userappl_padded_len, "\0")
shb += struct.pack(">HH", SHB_USERAPPL, shb_userappl_padded_len)
shb += bytes(strpad, 'ascii')

if shb_hardware != "":
SHB_HARDWARE = 2
strpad = shb_hardware.ljust(shb_hardware_padded_len, "\0")
shb += struct.pack(">HH", SHB_HARDWARE, shb_hardware_padded_len)
shb += bytes(strpad, 'ascii')

shb += struct.pack(">I",total_block_len)
return shb

def __build_pcapng_idb(self, link_type) -> bytes:
BLOCKTYPE = 0x00000001
BLOCKLENGTH = 20
SNAP_LEN = 0

return struct.pack(">IIHHII",
BLOCKTYPE,
BLOCKLENGTH,
link_type, 0,
SNAP_LEN,
BLOCKLENGTH)

def __build_pcapng_epb(self, packet : bytes, timestamp : float) -> bytes:
BLOCKTYPE = 0x00000006
BLOCKHEADERLEN = 32
INTERFACE_ID = 0

captured_packet_len = len(packet)
original_packet_len = captured_packet_len
padded_packet_len = math.ceil(captured_packet_len/4) * 4
padding_len = padded_packet_len - original_packet_len
padded_packet = packet + bytearray(padding_len)
block_total_length = BLOCKHEADERLEN + padded_packet_len
timestamp_int = int(timestamp *1e6) # Set the proper resolution
timestamp_high = timestamp_int >> 32
timestamp_low = timestamp_int & 0x00000000FFFFFFFF

epb = struct.pack(">IIIIIII",
BLOCKTYPE,
block_total_length,
INTERFACE_ID,
timestamp_high,
timestamp_low,
captured_packet_len,
original_packet_len)
epb += padded_packet
epb += struct.pack(">I", block_total_length)
return epb

############################################################################################################
# PUBLIC INTERFACES
def create_pcapng_file(self, filename : str, shb_userappl : str = "", shb_hardware : str = "", link_type : int =147) -> bytes:
if (filename == ""):
return
self.__filename = filename

shb_bytes = self.__build_pcapng_shb(shb_userappl,shb_hardware)
idb_bytes = self.__build_pcapng_idb(link_type)

if os.path.isfile(self.__filename):
logger.warning("{0} already exists. Overwriting it".format(filename))

with open(self.__filename, "wb") as f:
f.write(shb_bytes)
f.write(idb_bytes)

def pcapng_file_append_packet(self, packet : bytes, timestamp: float):
with open(self.__filename, "ab") as f:
f.write(self.__build_pcapng_epb(packet, timestamp))
4 changes: 4 additions & 0 deletions src/urh/signalprocessing/Message.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ def decoded_ascii_array(self) -> array.array:
def decoded_ascii_str(self) -> str:
return "".join(map(chr, self.decoded_ascii_array))

@property
def decoded_ascii_buffer(self) -> bytes:
return self.decoded_ascii_array.tobytes()

def __get_bit_range_from_hex_or_ascii_index(self, from_index: int, decoded: bool, is_hex: bool) -> tuple:
bits = self.decoded_bits if decoded else self.plain_bits
factor = 4 if is_hex else 8
Expand Down
7 changes: 7 additions & 0 deletions src/urh/signalprocessing/ProtocolAnalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from urh import settings
from urh.cythonext import signal_functions
from urh.dev.PCAPNG import PCAPNG
from urh.signalprocessing.Encoding import Encoding
from urh.signalprocessing.Message import Message
from urh.signalprocessing.MessageType import MessageType
Expand Down Expand Up @@ -645,6 +646,12 @@ def from_xml_file(self, filename: str, read_bits=False):
root = tree.getroot()
self.from_xml_tag(root, read_bits=read_bits)

def to_pcapng(self, filename : str, hardware_desc_name: str = "", link_type: int = 147):
pcapng = PCAPNG()
pcapng.create_pcapng_file(filename=filename, shb_userappl="Universal Radio Hacker", shb_hardware=hardware_desc_name, link_type=link_type)
for msg in self.messages:
pcapng.pcapng_file_append_packet(msg.decoded_ascii_buffer, msg.timestamp)

def eliminate(self):
self.message_types = None
self.messages = None
Expand Down
3 changes: 2 additions & 1 deletion src/urh/util/FileOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
WAV_FILE_FILTER = "Waveform Audio File Format (*.wav *.wave)"
PROTOCOL_FILE_FILTER = "Protocol (*.proto.xml *.proto)"
BINARY_PROTOCOL_FILE_FILTER = "Binary Protocol (*.bin)"
WIRESHARK_FILE_FILTER = "Wireshark File (*.pcapng)"
PLAIN_BITS_FILE_FILTER = "Plain Bits (*.txt)"
FUZZING_FILE_FILTER = "Fuzzing Profile (*.fuzz.xml *.fuzz)"
SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)"
Expand Down Expand Up @@ -93,7 +94,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f
elif caption == "Export spectrogram":
name_filter = "Frequency Time (*.ft);;Frequency Time Amplitude (*.fta)"
elif caption == "Save protocol":
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER])
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER, WIRESHARK_FILE_FILTER])
elif caption == "Export demodulated":
name_filter = WAV_FILE_FILTER
else:
Expand Down
37 changes: 37 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tests.utils_testing import get_path_for_data_file
from urh import settings
from urh.dev.PCAP import PCAP
from urh.dev.PCAPNG import PCAPNG
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
from urh.signalprocessing.Signal import Signal
from urh.util import util
Expand Down Expand Up @@ -71,6 +72,42 @@ def test_write_pcap(self):
pcap = PCAP()
pcap.write_packets(proto_analyzer.messages, os.path.join(tempfile.gettempdir(), "test.pcap"), 1e6)

def test_write_pcapng(self):
signal = Signal(get_path_for_data_file("ask.complex"), "ASK-Test")
signal.modulation_type = "ASK"
signal.samples_per_symbol = 295
signal.center = -0.1667
self.assertEqual(signal.num_samples, 13710)

proto_analyzer = ProtocolAnalyzer(signal)
proto_analyzer.get_protocol_from_signal()
self.assertEqual(proto_analyzer.decoded_hex_str[0], "b25b6db6c80")

proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))

pcap = PCAPNG()
pcap.create_pcapng_file(os.path.join(tempfile.gettempdir(), "test.pcapng"), "Universal Radio Hacker Test", "TestHW", 147)
for msg in proto_analyzer.messages:
pcap.pcapng_file_append_packet(msg.decoded_ascii_buffer, 1e6)

# As we don't have PCAPNG importers, we'll verify output just by checking file size, PCAPNG SHB type number
# and that all msg bytes were written somewhere inside output file
filepath = os.path.join(tempfile.gettempdir(), "test.pcapng")
filechecks = False
if os.path.isfile(filepath): # ok, file exist
with open(filepath, "rb") as f:
filecontents = f.read()
# min file len= SHB + IDB + 4 EPB msgs
minfilelen = 28 + 20 + (4 * (32 + len(proto_analyzer.messages[0].decoded_ascii_buffer)))
if len(filecontents) >= minfilelen: # ok, min file length passed
if filecontents.find(b'\x0A\x0D\x0D\x0A') >= 0: # ok, seems that SHB was written
if filecontents.find(msg.decoded_ascii_buffer) >= 0: # ok, msg bytes written
filechecks = True

self.assertTrue(filechecks)

def test_de_bruijn_fuzzing(self):
self.assertEqual(c_util.de_bruijn(3), array.array("B", [0, 0, 0, 1, 0, 1, 1, 1]))
self.assertEqual(c_util.de_bruijn(4), array.array("B", [0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1]))
Expand Down

0 comments on commit 4ba65ec

Please sign in to comment.