This repository has been archived by the owner on Jun 9, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
extract-OTA.py
executable file
·235 lines (190 loc) · 11.4 KB
/
extract-OTA.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/python3
import os
import json
import binascii
import struct
# Capture file to analyse; it is opened and parsed as JSON further down.
filename = 'OTA_CLUSTER.json'

# Layout of the OTA file header, Header[0:69], struct format "<LHHHHHLH32BLBQHH".
#
# The sample values were originally listed under the column headings
# IKEA / LEDVANCE / LEGRAND.  Rows showing three values follow that order;
# for rows showing fewer values the original column alignment is not
# recoverable, so they are reproduced in the order they were written.
#
#   Long       file_id                0xbeef11e  0xbeef11e
#   Ushort     header_version         0x100      0x100
#   Ushort     header_length          0x38       0x38
#   Ushort     header_fctl            0x0        0x0
#   Ushort     manufacturer_code      0x117c     0x1189     0x1021
#   Ushort     image_type             see IMAGE_TYPES below
#   Long       image_version          0x00204103
#                  xx------  Application release  0x00
#                  --xx----  Application build    0x20 (32)
#                  ----xx--  Stack release        0x41 (65)
#                  ------xx  Stack build          0x03 (03)
#   Ushort     stack_version          0x2        0x2        0x3
#   32 uchar   header_str
#   Long       size                   size of the file
#   Uchar      security_cred_version  0x0        0x0
#   Ulonglong  upgrade_file_dest
#   Ushort     min_hw_version
#   Ushort     max_hw_version

# Placeholder for devices whose image type is not known at this stage.
_UNKNOWN_IMAGE_TYPE = 0xffff

# Device description -> OTA image_type value.
# NOTE(review): the device names suggest these are Legrand products - confirm.
IMAGE_TYPES = {
    'Cable outlet': _UNKNOWN_IMAGE_TYPE,
    'Connected outlet': 0x0011,
    'Dimmer switch w/o neutral': 0x000e,
    'Shutter switch with neutral': 0x0013,
    'Micromodule switch': 0x0010,
    'Remote switch': _UNKNOWN_IMAGE_TYPE,
    'Double gangs remote switch': 0x0016,
    'Shutters central remote switch': _UNKNOWN_IMAGE_TYPE,
}
# Cluster 0x0019 written in decimal, which is how the capture spells it.
OTA_CLUSTER = '25'
STATUS_SUCCESS = '0x00000000'
# Block size assumed when estimating how many packets are still missing.
CHUNK_SIZE_ESTIMATE = 64
OUTPUT_DIR = 'FirmwaresDB'
# Common prefix of every OTA field name used below.
_OTA = 'zbee_zcl_general.ota.'


def register_image(firmware, payload):
    """Record the image announced by a Query Next Image Response (0x02).

    Nothing is recorded unless the response status is success.  When the
    image type is already known only because data blocks were seen first
    (so no size was recorded), the announced size is added to that entry,
    provided version and manufacturer code agree.

    Args:
        firmware: dict keyed by image type, updated in place.
        payload: the 'Payload' dict of the ZCL layer of the packet.
    """
    if payload[_OTA + 'status'] != STATUS_SUCCESS:
        return
    manuf_code = payload[_OTA + 'manufacturer_code']
    image_type = payload[_OTA + 'image.type']
    file_version = payload[_OTA + 'file.version']
    file_size = payload[_OTA + 'image.size']
    print("New Firmware to be captured: Manuf: %s, Type: %s, Version: %s, Size: %s" %(manuf_code, image_type, file_version, file_size))

    entry = firmware.get(image_type)
    if entry is None:
        firmware[image_type] = {
            'Version': file_version,
            'ManufCode': manuf_code,
            'Size': file_size,
            'Image': {},
        }
    elif ('Size' not in entry
            and entry['Version'] == file_version
            and entry['ManufCode'] == manuf_code):
        # Blocks were captured before the announcement: just complete the entry.
        entry['Size'] = file_size
    else:
        print("========> Something wrong ... already existing ....")


