-
Notifications
You must be signed in to change notification settings - Fork 30
/
corpus_to_pcap.py
111 lines (85 loc) · 2.84 KB
/
corpus_to_pcap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python
#
# Script which converts corpus files to pcap files.
import argparse
import logging
import sys
from scapy.all import IP, TCP, wrpcap
import corpus
log = logging.getLogger(__name__)
# The TLV types holding the responses we want to convert to pcap. The order matters: corpus_to_pcap walks
# this list to decide the order in which packets are written.
RESPONSES = [
    getattr(corpus.BaseType, type_name)
    for type_name in (
        "TYPE_RSP0",
        "TYPE_RSP1",
        "TYPE_RSP2",
        "TYPE_RSP3",
        "TYPE_RSP4",
        "TYPE_RSP5",
        "TYPE_RSP6",
        "TYPE_RSP7",
        "TYPE_RSP8",
        "TYPE_RSP9",
        "TYPE_RSP10",
        "TYPE_SECRSP0",
        "TYPE_SECRSP1",
    )
]
def corpus_to_pcap(options):
    """
    Convert the response TLVs found in a corpus file into a pcap file.

    Reads the file at options.input, decodes it with corpus.TLVDecoder and keeps one TLV per type listed in
    RESPONSES. Each kept TLV's data is wrapped in an IP/TCP packet, and the packets are written to
    options.output in RESPONSES order.

    :param options: Parsed command line options providing the `input` and `output` paths.
    :return: ScriptRC.SUCCESS
    """
    with open(options.input, "rb") as corpus_file:
        raw = corpus_file.read()

    # Keyed by TLV type, so a later TLV of the same type replaces an earlier one.
    found = {}
    for tlv in corpus.TLVDecoder(raw):
        if tlv.type not in RESPONSES:
            log.debug("Ignoring: %s", tlv)
            continue
        log.debug("Found response: %s", tlv)
        found[tlv.type] = tlv

    packets = []
    for rsp_type in RESPONSES:
        if rsp_type not in found:
            continue
        tlv = found[rsp_type]

        # By default generate a packet with source port 80. This hints at HTTP; in future we can be smart and
        # pick a port that'll influence Wireshark. But for now, you can just Decode As.. in Wireshark to get
        # whatever protocol you want.
        pkt = IP() / TCP(sport=80, flags='SA') / tlv.data
        log.debug("Converted %s to packet: %s", tlv.TYPEMAP[rsp_type], pkt)
        packets.append(pkt)

    log.debug("Writing %d packets to %s", len(packets), options.output)
    wrpcap(options.output, packets)
    return ScriptRC.SUCCESS
def get_options(argv=None):
    """
    Parse the command line options for the script.

    :param argv: Optional list of argument strings to parse. When None (the default) argparse reads the
                 arguments from sys.argv, which is what the script entry point relies on.
    :return: The argparse namespace, carrying the required `input` and `output` values.
    """
    parser = argparse.ArgumentParser(description="Convert a corpus file to a pcap file.")
    parser.add_argument("--input", required=True, help="Path of the corpus file to read.")
    parser.add_argument("--output", required=True, help="Path of the pcap file to write.")
    return parser.parse_args(argv)
def setup_logging():
    """
    Send all log records of DEBUG level and above to stdout.

    Adds a stdout stream handler to the root logger and sets both the handler and the root logger to DEBUG.
    Takes no options: the level and the format are fixed.
    """
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s"))

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
class ScriptRC(object):
    """Return codes for the script, used as plain integer constants."""

    # Consecutive values: SUCCESS = 0, FAILURE = 1, EXCEPTION = 2.
    SUCCESS, FAILURE, EXCEPTION = range(3)
class ScriptException(Exception):
    """Exception type defined by this script."""
def main():
    """
    Script entry point: parse the options, set up logging and run the conversion.

    :return: Whatever corpus_to_pcap returns, or ScriptRC.EXCEPTION if it raised.
    """
    args = get_options()
    setup_logging()

    try:
        exit_code = corpus_to_pcap(args)
    except Exception as err:
        # Top-level boundary: log the traceback and report the failure through the return code
        # rather than letting the exception escape.
        log.exception(err)
        exit_code = ScriptRC.EXCEPTION

    log.info("Returning %d", exit_code)
    return exit_code
# Only run the conversion when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())