-
Notifications
You must be signed in to change notification settings - Fork 0
/
fires.py
280 lines (221 loc) · 10.6 KB
/
fires.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
from numpy import random, pi
from pandas import read_csv, merge, DataFrame, Series
from math import exp
from steputils import p21
from os import scandir
# triangular distribution sampler
def triangular(left, right, mode=False):
if not mode:
mode = (right - left) / 3 + left
return random.triangular(left, mode, right)
# drawn fire site among fuel area
def mc_rand(csv):
ases = [] # list with partial factors A of each fuel area
probs = [] # list with probabilities of ignition in each fuel area
# calculate partial factor A (area * probability) of each fuel area
for i, r in csv.iterrows():
a = (r['XB'] - r['XA']) * (r['YB'] - r['YA']) * r['MC']
ases.append(a)
# calculate probability of ignition in each fuel area
for a in ases:
probs.append(a/sum(ases))
# return drawn fuel area
return random.choice(len(probs), p=probs)
# drawn fire localization
# cwd == confg_path
def f_localization(ffile):
def random_position(xes, yes, zes):
coordinates = []
[coordinates.append(random.randint(int(10 * i[0]), int(10 * i[1])) / 10) for i in [xes, yes, zes]]
return coordinates
fire_site = mc_rand(ffile) # generate fire coordinates from MC function
config = ffile.iloc[fire_site] # information about chosen fuel site
return config, random_position((config.XA, config.XB), (config.YA, config.YB), zes=(config.ZA, config.ZB))
'''Import fuel configuration from STEP (geometrical distribution) and FUEL (data)
- to be merged with Properties config
cwd == config_path'''
class Fuel:
def __init__(self, title):
self.title = title
self.layers = []
# return points of the solid
def find_points(self, volume, step):
def params(ref, index=1):
par = step.get(ref).entity.params
while '*' in par[index]:
index += 1
if type(par[index]) == p21.Reference:
return [par[index]]
else:
return [*par[index]]
def add_point(new_point):
if new_point not in points:
points.append(new_point)
points = []
for i in params(volume.ref): # manifold
for j in params(i): # closed shells
for k in params(j): # advanced face
for l in params(k): # face outer bound
for m in params(l): # edge loop
for n in params(m): # oriented edge
for o in params(n, index=1): # edge curve -- first point
for p in params(o): # vertex_point
add_point(params(p)) # cartesian_point
for o in params(n, index=2): # edge curve -- first point
for p in params(o): # vertex_point
add_point(params(p)) # cartesian_point
return points
# return all volumes present in step file
def read_step(self, step):
vols = []
for i in step:
if type(i) == p21.SimpleEntityInstance:
name = i.entity.name
if name == 'MANIFOLD_SOLID_BREP': # find volumes
vols.append(i)
return vols
# return solid point entities in [[XA, XB], [YA, YB], [ZA, ZB]]format
def pts2fds(self, pts):
ptset = []
xyz = [[], [], []]
[[xyz[i].append(p[i]) for i in range(3)] for p in pts]
[ptset.extend([min(i), max(i)]) for i in xyz]
return ptset
# return layer name: fuelX, where X is an integer correspondent to .FUL index
def layer(self, ref, layers):
# find layers names
for l in layers:
params = l.entity.params
if ref in params[-1]:
return params[0]
return False
def merge_data(self, vols):
merged = DataFrame(columns=['XA', 'XB', 'YA', 'YB', 'ZA', 'ZB', 'MC', 'hrrpua_min', 'hrrpua_max', 'hrrpua_mode',
't_sprink'])
fuel = read_csv('{}.fuel'.format(self.title))
for v in vols:
data = fuel[fuel.name == v[0]]
# convert list to DF
coords = DataFrame([v[0], *v[1]], index=['name', 'XA', 'XB', 'YA', 'YB', 'ZA', 'ZB']).T
# merge DF1 and DF2
merged = merged.append([merge(coords, data).drop('name', axis=1)])
return merged
def read_fuel(self):
fuel = []
for lay in scandir():
splt = lay.name.split('.')
if splt[-1] == ('step' or 'stp'):
step = p21.readfile(lay.name)
vols = self.read_step(step)
for v in vols:
pts = self.find_points(v, step)
fuel.append([splt[0], self.pts2fds(pts)])
# merge with .FUL config type
return self.merge_data(fuel)
class FuelOBJ(Fuel):
def find_points(self, **kwargs):
file_lines = kwargs['obj_file']
volume = []
volumes = []
vertices = []
is_grouped = False
def save_verts(volume): volumes.append([vertices[int(v_no)-1] for v_no in volume])
for l in file_lines:
if l[0] == 'v' and l[1] != 'n':
vertices.append([float(v)/kwargs['scale'] for v in l.split()[1:]]) # add vertice coords to the list
if l.startswith('g'): # find volume
is_grouped = True
save_verts(volume)
volume.clear()
if l.startswith('f'): # find face
for v in l.split()[1:]:
vertice = v.split('//')[0]
if vertice not in volume: # add face vertice to volume list if not already present
volume.append(vertice)
if not is_grouped:
raise RuntimeError('[ERROR] There is no group in OBJ file. It is required to group all volume boxes using'
'"g" at the line beginning. For further information about Wavefront OBJ format and '
'grouping see: http://fegemo.github.io/cefet-cg/attachments/obj-spec.pdf [2021-09-16]')
save_verts(volume)
return volumes[1:]
def read_fuel(self):
fuel = []
for lay in scandir():
splt = lay.name.split('.')
if splt[-1] == ('obj'):
with open(lay.name) as file:
obj = file.readlines()
for volume in self.find_points(obj_file=obj, scale=1000):
fuel.append([splt[0], self.pts2fds(volume)])
if len(fuel) == 0:
raise RuntimeError('[ERROR] No fuel data imported from {}'.format(''.join(splt)))
elif fuel[-1][0] != splt[0]:
raise RuntimeError('[ERROR] No fuel data imported from {}'.format(''.join(splt)))
return self.merge_data(fuel) # merge with .FUL config type
'''Draw fire config from input distributions
operates on the fire types included in the class - to be changed'''
class Fire:
def __init__(self, t_end, properties, fire_z, occupation):
self.t_end = t_end # duration of simulation
self.hrr_max = 50e6 # W model limitation of HRR
self.d_max = 10 # m model limitation of HRR
self.config = properties
self.fire_z = fire_z
self.occ = occupation
# calculate HRRPUA according to triangular distribution specified by user
def hrrpua(self):
upward = self.config.ZB - self.fire_z
downward = self.fire_z - self.config.ZA
reduction = (upward + downward/10) / self.config.hrrpua_height # scale HRRPUA acc. to NFPA204 (1/10 downwards)
return reduction * triangular(self.config.hrrpua_min, self.config.hrrpua_max, mode=self.config.hrrpua_mode)
# calculate ALPHA according to the experimental log-norm or user's triangular distribution
def alpha(self, hrrpua):
if not self.occ:
return triangular(self.config.alpha_min, self.config.alpha_max, mode=self.config.alpha_mode) # [kW/s2]
elif self.occ == 'store':
return hrrpua * random.lognormal(-9.72, 0.97) # [kW/s2]
# t-squared fire
def t_squared(self):
hrrpua = self.hrrpua() # [kW/m2]
alpha = self.alpha(hrrpua) # [kW/s2]
hrr_tab = []
diam_tab = []
for i in range(0, 99):
t = int(self.t_end * i/98)
# calculate HRR, append
hrr_tab.append([t, round(alpha * (t ** 2) * 1000, 4)]) # [time /s/, HRR /W/]
if hrr_tab[-1][-1] > self.hrr_max: # check if hrr_tab does not exceed model limitation
hrr_tab[-1][-1] = self.hrr_max
# calculate diameter, append
diam_tab.append([t, 2 * (hrr_tab[-1][-1] / (hrrpua*1000*pi))**0.5]) # [time /s/, diameter /m/]
if diam_tab[-1][-1] > self.d_max: # check if hrr_tab does not exceed model limitation
diam_tab[-1][-1] = self.d_max
return hrr_tab, diam_tab, hrrpua, alpha
def change(self, time_crit, tab, value_crit):
#t_squared
return tab
def burn(self):
hrr_tab, diam_tab, hrrpua, alpha = self.t_squared()
q_0 = round(alpha * (self.config.t_sprink ** 2) * 1000, 4)
changed_tabs = [self.change(self.config.t_sprink, *tab) for tab in
[(hrr_tab, q_0), (diam_tab, (q_0 / (hrrpua * 1000 * pi)) ** 0.5)]]
return changed_tabs[0], changed_tabs[1], hrrpua, alpha
class SprinkNoEff(Fire):
# modify t-squared curve to taking non-effective sprinklers into account
def change(self, time_crit, tab, value_crit):
for i in range(len(tab)):
if tab[i][0] >= time_crit:
tab[i] = [tab[i][0], value_crit]
return tab
class SprinkEff(Fire):
# modify t-squared curve to taking non-effective sprinklers into account
def change(self, time_crit, tab, value_crit, ):
value_limit = round(0.15 * value_crit)
for i in range(len(tab)):
if tab[i][0] >= time_crit:
value_red = round(value_crit * exp(-0.0024339414 * (tab[i][0] - self.config.t_sprink)), 4)
if value_red > value_limit:
tab[i] = [tab[i][0], value_red]
else:
tab[i] = [tab[i][0], value_limit]
return tab