def store_block(firmware, payload):
    """Store the data chunk carried by an Image Block Response (0x05).

    The chunk is keyed by its file offset (kept as the string found in the
    capture).  A chunk whose version or manufacturer code disagrees with the
    entry already recorded for that image type is rejected.  The first copy
    of a chunk wins; a later copy is reported as a duplicate, or as
    different data when its content does not match.

    Args:
        firmware: dict keyed by image type, updated in place.
        payload: the 'Payload' dict of the ZCL layer of the packet.
    """
    # NOTE(review): a non-success block response presumably carries no image
    # data; skip it instead of failing on the missing fields.  Payloads with
    # no status field are processed as before.
    if payload.get(_OTA + 'status', STATUS_SUCCESS) != STATUS_SUCCESS:
        return

    manuf_code = payload[_OTA + 'manufacturer_code']
    image_type = payload[_OTA + 'image.type']
    file_version = payload[_OTA + 'file.version']
    offset = payload[_OTA + 'file.offset']
    data_size = payload[_OTA + 'data_size']
    data = payload[_OTA + 'image.data']

    entry = firmware.get(image_type)
    if entry is None:
        entry = firmware[image_type] = {
            'ManufCode': manuf_code,
            'Version': file_version,
            'Image': {},
        }
    chunks = entry.setdefault('Image', {})

    if entry['Version'] != file_version:
        print("Error missmatch of Version %s versus %s" %(entry['Version'], file_version))
        return
    if entry['ManufCode'] != manuf_code:
        print("Error missmatch of Manuf Code %s versus %s" %(entry['ManufCode'], manuf_code))
        return

    if offset not in chunks:
        chunks[offset] = {'Size': data_size, 'Data': data}
    elif data != chunks[offset]['Data']:
        print("===============> Different Data: ")
        print(" New: %s" %( data))
        print(" Old: %s" %( chunks[offset]['Data']))
    else:
        print("-----> DEDUP - Offset: %s already loaded" %(offset))


def collect_firmware(packets):
    """Scan decoded packets and gather every OTA image chunk they contain.

    Packets lacking the '_source' / 'layers' / 'zbee_aps' / 'zbee_zcl'
    structure are counted but otherwise ignored.  The ZCL transaction
    sequence number is tracked on every ZCL packet (before the cluster
    filter) and a message is printed when it jumps forward by more than one.

    Args:
        packets: iterable of packet dicts, as loaded from the capture JSON.

    Returns:
        (firmware, count): firmware maps image type to a dict with
        'ManufCode', 'Version', 'Image' (offset -> {'Size', 'Data'}) and,
        when it was announced, 'Size'; count is the number of packets seen.
    """
    firmware = {}
    last_sqn = 0
    count = 0
    for item in packets:
        count += 1
        try:
            layers = item['_source']['layers']
            cluster = layers['zbee_aps']['zbee_aps.cluster']
            zcl = layers['zbee_zcl']
        except (KeyError, TypeError):
            continue

        if 'zbee_zcl.cmd.tsn' in zcl:
            tsn = int(zcl['zbee_zcl.cmd.tsn'])
            if last_sqn + 1 < tsn:
                print("====> Lost in Sequence %s versus %s" %(last_sqn, tsn))
            last_sqn = tsn

        if cluster != OTA_CLUSTER:  # We are looking only for Cluster 0x19
            continue

        if _OTA + 'cmd.srv_rx.id' in zcl:
            if zcl[_OTA + 'cmd.srv_rx.id'] == '0x00000001':
                # Query Next Image Request
                payload = zcl['Payload']
                print("0x01 - ManufCode: %s, Type: %s, Version: %s" %(
                    payload[_OTA + 'manufacturer_code'],
                    payload[_OTA + 'image.type'],
                    payload[_OTA + 'file.version']))
            # Command 0x04 requests carry nothing needed to rebuild the image.
        elif _OTA + 'cmd.srv_tx.id' in zcl:
            command = zcl[_OTA + 'cmd.srv_tx.id']
            if command == '0x00000002':
                register_image(firmware, zcl['Payload'])
            elif command == '0x00000005':
                store_block(firmware, zcl['Payload'])
    return firmware, count


def count_holes(image, total_size=None):
    """Count the gaps in the byte range covered by the chunks of an image.

    Offsets are compared numerically.  A gap is counted before any chunk
    that starts beyond the bytes covered so far (including a first chunk not
    starting at 0) and, when total_size is given, after the last chunk if
    the coverage stops short of it.

    Args:
        image: dict offset -> {'Size': ..., 'Data': ...}; offsets and sizes
            are decimal strings.
        total_size: announced file size (decimal string or int), or None.

    Returns:
        Number of gaps found; 0 means the covered range is contiguous.
    """
    holes = 0
    covered = 0
    for offset in sorted(image, key=int):
        start = int(offset)
        if start > covered:
            holes += 1
        covered = max(covered, start + int(image[offset]['Size']))
    if total_size is not None and covered < int(total_size):
        holes += 1
    return holes


