Short paper

kevinding1125 2023-11-09 19:45:21 +08:00
parent d8746d192f
commit 564ba3f669
14 changed files with 1050 additions and 64 deletions

View File

@ -0,0 +1,229 @@
# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/6/14 14:56
@Usage :
@Desc : Test the implemented LSTM
'''
import tensorflow as tf
import numpy as np
from model.LSTM.DCTAttention_embed_LSTM import AttentionEmbedLSTMLayer as LSTMLayer
# from model.LSTM.LSTM import LSTMLayer as LSTMLayer
import matplotlib.pyplot as plt
from keras.callbacks import EarlyStopping
from model.LossFunction.FTMSE import FTMSE
import math
from sklearn.metrics import mean_absolute_error, mean_squared_error
from pylab import *

'''
Hyperparameter settings:
'''
hidden_num = 10  # number of LSTM cells (time steps)
feature = 10  # dimensionality of one point
batch_size = 32
EPOCH = 1000
unit = 512  # LSTM hidden size
predict_num = 50  # number of points to predict
model_name = "dctLSTM"
save_name = r"self_{0}_hidden{1}_unit{2}_feature{3}_predict{4}.h5".format(model_name, hidden_num, unit, feature,
                                                                          predict_num)
def getData(filter_num, dims):
    # Load the data
    HI_merge_data_origin = np.load("../../2012轴承数据集预测挑战/HI_create/HI_merge_data.npy")
    # plt.plot(HI_merge_data[0:1250, 1])
    # Drop the leading points where the degradation trend is not yet visible
    HI_merge_data = HI_merge_data_origin[0:1250, 1]
    # plt.plot(HI_merge_data)
    # plt.show()
    (total_dims,) = HI_merge_data.shape
    # Overlapping sampling with a sliding-window function
    predict_data = np.empty(shape=[total_dims - filter_num, filter_num])
    # Overlapping sampling yields the time steps and the number of training samples
    for dim in range(total_dims - filter_num):
        predict_data[dim] = HI_merge_data[dim:dim + filter_num]
    train_label = predict_data[dims:, :]
    train_label_single = HI_merge_data[dims + filter_num - 1:-1]
    # Overlap-sample again to obtain the dimensionality of each point
    '''train_data.shape: (sample, filter_num) -> (sample, filter_num, dims)'''
    # Sliding-window function again
    train_data = np.empty(shape=[dims, total_dims - filter_num - dims, filter_num])
    for dim in range(dims):
        train_data[dim] = predict_data[dim:total_dims - filter_num - dims + dim, :]
    # Transpose into the desired layout: (dims, sample, filter_num) -> (sample, filter_num, dims)
    train_data = tf.transpose(train_data, [1, 2, 0])
    # todo: work around the query not being serializable when saving the model
    total_data = tf.cast(HI_merge_data, dtype=tf.float32)
    train_data = tf.cast(train_data, dtype=tf.float32)
    train_label = tf.cast(train_label, dtype=tf.float32)
    train_label_single = tf.cast(train_label_single, dtype=tf.float32)
    print("total_data.shape:", total_data.shape)
    print("train_data.shape:", train_data.shape)  # (20, 1200, 30)
    print("train_label.shape:", train_label.shape)  # (20, 1200)
    print("train_label_single.shape:", train_label_single.shape)
    # all raw data; all training data; all training labels (sequence target); all training labels (single-point target)
    return total_data, train_data, train_label, train_label_single
'''
train_data.shape:   (total_dims - filter_num - dims, filter_num, dims), e.g. (570, 600, 30)
predict_data.shape: (total_dims - filter_num, filter_num),              e.g. (600, 600)
train_label.shape:  (total_dims - filter_num - dims, filter_num),       e.g. (570, 600)
'''
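# Illustrative sketch (added; not part of the original script) of the
# overlapping sampling above on a toy series, so the shapes are easy to check:
#   series = np.arange(8.0)                                  # total_dims = 8
#   filter_num, dims = 3, 2
#   windows = np.stack([series[i:i + 3] for i in range(8 - 3)])
#   # windows.shape == (5, 3): one sliding window per row
#   stacked = np.stack([windows[d:8 - 3 - 2 + d] for d in range(2)])
#   # stacked.shape == (2, 3, 3); transposing with [1, 2, 0] then gives the
#   # desired (sample, filter_num, dims) layout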
def remove(train_data, train_label, batch_size):
    sample, _, _ = train_data.shape
    size = int(sample / batch_size)
    return train_data[:size * batch_size], train_label[:size * batch_size]


'''
train_data.shape: (1230, 10, 10)
train_label.shape: (1230, 10)
train_label_single.shape: (1230,)
'''


def splitValData(data, label, label_single, predict_num=50):
    sample, hidden, feature = data.shape
    train_data = data[:sample - predict_num, :, :]
    val_data = data[sample - predict_num:, :, :]
    train_label = label[:sample - predict_num, :]
    val_label = label[sample - predict_num:, :]
    train_label_single = label_single[:sample - predict_num]
    val_label_single = label_single[sample - predict_num:]
    return train_data, val_data, train_label, val_label, train_label_single, val_label_single
def predict_model(filter_num, dims):
    # Keras Input tensors are float32 by default; the original cast the Input
    # and reused the cast tensor as the model input, which the functional API rejects
    inputs = tf.keras.Input(shape=[filter_num, dims])
    #### official implementation
    # LSTM = tf.keras.layers.LSTM(units=512, return_sequences=True)(inputs)
    # LSTM = tf.keras.layers.LSTM(units=256, return_sequences=False)(LSTM)
    #### our own implementation
    # LSTM = tf.keras.layers.Conv1D(512, kernel_size=8, padding='same')(inputs)
    LSTM = LSTMLayer(units=512, return_sequences=True)(inputs)
    LSTM = LSTMLayer(units=256, return_sequences=False)(LSTM)
    x = tf.keras.layers.Dense(128, activation="relu")(LSTM)
    x = tf.keras.layers.Dense(64, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dense(32, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dense(16, activation="relu")(x)
    output = tf.keras.layers.Dense(1, activation="relu", name='output')(x)
    model = tf.keras.Model(inputs=inputs, outputs=output)
    return model
def split_data(train_data, train_label):
    return train_data[:1150, :, :], train_label[:1150, :], train_data[-70:, :, :], train_label[-70:, :]


# Keep predicting using only the most recently predicted point (autoregressive rolling forecast)
def predictOneByOne(newModel, train_data, predict_num=50):
    # Take the last sample of the training data
    each_predict_data = np.expand_dims(train_data[-1, :, :], axis=0)
    predicted_list = np.empty(shape=(predict_num, 1))
    # all_data = total_data  # (1201,)
    for each_predict in range(predict_num):
        # predicted_data.shape: (1, 1)
        predicted_data = newModel.predict(each_predict_data)
        predicted_list[each_predict] = predicted_data
        # (1, 1) => (filter_num, 1): shift the window one step and append the new prediction
        temp1 = np.transpose(np.concatenate([each_predict_data[:, 1:, -1], predicted_data], axis=1), [1, 0])
        each_predict_data = np.expand_dims(
            np.concatenate([np.squeeze(each_predict_data[:, :, 1:], axis=0), temp1], axis=1), axis=0)
    return predicted_list
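# Toy illustration (added; not part of the original script) of the window
# update above, with filter_num=3, dims=2:
#   w = np.arange(6.0).reshape(1, 3, 2)   # current input window
#   pred = np.array([[9.0]])              # newly predicted point, shape (1, 1)
#   new_col = np.transpose(np.concatenate([w[:, 1:, -1], pred], axis=1), [1, 0])
#   w = np.expand_dims(np.concatenate([np.squeeze(w[:, :, 1:], axis=0), new_col], axis=1), axis=0)
#   # w.shape stays (1, 3, 2): the oldest feature column is dropped and the new
#   # last column is the old one advanced one step with the prediction appended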
# Predict from the known data only, without feeding predictions back in
def predictByEveryData(trained_model: tf.keras.Model, predict_data):
    # NOTE: relies on the module-level globals total_data, hidden_num and feature
    predicted_data = trained_model.predict(predict_data)
    predicted_data = np.concatenate([np.expand_dims(total_data[:hidden_num + feature, ], axis=1), predicted_data],
                                    axis=0)
    data = predictOneByOne(trained_model, predict_data)
    predicted_data = np.concatenate([predicted_data, data], axis=0)
    return predicted_data
if __name__ == '__main__':
    # Load the data --> all raw data; all training data; all labels (sequence target); all labels (single-point target)
    total_data, train_data, train_label, train_label_single = getData(hidden_num, feature)
    # Split train/validation sets according to the number of points to predict
    train_data, val_data, train_label, val_label, train_label_single, val_label_single = splitValData(train_data,
                                                                                                      train_label,
                                                                                                      train_label_single,
                                                                                                      predict_num=predict_num)
    # #### TODO training
    model = predict_model(hidden_num, feature)
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath=save_name,
        monitor='val_loss',
        verbose=2,
        save_best_only=True,
        mode='min')
    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=0.001)
    model.compile(optimizer=tf.optimizers.SGD(), loss=tf.losses.mse)
    model.summary()
    early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=20, mode='min', verbose=1)

    history = model.fit(train_data, train_label_single, epochs=EPOCH, validation_data=(val_data, val_label_single),
                        shuffle=True, verbose=1,
                        callbacks=[checkpoint, lr_scheduler, early_stop])

    #### TODO testing
    trained_model = tf.keras.models.load_model(save_name, custom_objects={'AttentionEmbedLSTMLayer': LSTMLayer})
    # Predict from the known points
    print("Start predicting")
    predicted_data = predictByEveryData(trained_model, train_data)
    # Keep predicting from the predicted points
    # predicted_data = predictOneByOne(trained_model, total_data, train_data)
    print("predicted_data:", predicted_data)
    print("predicted_data.shape:", predicted_data.shape)
    plt.figure(1)
    plt.subplot(2, 1, 1)
    plt.plot(total_data)
    # plt.subplot(2, 1, 2)
    plt.plot(predicted_data)
    # plt.scatter()
    plt.show()

View File

@ -2,7 +2,11 @@
 # Linear Recurrent Unit (LRU)
 # Tested with tensorflow 1.15 + bert4keras 0.11.4
-from bert4keras.layers import *
+from tensorflow.keras.layers import Layer, Dense
+from tensorflow.keras import initializers
+import numpy as np
+import tensorflow as tf
+import tensorflow.keras.backend as K

 class LRU(Layer):
@ -26,9 +30,9 @@ class LRU(Layer):
         self.unroll = unroll
         self.kernel_initializer = initializers.get(kernel_initializer)

-    @integerize_shape
     def build(self, input_shape):
+        super(LRU, self).build(input_shape)
         hidden_size = input_shape[-1]
         self.i_dense = Dense(
             units=self.units * 2,
@ -57,7 +61,7 @@ class LRU(Layer):
             name='params_log', shape=(3, self.units), initializer=initializer
         )

-    @recompute_grad
     def call(self, inputs, mask=None):
         u = self.i_dense(inputs)
         params = K.exp(self.params_log)

View File

@ -0,0 +1,239 @@
# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/6/14 14:56
@Usage :
@Desc : Test the implemented LSTM
'''
import tensorflow as tf
import numpy as np
from model.AdamRNN.AdamRNN import AdaRNN
# from model.LSTM.LSTM import LSTMLayer as LSTMLayer
import matplotlib.pyplot as plt
from keras.callbacks import EarlyStopping
from model.LossFunction.FTMSE import FTMSE
import math
from sklearn.metrics import mean_absolute_error, mean_squared_error
from pylab import *

'''
Hyperparameter settings:
'''
hidden_num = 40  # number of LSTM cells (time steps)
feature = 10  # dimensionality of one point
batch_size = 32
EPOCH = 1000
unit = 512  # LSTM hidden size
predict_num = 50  # number of points to predict
model_name = "adaRNN"
save_name = r"self_{0}_hidden{1}_unit{2}_feature{3}_predict{4}.h5".format(model_name, hidden_num, unit, feature,
                                                                          predict_num)
def standardization(data):
    mu = np.mean(data, axis=0)
    sigma = np.std(data, axis=0)
    return (data - mu) / sigma


def normalization(data):
    _range = np.max(data) - np.min(data)
    return (data - np.min(data)) / _range
# number of LSTM cells, dimensionality, whether to normalize
def getData(filter_num, dims, if_norm: bool = False):
    # Load the data
    HI_merge_data_origin = np.load("../../2012轴承数据集预测挑战/HI_create/HI_merge_data.npy")
    # plt.plot(HI_merge_data[0:1250, 1])
    # Drop the leading points where the degradation trend is not yet visible
    HI_merge_data = HI_merge_data_origin[0:1250, 1]
    # whether to normalize
    if if_norm:
        HI_merge_data = normalization(HI_merge_data)
    # plt.plot(HI_merge_data)
    # plt.show()
    (total_dims,) = HI_merge_data.shape
    # Overlapping sampling with a sliding-window function
    predict_data = np.empty(shape=[total_dims - filter_num, filter_num])
    # Overlapping sampling yields the time steps and the number of training samples
    for dim in range(total_dims - filter_num):
        predict_data[dim] = HI_merge_data[dim:dim + filter_num]
    train_label = predict_data[dims:, :]
    train_label_single = HI_merge_data[dims + filter_num - 1:-1]
    # Overlap-sample again to obtain the dimensionality of each point
    '''train_data.shape: (sample, filter_num) -> (sample, filter_num, dims)'''
    # Sliding-window function again
    train_data = np.empty(shape=[dims, total_dims - filter_num - dims, filter_num])
    for dim in range(dims):
        train_data[dim] = predict_data[dim:total_dims - filter_num - dims + dim, :]
    # Transpose into the desired layout: (dims, sample, filter_num) -> (sample, filter_num, dims)
    train_data = tf.transpose(train_data, [1, 2, 0])
    # todo: work around the query not being serializable when saving the model
    total_data = tf.cast(HI_merge_data, dtype=tf.float32)
    train_data = tf.cast(train_data, dtype=tf.float32)
    train_label = tf.cast(train_label, dtype=tf.float32)
    train_label_single = tf.cast(train_label_single, dtype=tf.float32)
    print("total_data.shape:", total_data.shape)
    print("train_data.shape:", train_data.shape)  # (20, 1200, 30)
    print("train_label.shape:", train_label.shape)  # (20, 1200)
    print("train_label_single.shape:", train_label_single.shape)
    # all raw data; all training data; all training labels (sequence target); all training labels (single-point target)
    return total_data, train_data, train_label, train_label_single
'''
train_data.shape:   (total_dims - filter_num - dims, filter_num, dims), e.g. (570, 600, 30)
predict_data.shape: (total_dims - filter_num, filter_num),              e.g. (600, 600)
train_label.shape:  (total_dims - filter_num - dims, filter_num),       e.g. (570, 600)
'''
def remove(train_data, train_label, batch_size):
    sample, _, _ = train_data.shape
    size = int(sample / batch_size)
    return train_data[:size * batch_size], train_label[:size * batch_size]


'''
train_data.shape: (1230, 10, 10)
train_label.shape: (1230, 10)
train_label_single.shape: (1230,)
'''


def splitValData(data, label, label_single, predict_num=50):
    sample, hidden, feature = data.shape
    train_data = data[:sample - predict_num, :, :]
    val_data = data[sample - predict_num:, :, :]
    train_label = label[:sample - predict_num, :]
    val_label = label[sample - predict_num:, :]
    train_label_single = label_single[:sample - predict_num]
    val_label_single = label_single[sample - predict_num:]
    return train_data, val_data, train_label, val_label, train_label_single, val_label_single
def predict_model(filter_num, dims):
    inputs = tf.keras.Input(shape=[filter_num, dims])
    # The dense head below expects sequence features, but the original referenced
    # an undefined tensor `LSTM` here. As a minimal repair (assumption), use a
    # GRU backbone matching AdaRNN's feature extractor and keep the last step.
    x = tf.keras.layers.GRU(64, return_sequences=True)(inputs)
    x = tf.keras.layers.GRU(64, return_sequences=False)(x)
    x = tf.keras.layers.Dense(128, activation="relu")(x)
    x = tf.keras.layers.Dense(64, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dense(32, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dense(16, activation="relu")(x)
    output = tf.keras.layers.Dense(1, activation="relu", name='output')(x)
    model = tf.keras.Model(inputs=inputs, outputs=output)
    return model
def split_data(train_data, train_label):
    return train_data[:1150, :, :], train_label[:1150, :], train_data[-70:, :, :], train_label[-70:, :]


# Keep predicting using only the most recently predicted point (autoregressive rolling forecast)
def predictOneByOne(newModel, train_data, predict_num=50):
    # Take the last sample of the training data
    each_predict_data = np.expand_dims(train_data[-1, :, :], axis=0)
    predicted_list = np.empty(shape=(predict_num, 1))
    # all_data = total_data  # (1201,)
    for each_predict in range(predict_num):
        # predicted_data.shape: (1, 1)
        predicted_data = newModel.predict(each_predict_data)
        predicted_list[each_predict] = predicted_data
        # (1, 1) => (filter_num, 1): shift the window one step and append the new prediction
        temp1 = np.transpose(np.concatenate([each_predict_data[:, 1:, -1], predicted_data], axis=1), [1, 0])
        each_predict_data = np.expand_dims(
            np.concatenate([np.squeeze(each_predict_data[:, :, 1:], axis=0), temp1], axis=1), axis=0)
    return predicted_list


# Predict from the known data only, without feeding predictions back in
def predictByEveryData(trained_model: tf.keras.Model, predict_data):
    # NOTE: relies on the module-level globals total_data, hidden_num and feature
    predicted_data = trained_model.predict(predict_data)
    predicted_data = np.concatenate([np.expand_dims(total_data[:hidden_num + feature, ], axis=1), predicted_data],
                                    axis=0)
    data = predictOneByOne(trained_model, predict_data)
    predicted_data = np.concatenate([predicted_data, data], axis=0)
    return predicted_data
if __name__ == '__main__':
    # Load the data --> all raw data; all training data; all labels (sequence target); all labels (single-point target)
    total_data, train_data, train_label, train_label_single = getData(hidden_num, feature)
    # Split train/validation sets according to the number of points to predict
    train_data, val_data, train_label, val_label, train_label_single, val_label_single = splitValData(train_data,
                                                                                                      train_label,
                                                                                                      train_label_single,
                                                                                                      predict_num=predict_num)
    # #### TODO training
    model = predict_model(hidden_num, feature)
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath=save_name,
        monitor='val_loss',
        verbose=2,
        save_best_only=True,
        mode='min')
    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=0.0001)
    model.compile(optimizer=tf.optimizers.SGD(), loss=tf.losses.mse)
    model.summary()
    early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=20, mode='min', verbose=1)

    history = model.fit(train_data, train_label_single, epochs=EPOCH, validation_data=(val_data, val_label_single),
                        shuffle=True, verbose=1,
                        callbacks=[checkpoint, lr_scheduler, early_stop])

    #### TODO testing
    # the repaired predict_model uses only built-in layers, so no custom_objects are needed
    trained_model = tf.keras.models.load_model(save_name)
    # Predict from the known points
    print("Start predicting")
    predicted_data = predictByEveryData(trained_model, train_data)
    # Keep predicting from the predicted points
    # predicted_data = predictOneByOne(trained_model, total_data, train_data)
    print("predicted_data:", predicted_data)
    print("predicted_data.shape:", predicted_data.shape)
    plt.figure(1)
    plt.subplot(2, 1, 1)
    plt.plot(total_data)
    # plt.subplot(2, 1, 2)
    plt.plot(predicted_data)
    # plt.scatter()
    plt.show()

View File

@ -9,13 +9,13 @@
 import tensorflow as tf
 import numpy as np
-from model.LSTM.LSTMByDense import LSTMLayer as LSTMLayer
 import matplotlib.pyplot as plt
 from keras.callbacks import EarlyStopping
-from model.LossFunction.FTMSE import FTMSE
+from model.LSTM.DCTAttention_embed_LSTM import AttentionEmbedLSTMLayer as LSTMLayer
+# from model.LSTM.LSTM import LSTMLayer as LSTMLayer
 from model.ChannelAttention.DCT_channelAttention import DCTChannelAttention
-from model.ChannelAttention.Light_channelAttention import LightChannelAttention1 as LightChannelAttention
 import math
 from sklearn.metrics import mean_absolute_error, mean_squared_error
 from pylab import *
@ -29,7 +29,7 @@ batch_size = 8
 EPOCH = 1000
 unit = 512  # LSTM hidden size
 predict_num = 50  # number of points to predict
-model_name = "FC_FTLSTM"
+model_name = "dctLSTM"
 save_name = r"selfMulti_{0}_hidden{1}_unit{2}_feature{3}_predict{4}.h5".format(model_name, hidden_num, unit,
                                                                                feature,
                                                                                predict_num)
@ -100,22 +100,13 @@ def getData(filter_num, dims, if_norm: bool = False):
     return total_data, train_data, train_label, train_label_single

-'''
-train_data.shape: (total_dims - filter_num - 1, filter_num,dims) :(570,600,30)
-predict_data.shape: (total_dims - filter_num, filter_num) :(571,600,30)
-train_label.shape: (total_dims - filter_num - 1, filter_num) :(570,600)
-'''
-
-def remove(train_data, train_label, batch_size):
-    epoch, _, _ = train_data.shape
-    size = int(epoch / batch_size)
-    return train_data[:size * batch_size], train_label[:size * batch_size]

 '''
 train_data.shape: (1230, 10, 10)
 train_label.shape: (1230, 10)
 train_label_single.shape: (1230,)
 '''

 def splitValData(data, label, label_single, predict_num=50):
     sample, hidden, feature = data.shape
@ -145,19 +136,16 @@ def predict_model_multi(filter_num, dims):
     LSTM = LSTMLayer(units=512, return_sequences=True)(input)
     # LSTM = LightChannelAttention()(LSTM)
     LSTM = LSTMLayer(units=256, return_sequences=True)(LSTM)
-    LSTM = LightChannelAttention()(LSTM)
-    ### flatten
-    x = tf.keras.layers.Flatten()(LSTM)
-    x = tf.keras.layers.Dense(128, activation="relu")(x)
+    x = tf.keras.layers.Dense(128, activation="relu")(LSTM)
     x = tf.keras.layers.Dense(64, activation="relu")(x)
     x = tf.keras.layers.Dropout(0.2)(x)
     x = tf.keras.layers.BatchNormalization()(x)
     x = tf.keras.layers.Dense(32, activation="relu")(x)
     x = tf.keras.layers.Dropout(0.2)(x)
     x = tf.keras.layers.BatchNormalization()(x)
-    # x = tf.keras.layers.Dense(16, activation="relu")(x)
-    output = tf.keras.layers.Dense(10, activation="relu", name='output')(x)
+    x = tf.keras.layers.Dense(16, activation="relu")(x)
+    output = tf.keras.layers.Dense(1, activation="relu", name='output')(x)
     model = tf.keras.Model(inputs=input, outputs=output)
     return model
@ -207,31 +195,31 @@ if __name__ == '__main__':
                                                                                                       train_label_single,
                                                                                                       predict_num=predict_num)
     # #### TODO training
-    # model = predict_model_multi(hidden_num, feature)
-    # checkpoint = tf.keras.callbacks.ModelCheckpoint(
-    #     filepath=save_name,
-    #     monitor='val_loss',
-    #     verbose=2,
-    #     save_best_only=True,
-    #     mode='min')
-    # lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=20, min_lr=0.001)
-    #
-    # model.compile(optimizer=tf.optimizers.SGD(), loss=tf.losses.mse)
-    # # model.compile(optimizer=tf.optimizers.SGD(learning_rate=0.001), loss=FTMSE())
-    # model.summary()
-    # early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=100, mode='min', verbose=1)
-    #
-    # history = model.fit(train_data, train_label, epochs=EPOCH,
-    #                     batch_size=batch_size, validation_data=(val_data, val_label_single), shuffle=True, verbose=2,
-    #                     callbacks=[checkpoint, lr_scheduler, early_stop])
+    model = predict_model_multi(hidden_num, feature)
+    checkpoint = tf.keras.callbacks.ModelCheckpoint(
+        filepath=save_name,
+        monitor='val_loss',
+        verbose=2,
+        save_best_only=True,
+        mode='min')
+    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=0.001)
+    model.compile(optimizer=tf.optimizers.SGD(), loss=tf.losses.mse)
+    # model.compile(optimizer=tf.optimizers.SGD(learning_rate=0.001), loss=FTMSE())
+    model.summary()
+    early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=20, mode='min', verbose=1)
+    history = model.fit(train_data, train_label, epochs=EPOCH, validation_data=(val_data, val_label),
+                        shuffle=True, verbose=1,
+                        callbacks=[checkpoint, lr_scheduler, early_stop])
     #### TODO testing
     # trained_model = tf.keras.models.load_model(save_name, custom_objects={'LSTMLayer': LSTMLayer, 'FTMSE': FTMSE})
     # todo: work around the custom loss failing to load
-    trained_model = tf.keras.models.load_model(save_name, compile=False, custom_objects={'LSTMLayer': LSTMLayer, 'LightChannelAttention1': LightChannelAttention})
-    trained_model.compile(optimizer=tf.optimizers.SGD(), loss=FTMSE())
+    trained_model = tf.keras.models.load_model(save_name, compile=False, custom_objects={'AttentionEmbedLSTMLayer': LSTMLayer})
+    # trained_model.compile(optimizer=tf.optimizers.SGD(), loss=FTMSE())
     # Predict from the known points
     predicted_data = predictByEveryData(trained_model, train_data)

View File

@ -9,7 +9,8 @@
 import tensorflow as tf
 import numpy as np
-from model.LSTM.LSTM import LSTMLayer as LSTMLayer
+from model.LSTM.DCTAttention_embed_LSTM import AttentionEmbedLSTMLayer as LSTMLayer
+# from model.LSTM.LSTM import LSTMLayer as LSTMLayer
 import matplotlib.pyplot as plt
 from keras.callbacks import EarlyStopping
@ -21,18 +22,30 @@ from pylab import *
 '''
 Hyperparameter settings:
 '''
-hidden_num = 10  # number of LSTM cells (time steps)
+hidden_num = 40  # number of LSTM cells (time steps)
 feature = 10  # dimensionality of one point
 batch_size = 32
 EPOCH = 1000
 unit = 512  # LSTM hidden size
 predict_num = 50  # number of points to predict
-model_name = "cnn_LSTM"
+model_name = "dctLSTM"
 save_name = r"self_{0}_hidden{1}_unit{2}_feature{3}_predict{4}.h5".format(model_name, hidden_num, unit, feature,
                                                                           predict_num)

-def getData(filter_num, dims):
+def standardization(data):
+    mu = np.mean(data, axis=0)
+    sigma = np.std(data, axis=0)
+    return (data - mu) / sigma
+
+
+def normalization(data):
+    _range = np.max(data) - np.min(data)
+    return (data - np.min(data)) / _range
+
+
+# number of LSTM cells, dimensionality, whether to normalize
+def getData(filter_num, dims, if_norm: bool = False):
     # Load the data
     HI_merge_data_origin = np.load("../../2012轴承数据集预测挑战/HI_create/HI_merge_data.npy")
@ -40,6 +53,10 @@ def getData(filter_num, dims):
     # Drop the leading points where the degradation trend is not yet visible
     HI_merge_data = HI_merge_data_origin[0:1250, 1]
+    # whether to normalize
+    if if_norm:
+        HI_merge_data = normalization(HI_merge_data)
     # plt.plot(HI_merge_data)
     # plt.show()
     (total_dims,) = HI_merge_data.shape
@ -125,8 +142,8 @@ def predict_model(filter_num, dims):
     # LSTM = tf.keras.layers.LSTM(units=256, return_sequences=False)(LSTM)
     #### our own implementation
-    LSTM = tf.keras.layers.Conv1D(512, kernel_size=8, padding='same')(input)
-    LSTM = LSTMLayer(units=512, return_sequences=True)(LSTM)
+    # LSTM = tf.keras.layers.Conv1D(512, kernel_size=8, padding='same')(input)
+    LSTM = LSTMLayer(units=512, return_sequences=True)(input)
     LSTM = LSTMLayer(units=256, return_sequences=False)(LSTM)
     x = tf.keras.layers.Dense(128, activation="relu")(LSTM)
@ -194,20 +211,23 @@ if __name__ == '__main__':
     #     verbose=2,
     #     save_best_only=True,
     #     mode='min')
-    # lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=20, min_lr=0.001)
+    # lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=0.0001)
     #
     # model.compile(optimizer=tf.optimizers.SGD(), loss=tf.losses.mse)
-    # model.summary()
-    # early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=100, mode='min', verbose=1)
     #
-    # history = model.fit(train_data, train_label_single, epochs=EPOCH, validation_data=(val_data, val_label_single), shuffle=True, verbose=1,
+    # model.summary()
+    # early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=20, mode='min', verbose=1)
+    #
+    # history = model.fit(train_data, train_label_single, epochs=EPOCH, validation_data=(val_data, val_label_single),
+    #                     shuffle=True, verbose=1,
     #                     callbacks=[checkpoint, lr_scheduler, early_stop])
     #### TODO testing
-    trained_model = tf.keras.models.load_model(save_name, custom_objects={'LSTMLayer': LSTMLayer})
+    trained_model = tf.keras.models.load_model(save_name, custom_objects={'AttentionEmbedLSTMLayer': LSTMLayer})
     # Predict from the known points
+    print("Start predicting")
     predicted_data = predictByEveryData(trained_model, train_data)
     # Keep predicting from the predicted points
     # predicted_data = predictOneByOne(trained_model, total_data, train_data)

View File

@ -0,0 +1,18 @@
#-*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/9 10:26
@Usage :
@Desc :
'''
import tensorflow as tf

# Suppose we have two tensors of shape (3,)
tensor1 = tf.constant([1, 2, 3])
tensor2 = tf.constant([4, 5, 6])

# Stack the two tensors along a new axis
stacked_tensor = tf.stack([tensor1, tensor2], axis=-1)
print(stacked_tensor)
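# Expected output: stacking along the last axis pairs the tensors element-wise,
# tf.Tensor(
# [[1 4]
#  [2 5]
#  [3 6]], shape=(3, 2), dtype=int32)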

View File

@ -0,0 +1,149 @@
# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/9 16:42
@Usage :
@Desc :
'''
import tensorflow as tf
from model.LossFunction.TransferLoss import TransferLoss
class AdaRNN(tf.keras.Model):
    def __init__(self, n_input=128, n_hiddens=[64, 64], n_output=6, len_seq=9, trans_loss='mmd', model_type='AdaRNN'):
        super(AdaRNN, self).__init__()
        self.n_input = n_input
        self.num_layers = len(n_hiddens)
        self.hiddens = n_hiddens
        self.n_output = n_output
        self.trans_loss = trans_loss
        self.len_seq = len_seq
        # NOTE: the original never set model_type although gru_features reads it;
        # default to 'AdaRNN' so the gating branch is taken
        self.model_type = model_type

        self.features = tf.keras.Sequential()
        for hidden in n_hiddens:
            rnn = tf.keras.layers.GRU(
                units=hidden,
                return_sequences=True
            )
            self.features.add(rnn)

        self.fc_out = tf.keras.layers.Dense(n_output, activation=None)

        self.gate = []
        for _ in range(len(n_hiddens)):
            gate_weight = tf.keras.layers.Dense(len_seq, activation=None)
            self.gate.append(gate_weight)

        self.bn_lst = [tf.keras.layers.BatchNormalization() for _ in range(len(n_hiddens))]
        self.softmax = tf.keras.layers.Softmax(axis=0)

    # def init_layers(self):
    #     for gate_layer in self.gate:
    #         gate_layer.build((None, self.len_seq * self.hiddens[i] * 2))
    def forward_pre_train(self, x, len_win=0):
        # output of the stacked GRUs, per-layer outputs, and per-layer normalized gate weights
        out, out_list_all, out_weight_list = self.gru_features(x)
        fea = out
        # tf.Tensor has no .squeeze() method (that is PyTorch); use tf.squeeze
        fc_out = tf.squeeze(self.fc_out(fea[:, -1, :]))
        out_list_s, out_list_t = self.get_features(out_list_all)
        loss_transfer = tf.zeros((1,))
        for i in range(len(out_list_s)):
            criterion_transfer = TransferLoss(loss_type=self.trans_loss)
            h_start = 0
            for j in range(h_start, self.len_seq, 1):
                i_start = max(j - len_win, 0)
                i_end = j + len_win if j + len_win < self.len_seq else self.len_seq - 1
                for k in range(i_start, i_end + 1):
                    weight = out_weight_list[i][j]
                    loss_transfer = loss_transfer + weight * criterion_transfer(
                        out_list_s[i][:, j, :], out_list_t[i][:, k, :])
        return fc_out, loss_transfer, out_weight_list
    def call(self, x, len_win=0, training=False):
        # output of the stacked GRUs, per-layer outputs, and per-layer normalized gate weights
        out, out_list_all, out_weight_list = self.gru_features(x, training=training)
        fea = out
        fc_out = self.fc_out(fea[:, -1, :])
        loss_transfer = tf.zeros((1,))
        for i in range(len(out_list_all)):
            criterion_transfer = TransferLoss(loss_type=self.trans_loss)
            h_start = 0
            for j in range(h_start, self.len_seq, 1):
                i_start = max(j - len_win, 0)
                i_end = j + len_win if j + len_win < self.len_seq else self.len_seq - 1
                for k in range(i_start, i_end + 1):
                    weight = out_weight_list[i][j]
                    # TransferLoss has no .compute(); invoke the loss object directly
                    loss_transfer = loss_transfer + weight * criterion_transfer(
                        out_list_all[i][:, j, :], out_list_all[i][:, k, :])
        return fc_out, loss_transfer, out_weight_list
    def gru_features(self, x, training=False):
        x_input = x
        out = None
        out_lis = []
        out_weight_list = [] if (
            self.model_type == 'AdaRNN') else None
        for i in range(self.num_layers):
            # Sequential does not support direct indexing; go through .layers
            out = self.features.layers[i](x_input, training=training)
            x_input = out
            out_lis.append(out)
            if self.model_type == 'AdaRNN':
                out_gate = self.process_gate_weight(x_input, i, training=training)
                out_weight_list.append(out_gate)
        return out, out_lis, out_weight_list
    def process_gate_weight(self, out, index, training=False):
        x_s = out[:, :out.shape[1] // 2]  # can be read as the first half of the sequence
        x_t = out[:, out.shape[1] // 2:]  # can be read as the second half of the sequence
        x_all = tf.concat((x_s, x_t), 2)
        x_all = tf.reshape(x_all, (x_all.shape[0], -1))
        # training belongs to the BatchNormalization layer, not to tf.sigmoid
        weight = tf.sigmoid(self.bn_lst[index](self.gate[index](x_all), training=training))
        weight = tf.reduce_mean(weight, axis=0)
        res = self.softmax(weight)
        return res
    def get_features(self, output_list):
        fea_list_src, fea_list_tar = [], []
        for fea in output_list:
            fea_list_src.append(fea[:, :fea.shape[1] // 2])
            fea_list_tar.append(fea[:, fea.shape[1] // 2:])
        return fea_list_src, fea_list_tar
    def forward_Boosting(self, x, weight_mat=None):
        out, out_list_all, _ = self.gru_features(x, training=False)
        fea = out
        fc_out = self.fc_out(fea[:, -1, :])
        out_list_s, out_list_t = self.get_features(out_list_all)
        loss_transfer = tf.zeros((1,))
        if weight_mat is None:
            weight = (1.0 / self.len_seq *
                      tf.ones((self.num_layers, self.len_seq), dtype=tf.float32))
        else:
            weight = weight_mat
        # tf.Tensor does not support item assignment; collect rows and stack them
        dist_rows = []
        for i in range(len(out_list_s)):
            criterion_transfer = TransferLoss(loss_type=self.trans_loss)
            row = []
            for j in range(self.len_seq):
                loss_trans = criterion_transfer(out_list_s[i][:, j, :], out_list_t[i][:, j, :])
                loss_transfer = loss_transfer + weight[i, j] * loss_trans
                row.append(loss_trans)
            dist_rows.append(tf.stack(row))
        dist_mat = tf.stack(dist_rows)
        return fc_out, loss_transfer, dist_mat, weight
    def update_weight_Boosting(self, weight_mat, dist_old, dist_new):
        epsilon = 1e-12
        dist_old = tf.stop_gradient(dist_old)
        dist_new = tf.stop_gradient(dist_new)
        # The file is truncated here in the commit; the remainder below is a
        # completion sketch following the boosting weight update of the AdaRNN
        # reference implementation: boost weights where the distance grew, then
        # renormalize each row
        ind = dist_new > dist_old + epsilon
        weight_mat = tf.where(ind, weight_mat * (1 + tf.sigmoid(dist_new - dist_old)), weight_mat)
        weight_norm = tf.reduce_sum(weight_mat, axis=1, keepdims=True)
        weight_mat = weight_mat / weight_norm
        return weight_mat
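# A minimal smoke test (added for illustration, not part of the original commit;
# the shapes below are assumptions) exercising the model:
if __name__ == '__main__':
    model = AdaRNN(n_input=10, n_hiddens=[64, 64], n_output=1, len_seq=20, trans_loss='mmd')
    x = tf.random.normal((8, 20, 10))  # (batch, len_seq, n_input)
    fc_out, loss_transfer, out_weight_list = model(x, training=False)
    print(fc_out.shape)          # (8, 1)
    print(loss_transfer.shape)   # (1,)
    print(len(out_weight_list))  # 2: one normalized gate-weight vector per GRU layer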

View File

@ -0,0 +1,8 @@
#-*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/9 16:42
@Usage :
@Desc :
'''

View File

@ -16,7 +16,6 @@ import matplotlib.pyplot as plt
 from tensorflow.keras.layers import Dense, Dropout, ReLU, BatchNormalization
 from scipy.fftpack import dct

 # def dct(x, norm=None):
 #     """
 #     Discrete Cosine Transform, Type II (a.k.a. the DCT)
@ -55,16 +54,15 @@ from scipy.fftpack import dct
 #     return V

 import tensorflow as tf

 '''
 Reference
 [1] https://github.com/Zero-coder/FECAM/blob/main/layers/dctnet.py
 [2] https://arxiv.org/pdf/2212.01209v1.pdf
 '''

 def sdct_tf(signals, frame_length, frame_step, window_fn=tf.signal.hamming_window):
     """Compute Short-Time Discrete Cosine Transform of `signals`.
@ -128,25 +126,34 @@ def isdct_tf(dcts, *, frame_step, frame_length=None, window_fn=tf.signal.hamming
     signals = signals / window_signal
     return signals

 class DCTChannelAttention(layers.Layer):
     def build(self, input_shape):
         _, hidden, channel = input_shape
         self.l1 = Dense(channel * 2, use_bias=False)
         self.drop1 = Dropout(0.1)
-        self.relu = ReLU(0.1)
+        self.relu = ReLU()
         self.l2 = Dense(channel, use_bias=False)
-        self.bn = BatchNormalization()
+        self.bn = BatchNormalization(axis=-1, epsilon=1e-6)

     def call(self, inputs, **kwargs):
         batch_size, hidden, channel = inputs.shape
         list = []
-        stack_dct = tf.signal.dct(inputs, norm="ortho", axis=-1)
+        change = tf.transpose(inputs, [0, 2, 1])
+        stack_dct = tf.signal.dct(change, norm="ortho", axis=-1)
+        stack_dct = tf.transpose(stack_dct, [0, 2, 1])
         # for i in range(channel):
-        #     freq = tf.signal.dct(inputs[:, i, :], norm="ortho", axis=-1)
+        #     freq = tf.signal.dct(inputs[:, :, i], norm="ortho", axis=-1)
         #     # print("freq-shape:", freq.shape)
-        #     list.append(freq)
-        # stack_dct = tf.stack(list, dim=1)
+        #     freq = tf.expand_dims(freq, axis=2)
+        #     if i == 0:
+        #         stack_dct = freq
+        #     else:
+        #         stack_dct = tf.concat([stack_dct, freq], axis=2)
+        #     list.append(freq)
+        # stack_dct = tf.stack(list, axis=-1)
         lr_weight = self.bn(stack_dct)
         # chain the normalized spectrum into the bottleneck (feeding stack_dct
         # again here would discard the normalization above)
         lr_weight = self.l1(lr_weight)

View File

@ -0,0 +1,80 @@
#-*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/8 19:39
@Usage :
@Desc :
'''
import tensorflow as tf
import numpy as np
# Define the DCT function
def dct(x, norm=None):
    """
    Discrete Cosine Transform, Type II (a.k.a. the DCT)
    """
    x_shape = x.shape
    N = x_shape[-1]
    x = tf.reshape(x, [-1, N])

    # even-indexed entries followed by the reversed odd-indexed entries
    v = tf.concat([x[:, ::2], tf.reverse(x[:, 1::2], [1])], axis=1)

    # full complex FFT; the original indexed a trailing real/imag axis, which is
    # the old torch.rfft layout and does not exist for TensorFlow tensors
    Vc = tf.signal.fft(tf.cast(v, tf.complex64))

    k = -tf.range(N, dtype=x.dtype) * np.pi / (2 * N)
    W_r = tf.cos(k)
    W_i = tf.sin(k)

    V = tf.math.real(Vc) * W_r - tf.math.imag(Vc) * W_i

    if norm == 'ortho':
        # tf.Tensor does not support item assignment; apply the per-column
        # orthonormal scaling with a broadcast multiply instead
        scale = tf.concat([tf.fill([1], 1.0 / (np.sqrt(N) * 2)),
                           tf.fill([N - 1], 1.0 / (np.sqrt(N / 2) * 2))], axis=0)
        V = V * tf.cast(scale, V.dtype)

    V = 2 * tf.reshape(V, x_shape)
    return V
# tf.keras version of dct_channel_block
class DctChannelBlock(tf.keras.layers.Layer):
    def __init__(self, channel):
        super(DctChannelBlock, self).__init__()
        self.fc = tf.keras.Sequential([
            tf.keras.layers.Dense(channel * 2, use_bias=False),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.ReLU(),
            tf.keras.layers.Dense(channel, use_bias=False),
            tf.keras.layers.Activation('sigmoid')
        ])
        self.dct_norm = tf.keras.layers.LayerNormalization(axis=-1, epsilon=1e-6)

    # def get_config(self):
    #     # attributes of the custom layer
    #     config = (
    #         {
    #             'units': self.units,
    #             'return_sequences': self.return_sequences
    #         }
    #     )
    #     base_config = super(DctChannelBlock, self).get_config()
    #     return dict(list(base_config.items()) + list(config.items()))

    def call(self, inputs, **kwargs):
        x = inputs
        b, c, l = x.shape
        dct_list = []
        for i in range(c):
            freq = dct(x[:, i, :])
            dct_list.append(freq)
        stack_dct = tf.stack(dct_list, axis=1)

        lr_weight = self.dct_norm(stack_dct)
        # chain the normalized spectrum into the bottleneck (feeding stack_dct
        # again here would discard the normalization above)
        lr_weight = self.fc(lr_weight)
        lr_weight = self.dct_norm(lr_weight)
        return x * lr_weight
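# A small usage sketch (added for illustration, not in the original file). Note
# that the `channel` constructor argument must match the last (length) axis of
# the input, since the bottleneck Dense layers act on that axis:
if __name__ == '__main__':
    block = DctChannelBlock(channel=16)
    x = tf.random.normal((4, 8, 16))  # (batch, channels, length)
    y = block(x)
    print(y.shape)  # (4, 8, 16)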

View File

@ -0,0 +1,120 @@
# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/6/14 13:49
@Usage :
@Desc : Standard LSTM
'''
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense, Conv2D, Conv1D
from model.ChannelAttention.DCT_channelAttention import DCTChannelAttention as DctChannelBlock
from tensorflow.keras import *
import tensorflow.keras.layers as layers
class AttentionEmbedLSTMLayer(layers.Layer):
    # Two weight initializers, defined once for convenience
    k_ini = initializers.GlorotUniform()
    b_ini = initializers.Zeros()

    def __init__(self, units=30, return_sequences: bool = False, **kwargs):
        super(AttentionEmbedLSTMLayer, self).__init__(**kwargs)
        self.units = units
        self.return_sequences = return_sequences

    def get_params(self, num_inputs, num_outputs):
        def _one(shape, name):
            # return tf.Variable(tf.random.normal(shape=shape, stddev=0.01, mean=0, dtype=tf.float32))
            return self.add_weight(shape=shape, name=name, initializer=tf.random_normal_initializer)

        def _three(name1, name2):
            return (_one(shape=(num_inputs + num_outputs, num_outputs), name=name1),
                    self.add_weight(shape=(num_outputs,), name=name2,
                                    initializer=tf.zeros_initializer))

        W_i, b_i = _three("W_i", "b_i")  # input gate parameters
        W_f, b_f = _three("W_f", "b_f")  # forget gate parameters
        W_o, b_o = _three("W_o", "b_o")  # output gate parameters
        W_c, b_c = _three("W_c", "b_c")  # candidate memory cell parameters
        # output layer parameters
        return W_i, b_i, W_f, b_f, W_o, b_o, W_c, b_c

    def get_config(self):
        # attributes of the custom layer
        config = (
            {
                'units': self.units,
                'return_sequences': self.return_sequences
            }
        )
        base_config = super(AttentionEmbedLSTMLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def build(self, input_shape):
        num_inputs, num_outputs = input_shape[-1], self.units
        self.W_i, self.b_i, self.W_f, self.b_f, self.W_o, self.b_o, self.W_c, self.b_c = self.get_params(
            num_inputs=num_inputs, num_outputs=num_outputs)
        self.dctAttention = DctChannelBlock(num_inputs + num_outputs)
    def call(self, inputs, **kwargs):
        batch, hiddens, dims = inputs.shape
        # print(filter_num, dims)
        for hidden in range(hiddens):
            new_input = inputs[:, hidden, :]
            new_input = tf.expand_dims(new_input, axis=1)
            # concatenate the previous hidden state (zero-padded at the first step)
            if hidden != 0:
                new_input = tf.concat([new_input, ht_1], axis=-1)
            else:
                new_input = tf.pad(new_input, [[0, 0], [0, 0], [0, self.units]])
            new_input = self.dctAttention(new_input)
            Wi = tf.matmul(new_input, self.W_i) + self.b_i
            Wf = tf.matmul(new_input, self.W_f) + self.b_f
            Wc = tf.matmul(new_input, self.W_c) + self.b_c
            Wo = tf.matmul(new_input, self.W_o) + self.b_o
            # gate activations
            ft = tf.nn.sigmoid(Wf)
            it = tf.nn.sigmoid(Wi)
            ct_ = tf.nn.tanh(Wc)
            ot = tf.nn.sigmoid(Wo)
            # cell-state update (no previous cell state at the first step)
            if hidden != 0:
                ct = tf.add(tf.multiply(ft, ct_1), tf.multiply(it, ct_))
            else:
                ct = tf.multiply(it, ct_)
            ht = tf.multiply(tf.nn.tanh(ct), ot)
            if self.return_sequences:
                if hidden == 0:
                    output = ht
                else:
                    output = tf.concat([output, ht], axis=1)
            else:
                if hidden == hiddens - 1:
                    output = tf.squeeze(ht, axis=1)
            ht_1 = ht
            ct_1 = ct
        # output = tf.reshape(output, [-1, filter_num, units])
        # print(output.shape)
        return output
if __name__ == '__main__':
    pass
    # tf.keras.layers.LSTM(return_sequences=)
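    # A quick shape check (illustrative addition, not in the original file); it
    # assumes the DCT attention block accepts the (batch, 1, dims + units)
    # slices produced inside call():
    # layer = AttentionEmbedLSTMLayer(units=32, return_sequences=True)
    # x = tf.random.normal((4, 10, 8))  # (batch, time steps, dims)
    # print(layer(x).shape)             # expected (4, 10, 32)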

View File

@ -0,0 +1,41 @@
# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/9 16:47
@Usage :
@Desc :
'''
from model.LossFunction.transfer.mmd import MMDLoss
import tensorflow as tf


class TransferLoss(tf.keras.losses.Loss):
    def __init__(self, loss_type='cosine'):
        """
        Supported loss_type: mmd(mmd_lin), mmd_rbf, cosine
        (coral, kl, js, mine and adv are listed upstream but not implemented here)
        """
        super(TransferLoss, self).__init__()
        self.loss_type = loss_type

    def call(self, X, Y):
        """Compute the adaptation loss

        Arguments:
            X {tensor} -- source matrix
            Y {tensor} -- target matrix

        Returns:
            [tensor] -- transfer loss
        """
        if self.loss_type == 'mmd_lin' or self.loss_type == 'mmd':
            mmdloss = MMDLoss(kernel_type='linear')
            loss = mmdloss(X, Y)
        elif self.loss_type == 'cosine' or self.loss_type == 'cos':
            # tf.keras.losses.cosine_similarity returns the *negative* cosine,
            # so "1 - cos(X, Y)" is 1 plus the returned value
            loss = 1 + tf.keras.losses.cosine_similarity(X, Y)
        elif self.loss_type == 'mmd_rbf':
            mmdloss = MMDLoss(kernel_type='rbf')
            loss = mmdloss(X, Y)
        else:
            raise NotImplementedError("unsupported loss_type: " + self.loss_type)
        return loss
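# Usage sketch (added for illustration; the random tensors are made up):
if __name__ == '__main__':
    X = tf.random.normal((16, 64))
    Y = tf.random.normal((16, 64))
    print("mmd:", TransferLoss(loss_type='mmd')(X, Y).numpy())
    print("cosine:", TransferLoss(loss_type='cos')(X, Y).numpy())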

View File

@ -0,0 +1,8 @@
#-*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/9 16:54
@Usage :
@Desc :
'''

View File

@ -0,0 +1,75 @@
import tensorflow as tf
import numpy as np
class MMDLoss(tf.keras.losses.Loss):
    def __init__(self, kernel_type='linear', kernel_mul=2.0, kernel_num=5):
        super(MMDLoss, self).__init__()
        self.kernel_type = kernel_type
        self.kernel_mul = kernel_mul
        self.kernel_num = kernel_num

    def get_config(self):
        # attributes of the custom loss
        config = (
            {
                'kernel_type': self.kernel_type,
                'kernel_mul': self.kernel_mul,
                'kernel_num': self.kernel_num
            }
        )
        base_config = super(MMDLoss, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def gaussian_kernel(self, source, target, kernel_mul=2.0, kernel_num=5, fix_sigma=None):
        n_samples = int(source.shape[0]) + int(target.shape[0])
        total = tf.concat([source, target], axis=0)
        total0 = tf.expand_dims(total, 0)
        total0 = tf.tile(total0, [total.shape[0], 1, 1])
        total1 = tf.expand_dims(total, 1)
        total1 = tf.tile(total1, [1, total.shape[0], 1])
        # pairwise squared Euclidean distances between all samples
        L2_distance = tf.reduce_sum((total0 - total1) ** 2, axis=2)
        if fix_sigma:
            bandwidth = fix_sigma
        else:
            bandwidth = tf.reduce_sum(L2_distance) / (n_samples ** 2 - n_samples)
        bandwidth /= kernel_mul ** (kernel_num // 2)
        bandwidth_list = [bandwidth * (kernel_mul ** i)
                          for i in range(kernel_num)]
        # multi-kernel RBF: sum of Gaussians at several bandwidths
        kernel_val = [tf.exp(-L2_distance / bandwidth_temp)
                      for bandwidth_temp in bandwidth_list]
        return sum(kernel_val)

    def linear_mmd(self, X, Y):
        delta = tf.reduce_mean(X, axis=0) - tf.reduce_mean(Y, axis=0)
        # delta is rank-1, so use a dot product (tf.linalg.matmul needs rank >= 2)
        loss = tf.tensordot(delta, delta, axes=1)
        return loss

    def call(self, source, target):
        if self.kernel_type == 'linear':
            return self.linear_mmd(source, target)
        elif self.kernel_type == 'rbf':
            batch_size = int(source.shape[0])
            kernels = self.gaussian_kernel(
                source, target, kernel_mul=self.kernel_mul, kernel_num=self.kernel_num, fix_sigma=None)
            # MMD^2 = E[k(x, x')] + E[k(y, y')] - E[k(x, y)] - E[k(y, x)];
            # gradients flow through these ops automatically, so no GradientTape
            # is needed here
            XX = tf.reduce_mean(kernels[:batch_size, :batch_size])
            YY = tf.reduce_mean(kernels[batch_size:, batch_size:])
            XY = tf.reduce_mean(kernels[:batch_size, batch_size:])
            YX = tf.reduce_mean(kernels[batch_size:, :batch_size])
            loss = XX + YY - XY - YX
            return loss


if __name__ == '__main__':
    # Example usage
    source = np.random.randn(100, 128)
    target = np.random.randn(100, 128)

    source_tf = tf.convert_to_tensor(source, dtype=tf.float32)
    target_tf = tf.convert_to_tensor(target, dtype=tf.float32)

    mmd_loss = MMDLoss(kernel_type='rbf', kernel_mul=2.0, kernel_num=5)
    loss = mmd_loss(source_tf, target_tf)
    print("MMD Loss:", loss.numpy())