dcrnn_model.py
import numpy as np
import torch
import torch.nn as nn

from model.pytorch.dcrnn_cell import DCGRUCell

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


class Seq2SeqAttrs:
    """Mixin holding the hyperparameters shared by the encoder and decoder."""
    def __init__(self, adj_mx, **model_kwargs):
        self.adj_mx = adj_mx
        self.max_diffusion_step = int(model_kwargs.get('max_diffusion_step', 2))
        self.cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000))
        self.filter_type = model_kwargs.get('filter_type', 'laplacian')
        self.num_nodes = int(model_kwargs.get('num_nodes', 1))
        self.num_rnn_layers = int(model_kwargs.get('num_rnn_layers', 1))
        self.rnn_units = int(model_kwargs.get('rnn_units'))
        # The hidden state is kept flattened: one rnn_units vector per node.
        self.hidden_state_size = self.num_nodes * self.rnn_units
class EncoderModel(nn.Module, Seq2SeqAttrs):
    def __init__(self, adj_mx, **model_kwargs):
        nn.Module.__init__(self)
        Seq2SeqAttrs.__init__(self, adj_mx, **model_kwargs)
        self.input_dim = int(model_kwargs.get('input_dim', 1))
        self.seq_len = int(model_kwargs.get('seq_len'))  # number of input time steps
        self.dcgru_layers = nn.ModuleList(
            [DCGRUCell(self.rnn_units, adj_mx, self.max_diffusion_step, self.num_nodes,
                       filter_type=self.filter_type) for _ in range(self.num_rnn_layers)])

    def forward(self, inputs, hidden_state=None):
        """
        Encoder forward pass for a single time step.

        :param inputs: shape (batch_size, self.num_nodes * self.input_dim)
        :param hidden_state: shape (num_layers, batch_size, self.hidden_state_size);
               optional, initialized to zeros if not provided
        :return: output: shape (batch_size, self.hidden_state_size)
                 hidden_state: shape (num_layers, batch_size, self.hidden_state_size)
                 (lower indices mean lower layers)
        """
        batch_size, _ = inputs.size()
        if hidden_state is None:
            hidden_state = torch.zeros((self.num_rnn_layers, batch_size, self.hidden_state_size),
                                       device=device)
        hidden_states = []
        output = inputs
        # Feed the output of each DCGRU layer into the next one.
        for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
            next_hidden_state = dcgru_layer(output, hidden_state[layer_num])
            hidden_states.append(next_hidden_state)
            output = next_hidden_state

        return output, torch.stack(hidden_states)  # runs in O(num_layers) so not too slow
class DecoderModel(nn.Module, Seq2SeqAttrs):
    def __init__(self, adj_mx, **model_kwargs):
        nn.Module.__init__(self)
        Seq2SeqAttrs.__init__(self, adj_mx, **model_kwargs)
        self.output_dim = int(model_kwargs.get('output_dim', 1))
        self.horizon = int(model_kwargs.get('horizon', 1))  # number of output time steps
        self.projection_layer = nn.Linear(self.rnn_units, self.output_dim)
        self.dcgru_layers = nn.ModuleList(
            [DCGRUCell(self.rnn_units, adj_mx, self.max_diffusion_step, self.num_nodes,
                       filter_type=self.filter_type) for _ in range(self.num_rnn_layers)])

    def forward(self, inputs, hidden_state=None):
        """
        Decoder forward pass for a single time step.

        :param inputs: shape (batch_size, self.num_nodes * self.output_dim)
        :param hidden_state: shape (num_layers, batch_size, self.hidden_state_size);
               in practice always provided, initialized from the encoder
        :return: output: shape (batch_size, self.num_nodes * self.output_dim)
                 hidden_state: shape (num_layers, batch_size, self.hidden_state_size)
                 (lower indices mean lower layers)
        """
        hidden_states = []
        output = inputs
        for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
            next_hidden_state = dcgru_layer(output, hidden_state[layer_num])
            hidden_states.append(next_hidden_state)
            output = next_hidden_state

        # Reshape to (batch_size * num_nodes, rnn_units), project each node's
        # state to output_dim, then flatten back to (batch_size, num_nodes * output_dim).
        projected = self.projection_layer(output.view(-1, self.rnn_units))
        output = projected.view(-1, self.num_nodes * self.output_dim)

        return output, torch.stack(hidden_states)
class DCRNNModel(nn.Module, Seq2SeqAttrs):
    def __init__(self, adj_mx, logger, **model_kwargs):
        super().__init__()
        Seq2SeqAttrs.__init__(self, adj_mx, **model_kwargs)
        self.encoder_model = EncoderModel(adj_mx, **model_kwargs)
        self.decoder_model = DecoderModel(adj_mx, **model_kwargs)
        self.cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000))
        self.use_curriculum_learning = bool(model_kwargs.get('use_curriculum_learning', False))
        self._logger = logger

    def _compute_sampling_threshold(self, batches_seen):
        # Inverse sigmoid decay: starts near 1 and decays towards 0 as
        # batches_seen grows, so ground truth is fed less and less often.
        return self.cl_decay_steps / (
                self.cl_decay_steps + np.exp(batches_seen / self.cl_decay_steps))
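
    # Worked illustration (editorial note, not in the original file): with the
    # default cl_decay_steps k = 1000, the threshold eps_i = k / (k + exp(i / k))
    # is the inverse sigmoid decay of Bengio et al. (2015), "Scheduled Sampling
    # for Sequence Prediction with Recurrent Neural Networks":
    # eps_0 ~ 0.999, eps_5000 ~ 1000 / (1000 + e^5) ~ 0.87, eps_10000 ~ 0.04,
    # so early in training the decoder mostly sees ground truth and later
    # mostly its own predictions.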
    def encoder(self, inputs):
        """
        Encoder forward pass over all seq_len time steps.

        :param inputs: shape (seq_len, batch_size, num_sensor * input_dim)
        :return: encoder_hidden_state: shape (num_layers, batch_size, self.hidden_state_size)
        """
        encoder_hidden_state = None
        for t in range(self.encoder_model.seq_len):
            _, encoder_hidden_state = self.encoder_model(inputs[t], encoder_hidden_state)

        return encoder_hidden_state

    def decoder(self, encoder_hidden_state, labels=None, batches_seen=None):
        """
        Decoder forward pass over all horizon time steps.

        :param encoder_hidden_state: shape (num_layers, batch_size, self.hidden_state_size)
        :param labels: shape (self.horizon, batch_size, self.num_nodes * self.output_dim);
               optional, not provided during inference
        :param batches_seen: global step; optional, not provided during inference
        :return: output: shape (self.horizon, batch_size, self.num_nodes * self.output_dim)
        """
        batch_size = encoder_hidden_state.size(1)
        # The decoder is primed with an all-zero GO symbol.
        go_symbol = torch.zeros((batch_size, self.num_nodes * self.decoder_model.output_dim),
                                device=device)
        decoder_hidden_state = encoder_hidden_state
        decoder_input = go_symbol

        outputs = []
        for t in range(self.decoder_model.horizon):
            decoder_output, decoder_hidden_state = self.decoder_model(decoder_input,
                                                                      decoder_hidden_state)
            decoder_input = decoder_output
            outputs.append(decoder_output)
            if self.training and self.use_curriculum_learning:
                # Scheduled sampling: with decaying probability, feed the
                # ground-truth label instead of the model's own prediction.
                c = np.random.uniform(0, 1)
                if c < self._compute_sampling_threshold(batches_seen):
                    decoder_input = labels[t]
        outputs = torch.stack(outputs)
        return outputs
    def forward(self, inputs, labels=None, batches_seen=None):
        """
        Seq2seq forward pass.

        :param inputs: shape (seq_len, batch_size, num_sensor * input_dim)
        :param labels: shape (horizon, batch_size, num_sensor * output_dim)
        :param batches_seen: number of batches seen so far
        :return: output: shape (self.horizon, batch_size, self.num_nodes * self.output_dim)
        """
        encoder_hidden_state = self.encoder(inputs)
        self._logger.debug("Encoder complete, starting decoder")
        outputs = self.decoder(encoder_hidden_state, labels, batches_seen=batches_seen)
        self._logger.debug("Decoder complete")
        if batches_seen == 0:
            self._logger.info(
                "Total trainable parameters: {}".format(count_parameters(self))
            )
        return outputs
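

# --- Usage sketch ---
# A minimal smoke test, not from the original file; it assumes
# model.pytorch.dcrnn_cell.DCGRUCell is importable as above. The adjacency
# matrix, hyperparameters, and logger are illustrative placeholders.
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("dcrnn")

    num_nodes, seq_len, horizon, batch_size = 5, 12, 12, 8
    rng = np.random.default_rng(0)
    adj = rng.random((num_nodes, num_nodes)).astype(np.float32)
    adj_mx = (adj + adj.T) / 2  # symmetric placeholder graph

    model = DCRNNModel(
        adj_mx, logger,
        num_nodes=num_nodes, rnn_units=16, seq_len=seq_len, horizon=horizon,
        input_dim=1, output_dim=1, num_rnn_layers=2,
        max_diffusion_step=2, filter_type='laplacian',
    ).to(device)

    inputs = torch.randn(seq_len, batch_size, num_nodes * 1, device=device)  # input_dim = 1
    outputs = model(inputs, batches_seen=0)
    print(outputs.shape)  # expected: (horizon, batch_size, num_nodes * output_dim)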