def merge_chunks(stored_image, captured_image):
    """Copy into stored_image the chunks it lacks; return the added offsets.

    Chunks already present in stored_image are left untouched.
    """
    added = []
    for offset in sorted(captured_image, key=int):
        if offset not in stored_image:
            stored_image[offset] = {
                'Size': captured_image[offset]['Size'],
                'Data': captured_image[offset]['Data'],
            }
            added.append(offset)
    return added


def build_raw_image(image):
    """Assemble the binary image from chunks whose data is 'aa:bb:cc' hex text.

    Chunks are laid down in numeric offset order at their own offset, so an
    overlapping retransmission overwrites rather than duplicates bytes.  The
    result is only meaningful for an image without holes.
    """
    raw = bytearray()
    for offset in sorted(image, key=int):
        start = int(offset)
        data = bytes(int(hex_byte, 16) for hex_byte in image[offset]['Data'].split(':'))
        raw[start:start + len(data)] = data
    return bytes(raw)


def save_firmware(firm, captured):
    """Persist one captured image and, once complete, write its .ota file.

    The chunks are kept in FirmwaresDB/OTA_<type>_<manuf>_<version>.json.
    When that file already exists and describes the same image, the newly
    captured chunks are merged into it and the merged set is what gets
    reported, checked for holes and written out.  An existing file is only
    rewritten when something was actually added to it.

    Args:
        firm: image type, used in the file name and the report.
        captured: the entry built by collect_firmware for that image type.
    """
    base = '%s/OTA_%s_%s_%s' %(OUTPUT_DIR, firm, captured['ManufCode'], captured['Version'])
    json_path = base + '.json'

    stored = None
    if os.path.isfile(json_path):
        with open(json_path, 'r') as old_f:
            stored = json.load(old_f)

    record = captured
    must_write = True
    if stored:
        # The size is only comparable when both sides actually know it.
        sizes_differ = ('Size' in stored and 'Size' in captured
                        and stored['Size'] != captured['Size'])
        if (stored['Version'] != captured['Version']
                or stored['ManufCode'] != captured['ManufCode']
                or sizes_differ):
            print("===> Mismatch between Existing json %s and current loaded infos ..." % json_path)
            return
        added = merge_chunks(stored.setdefault('Image', {}), captured['Image'])
        for offset in added:
            print("====> Good news we found a new bloc at offset %s" %offset)
        must_write = bool(added)
        if 'Size' not in stored and 'Size' in captured:
            stored['Size'] = captured['Size']
            must_write = True
        record = stored

    chunks = record['Image']
    holes = count_holes(chunks, record.get('Size'))

    print("Captured firmware:")
    print("--> Type : %s" %firm)
    print("--> ManufCo: %s" %record['ManufCode'])
    print("--> Version: %s" %record['Version'])
    print("--> Chunk : %s" %len(chunks))
    print("--> Detected Holes : %s" %holes)
    if 'Size' in record:
        # Ceiling division: a trailing partial block still needs one packet.
        expected_chunks = -(-int(record['Size']) // CHUNK_SIZE_ESTIMATE)
        print("--> Size : %s" %record['Size'])
        print("--> Expected Chunks based on 64 bytes: %s" %expected_chunks)
        print("--> Lacking packets estimated : %s" %(expected_chunks - len(chunks)))

    if must_write:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        if stored:
            print("Updating Existing: %s" %(json_path))
        else:
            print("Creating : %s" %(json_path))
        with open(json_path, 'w') as fp:
            json.dump(record, fp)

    if holes == 0 and chunks:
        # Store the Firmware.  No header is prepended to the captured bytes.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        with open(base + '.ota', 'wb') as image_file:
            image_file.write(build_raw_image(chunks))


def main():
    """Load the capture named by the module-level `filename` and process it."""
    print("Opening input file %s" %filename)
    with open(filename, 'r') as f:
        array = json.load(f)
    print("Processing data")
    firmware, count = collect_firmware(array)
    for firm in firmware:
        save_firmware(firm, firmware[firm])
    print("Process completed. %s line analysed" %count)


if __name__ == "__main__":
    main()