# self_example/pytorch_example/RUL/otherIdea/adaDctEmdLSTM/model.py
# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/15 14:44
@Usage :
@Desc :
'''
import torch
import torch.nn as nn
import torch.nn.functional as F

from RUL.otherIdea.adaRNN.loss_transfer import TransferLoss
from RUL.baseModel.dctAttention import dct_channel_block


class dctLSTMCell(nn.Module):
def __init__(self, input_dim, hidden_dim, bias):
"""
Initialize ConvLSTM cell.
Parameters
----------
input_dim: int
Number of channels of input tensor.
hidden_dim: int
Number of channels of hidden state.
kernel_size: int
Size of the convolutional kernel.
bias: bool
Whether or not to add the bias.
Input:
A tensor of size B, T, C
B: bacth_size
T: timestamp
C: channel
"""
super(dctLSTMCell, self).__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.bias = bias
self.hidden = nn.Linear(in_features=self.input_dim + self.hidden_dim,
out_features=4 * self.hidden_dim,
bias=self.bias)
self.attention = dct_channel_block(channel=self.input_dim + self.hidden_dim)
def forward(self, input_tensor, cur_state):
        # input_tensor, h_cur and c_cur all have shape (B, C)
        h_cur, c_cur = cur_state
        combined = torch.cat([input_tensor, h_cur], dim=-1)  # concatenate along the channel axis
        # apply DCT channel attention to the concatenated (input, hidden) features
        combined = self.attention(combined)
combined_linear = self.hidden(combined)
cc_i, cc_f, cc_o, cc_g = torch.split(combined_linear, self.hidden_dim, dim=-1)
i = torch.sigmoid(cc_i)
f = torch.sigmoid(cc_f)
o = torch.sigmoid(cc_o)
g = torch.tanh(cc_g)
c_next = f * c_cur + i * g
h_next = o * torch.tanh(c_next)
return h_next, c_next
def init_hidden(self, batch_size):
return (torch.zeros(batch_size, self.hidden_dim, device=self.hidden.weight.device),
torch.zeros(batch_size, self.hidden_dim, device=self.hidden.weight.device))
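

# Usage sketch (illustrative, not part of the original training pipeline): runs a
# single forward step of dctLSTMCell on random data. The tensor sizes below are
# assumptions chosen only for this demo; the (h, c) handling mirrors the cell above.
def _demo_dct_lstm_cell(batch_size=8, input_dim=14, hidden_dim=32):
    cell = dctLSTMCell(input_dim=input_dim, hidden_dim=hidden_dim, bias=True)
    h, c = cell.init_hidden(batch_size)              # zero states of shape (B, hidden_dim)
    x_t = torch.randn(batch_size, input_dim)         # one time step, shape (B, input_dim)
    h, c = cell(input_tensor=x_t, cur_state=[h, c])  # gated update: c' = f*c + i*g, h' = o*tanh(c')
    return h.shape, c.shape                          # both torch.Size([batch_size, hidden_dim])
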
class LSTM(nn.Module):
"""
Parameters:
input_dim: Number of channels in input
hidden_dim: Number of hidden channels
kernel_size: Size of kernel in convolutions
num_layers: Number of LSTM layers stacked on each other
batch_first: Whether or not dimension 0 is the batch or not
bias: Bias or no bias in Convolution
return_all_layers: Return the list of computations for all layers
Note: Will do same padding.
Input:
A tensor of size B, T, C or T, B, C
Output:
A tuple of two lists of length num_layers (or length 1 if return_all_layers is False).
0 - layer_output_list is the list of lists of length T of each output
1 - last_state_list is the list of last states
each element of the list is a tuple (h, c) for hidden state and memory
Example:
>> x = torch.rand((32, 10, 64))
>> convlstm = ConvLSTM(64, 16, 3, 1, True, True, False)
>> _, last_states = convlstm(x)
>> h = last_states[0][0] # 0 for layer index, 0 for h index
"""
def __init__(self, input_dim, hidden_dim, num_layers,
batch_first=False, bias=True, return_all_layers=False):
super(LSTM, self).__init__()
        # Make sure that `hidden_dim` is a list having len == num_layers
hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
if not len(hidden_dim) == num_layers:
raise ValueError('Inconsistent list length.')
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.batch_first = batch_first
self.bias = bias
self.return_all_layers = return_all_layers
cell_list = []
for i in range(0, self.num_layers):
cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
cell_list.append(
dctLSTMCell(input_dim=cur_input_dim,
hidden_dim=self.hidden_dim[i],
bias=self.bias),
)
self.cell_list = nn.ModuleList(cell_list)
def forward(self, input_tensor, hidden_state=None):
"""
Parameters
----------
input_tensor: todo
5-D Tensor either of shape (t, b, c) or (b, t, c)
hidden_state: todo
None. todo implement stateful
Returns
-------
last_state_list, layer_output
"""
if not self.batch_first:
            # equivalent to a transpose: (t, b, c) -> (b, t, c)
input_tensor = input_tensor.permute(1, 0, 2)
b, _, _ = input_tensor.size()
        # Stateful operation (passing hidden_state in) is not implemented yet
if hidden_state is not None:
raise NotImplementedError()
else:
            # States are initialised here because the batch size is only known inside forward
hidden_state = self._init_hidden(batch_size=b)
layer_output_list = []
last_state_list = []
timestamp = input_tensor.size(1)
cur_layer_input = input_tensor
for layer_idx in range(self.num_layers):
h, c = hidden_state[layer_idx]
output_inner = []
for t in range(timestamp):
h, c = self.cell_list[layer_idx](input_tensor=cur_layer_input[:, t, :],
cur_state=[h, c])
output_inner.append(h)
layer_output = torch.stack(output_inner, dim=1)
            # TODO: add a dct_attention block between layers
# layer_output = self.attention_list[layer_idx](layer_output)
cur_layer_input = layer_output
layer_output_list.append(layer_output)
last_state_list.append([h, c])
if not self.return_all_layers:
layer_output_list = layer_output_list[-1:]
last_state_list = last_state_list[-1:]
return layer_output_list, last_state_list
def _init_hidden(self, batch_size):
init_states = []
for i in range(self.num_layers):
init_states.append(self.cell_list[i].init_hidden(batch_size))
return init_states
@staticmethod
def _extend_for_multilayer(param, num_layers):
if not isinstance(param, list):
param = [param] * num_layers
return param
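

# Usage sketch (illustrative): feeds a random (B, T, C) batch through the stacked
# LSTM wrapper above and unpacks its (layer_output_list, last_state_list) return
# value. The sizes are assumptions made only for this demo.
def _demo_stacked_lstm(batch_size=8, timesteps=10, input_dim=14):
    lstm = LSTM(input_dim=input_dim, hidden_dim=[32, 32], num_layers=2,
                batch_first=True, return_all_layers=False)
    x = torch.randn(batch_size, timesteps, input_dim)
    layer_outputs, last_states = lstm(x)
    out = layer_outputs[0]           # (B, T, 32): per-step hidden states of the last layer
    h_last, c_last = last_states[0]  # each (B, 32): final hidden and cell state of the last layer
    return out.shape, h_last.shape, c_last.shape
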
class AdaRNN(nn.Module):
"""
model_type: 'Boosting', 'AdaRNN'
bottleneck_list: (dim,is_BatchNorm,is_ReLu,drop_out)
"""
def __init__(self, use_bottleneck=False, bottleneck_list=[(64, False, False, 0), (64, True, True, 0.5)],
n_input=128, n_hiddens=[64, 64], n_output=6,
dropout=0.0, len_seq=9, model_type='AdaRNN',
trans_loss='mmd'):
super(AdaRNN, self).__init__()
self.use_bottleneck = use_bottleneck
self.n_input = n_input
self.num_layers = len(n_hiddens)
self.hiddens = n_hiddens
self.n_output = n_output
self.model_type = model_type
self.trans_loss = trans_loss
self.len_seq = len_seq
in_size = self.n_input
features = nn.ModuleList()
# dctAttention = nn.ModuleList()
for hidden in n_hiddens:
# rnn = nn.GRU(
# input_size=in_size,
# num_layers=1,
# hidden_size=hidden,
# batch_first=True,
# dropout=dropout
# )
rnn = LSTM(input_dim=in_size, hidden_dim=[hidden], num_layers=1, batch_first=True, return_all_layers=True)
# attention = dct_channel_block(channel=hidden)
features.append(rnn)
# dctAttention.append(attention)
in_size = hidden
self.features = nn.Sequential(*features)
# self.dctAttention = nn.Sequential(*dctAttention)
        if use_bottleneck:  # finance
bottleneck = []
for i in range(len(bottleneck_list)):
cur_input_dim = self.hiddens[-1] if i == 0 else bottleneck_list[i - 1][0]
bottleneck.append(
nn.Linear(cur_input_dim, bottleneck_list[i][0])
)
                ### Without this weight init, the hard prediction is less stable and its amplitude is larger
                # The larger the initial weights, the larger the oscillation amplitude
bottleneck[-1].weight.data.normal_(0, 0.03)
bottleneck[-1].bias.data.fill_(0.01)
if bottleneck_list[i][1]:
bottleneck.append(nn.BatchNorm1d(bottleneck_list[i][0]))
if bottleneck_list[i][2]:
bottleneck.append(nn.ReLU())
if bottleneck_list[i][3] != 0:
bottleneck.append(nn.Dropout(bottleneck_list[i][3]))
self.bottleneck = nn.Sequential(*bottleneck)
self.fc = nn.Linear(bottleneck_list[-1][0], n_output)
torch.nn.init.xavier_normal_(self.fc.weight)
else:
self.fc_out = nn.Linear(n_hiddens[-1], self.n_output)
if self.model_type == 'AdaRNN':
gate = nn.ModuleList()
for i in range(len(n_hiddens)):
gate_weight = nn.Linear(
len_seq * self.hiddens[i] * 2, len_seq)
gate.append(gate_weight)
self.gate = gate
bnlst = nn.ModuleList()
for i in range(len(n_hiddens)):
bnlst.append(nn.BatchNorm1d(len_seq))
self.bn_lst = bnlst
self.softmax = torch.nn.Softmax(dim=0)
self.init_layers()
def init_layers(self):
for i in range(len(self.hiddens)):
self.gate[i].weight.data.normal_(0, 0.05)
self.gate[i].bias.data.fill_(0.0)
def forward_pre_train(self, x, len_win=0):
out = self.gru_features(x)
        # features produced by the stacked recurrent layers
        fea = out[0]
        if self.use_bottleneck:
fea_bottleneck = self.bottleneck(fea[:, -1, :])
fc_out = self.fc(fea_bottleneck).squeeze()
else:
fc_out = self.fc_out(fea[:, -1, :]).squeeze()
        # per-layer recurrent outputs, and the normalised gate weights of each layer
        out_list_all, out_weight_list = out[1], out[2]
        # can be read as the first half (source domain) and second half (target domain) of the batch
out_list_s, out_list_t = self.get_features(out_list_all)
loss_transfer = torch.zeros((1,))
for i in range(len(out_list_s)):
            criterion_transfer = TransferLoss(
                loss_type=self.trans_loss, input_dim=out_list_s[i].shape[2])
h_start = 0
for j in range(h_start, self.len_seq, 1):
i_start = max(j - len_win, 0)
i_end = j + len_win if j + len_win < self.len_seq else self.len_seq - 1
for k in range(i_start, i_end + 1):
weight = out_weight_list[i][j] if self.model_type == 'AdaRNN' else 1 / (
self.len_seq - h_start) * (2 * len_win + 1)
                    loss_transfer = loss_transfer + weight * criterion_transfer.compute(
                        out_list_s[i][:, j, :], out_list_t[i][:, k, :])
return fc_out, loss_transfer, out_weight_list
def gru_features(self, x, predict=False):
x_input = x
out = None
out_lis = []
out_weight_list = [] if (
self.model_type == 'AdaRNN') else None
for i in range(self.num_layers):
            # output of the i-th recurrent (LSTM) layer
out, _ = self.features[i](x_input.float())
out = out[0]
# out = self.dctAttention[i](out.float())
x_input = out
out_lis.append(out)
            if self.model_type == 'AdaRNN' and not predict:
out_gate = self.process_gate_weight(x_input, i)
out_weight_list.append(out_gate)
        # final-layer output, per-layer outputs, and the normalised gate weights of each layer
return out, out_lis, out_weight_list
def process_gate_weight(self, out, index):
        x_s = out[0: int(out.shape[0] // 2)]  # first half of the batch, read as the distribution of domain Di
        x_t = out[out.shape[0] // 2: out.shape[0]]  # second half of the batch, read as the distribution of domain Dj
        # the two halves correspond to different domains
x_all = torch.cat((x_s, x_t), 2)
x_all = x_all.view(x_all.shape[0], -1)
weight = torch.sigmoid(self.bn_lst[index](
self.gate[index](x_all.float())))
weight = torch.mean(weight, dim=0)
res = self.softmax(weight).squeeze()
return res
def get_features(self, output_list):
fea_list_src, fea_list_tar = [], []
for fea in output_list:
fea_list_src.append(fea[0: fea.size(0) // 2])
fea_list_tar.append(fea[fea.size(0) // 2:])
return fea_list_src, fea_list_tar
# For Boosting-based
def forward_Boosting(self, x, weight_mat=None):
out = self.gru_features(x)
fea = out[0]
if self.use_bottleneck:
fea_bottleneck = self.bottleneck(fea[:, -1, :])
fc_out = self.fc(fea_bottleneck).squeeze()
else:
fc_out = self.fc_out(fea[:, -1, :]).squeeze()
out_list_all = out[1]
        # can be read as the first half (source) and second half (target) of the batch
out_list_s, out_list_t = self.get_features(out_list_all)
loss_transfer = torch.zeros((1,))
if weight_mat is None:
weight = (1.0 / self.len_seq *
torch.ones(self.num_layers, self.len_seq))
else:
weight = weight_mat
dist_mat = torch.zeros(self.num_layers, self.len_seq)
for i in range(len(out_list_s)):
            criterion_transfer = TransferLoss(
                loss_type=self.trans_loss, input_dim=out_list_s[i].shape[2])
for j in range(self.len_seq):
                loss_trans = criterion_transfer.compute(
                    out_list_s[i][:, j, :], out_list_t[i][:, j, :])
loss_transfer = loss_transfer + weight[i, j] * loss_trans
dist_mat[i, j] = loss_trans
return fc_out, loss_transfer, dist_mat, weight
# For Boosting-based
def update_weight_Boosting(self, weight_mat, dist_old, dist_new):
epsilon = 1e-12
dist_old = dist_old.detach()
dist_new = dist_new.detach()
        # increase the weight of time steps whose distribution distance grew
        ind = dist_new > dist_old + epsilon
        weight_mat[ind] = weight_mat[ind] * \
            (1 + torch.sigmoid(dist_new[ind] - dist_old[ind]))
        # re-normalise each layer's weights to sum to 1 (L1 norm per row)
        weight_norm = torch.norm(weight_mat, dim=1, p=1)
        weight_mat = weight_mat / weight_norm.t().unsqueeze(1).repeat(1, self.len_seq)
return weight_mat
def predict(self, x):
out = self.gru_features(x, predict=True)
fea = out[0]
if self.use_bottleneck:
fea_bottleneck = self.bottleneck(fea[:, -1, :])
fc_out = self.fc(fea_bottleneck).squeeze()
else:
fc_out = self.fc_out(fea[:, -1, :]).squeeze()
return fc_out
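

# Smoke test (illustrative only): builds an AdaRNN with small, assumed hyper-parameters
# and checks that predict() and forward_pre_train() run end to end. The batch is split
# in half inside the model into "source" and "target" halves, so batch_size must be even.
# This block is a sketch of how the model can be driven, not the project's training script.
if __name__ == "__main__":
    print(_demo_dct_lstm_cell())
    print(_demo_stacked_lstm())

    batch_size, len_seq, n_input = 8, 9, 14
    model = AdaRNN(use_bottleneck=True,
                   bottleneck_list=[(64, False, False, 0), (64, True, True, 0.5)],
                   n_input=n_input, n_hiddens=[32, 32], n_output=1,
                   len_seq=len_seq, model_type='AdaRNN', trans_loss='mmd')
    x = torch.randn(batch_size, len_seq, n_input)

    pred = model.predict(x)  # shape (batch_size,)
    fc_out, loss_transfer, out_weight_list = model.forward_pre_train(x, len_win=0)
    print(pred.shape, fc_out.shape, float(loss_transfer))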