self_example/pytorch_example/RUL/otherIdea/adaDctEmdLSTM/test.py

95 lines
4.0 KiB
Python

# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/11/10 16:27
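@Usage : call test(...) with a trained model object that exposes predict()
@Desc : plots one-step-ahead ("easy") and recursive multi-step ("hard") predictions against the full series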
'''
import numpy as np
import torch
from RUL.otherIdea.LSTM.loadData import getDataset, getTotalData
from RUL.otherIdea.dctLSTM.model import PredictModel
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from RUL.baseModel.plot import plot_prediction, plot_forSelf
# Recursive forecast: every new prediction is fed back in to predict the next point.
def predictOneByOne(model, train_data, predict_num=50):
    """Roll a forecast forward ``predict_num`` steps from the last training window.

    The last window of ``train_data`` seeds the forecast. After each step the
    window is shifted by one position: its oldest row is dropped and a new
    last row is appended, built from the previous last row without its first
    element plus the value just predicted.

    Args:
        model: Object exposing ``predict(tensor)``. It is called with a
            ``(1, steps, features)`` window and must return a tensor that
            holds a single value (0-d, ``(1,)`` and ``(1, 1)`` all work).
        train_data: 3-D tensor indexed as ``[sample, step, feature]``; only
            ``train_data[-1]`` is used. May live on CPU or on an accelerator.
        predict_num: Number of future points to generate.

    Returns:
        ``np.squeeze`` of a ``(predict_num, 1)`` array holding the
        predictions in the order they were produced.
    """
    # Keep feeding the model tensors on the same device as the seed window.
    device = train_data.device
    window = train_data[-1].unsqueeze(0)
    predicted_list = np.empty(shape=(predict_num, 1))

    for step in range(predict_num):
        # Flatten so that a 0-d, (1,) or (1, 1) model output is handled alike.
        predicted = model.predict(window).detach().cpu().numpy().reshape(-1)
        predicted_list[step] = predicted

        # Tensor.numpy() only works on CPU tensors, hence the explicit .cpu().
        history = window.detach().cpu().numpy()
        # New last row: previous last row shifted left by one, prediction appended.
        newest_row = np.concatenate([history[-1, -1, 1:], predicted], axis=0)
        # New window: drop the oldest row, append the freshly built one.
        rolled = np.concatenate(
            [history[-1, 1:, :], np.expand_dims(newest_row, axis=0)], axis=0
        )
        window = torch.tensor(np.expand_dims(rolled, axis=0), device=device)

    return np.squeeze(predicted_list)
def test(hidden_num, feature, predict_num, batch_size, model, is_single=True, is_norm=False, save_fig_name=""):
    """Run a trained model over the data and plot "easy" and "hard" forecasts.

    Two prediction curves are assembled, both starting with the first
    ``hidden_num + feature`` points of the full series followed by the
    model's predictions on the training batches:

    * easy: continues with the model's predictions on the validation
      batches, i.e. every prediction uses known inputs.
    * hard: continues with ``predictOneByOne`` seeded from the last training
      batch, i.e. every prediction builds on the previous prediction.

    Args:
        hidden_num: Forwarded to ``getTotalData`` / ``getDataset``; together
            with ``feature`` it sets the length of the copied series prefix.
        feature: Forwarded to ``getTotalData`` / ``getDataset``.
        predict_num: Forwarded to ``getDataset``; also the number of
            recursive steps of the hard forecast.
        batch_size: Batch size of both data loaders.
        model: Trained module exposing ``parameters()``, ``eval()``, ``to()``
            and ``predict()``. It is moved to the selected device.
        is_single: Forwarded to ``getTotalData`` / ``getDataset``.
        is_norm: Forwarded to ``getTotalData`` / ``getDataset``.
        save_fig_name: Forwarded to ``plot_prediction``.

    Raises:
        ValueError: If the training loader yields no batch, so there is no
            window to seed the recursive forecast.
    """
    total_data, _ = getTotalData(hidden_num, feature, is_single=is_single, is_norm=is_norm)
    train_dataset, val_dataset = getDataset(hidden_num, feature, predict_num=predict_num, is_single=is_single,
                                            is_norm=is_norm)

    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)
    val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

    # Load the network onto the same device the batches are moved to;
    # otherwise a CPU model would be fed CUDA tensors on GPU machines.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    print(model)
    params_num = sum(param.numel() for param in model.parameters())
    print('参数数量:{}'.format(params_num))

    model.eval()

    # Both curves start with the leading points of the real series.
    known_prefix = total_data[:hidden_num + feature, ]
    # Collect per-batch outputs and concatenate once at the end instead of
    # re-copying the growing arrays on every batch.
    train_predictions = []
    val_predictions = []
    last_train_data = None

    with torch.no_grad():
        for data, _ in train_loader:
            data = data.to(device)
            last_train_data = data
            # atleast_1d guards against a 0-d output for a single-sample batch
            # (assumption: predict() may squeeze its output - TODO confirm).
            train_predictions.append(np.atleast_1d(model.predict(data).cpu().detach().numpy()))

        if last_train_data is None:
            raise ValueError("train_loader produced no batches; cannot seed the recursive forecast")

        # Easy version: every prediction is made from known inputs.
        for data, _ in val_loader:
            data = data.to(device)
            val_predictions.append(np.atleast_1d(model.predict(data).cpu().detach().numpy()))

        # Hard version: every prediction is based on the previous prediction.
        recursive_predictions = np.atleast_1d(
            predictOneByOne(model, last_train_data, predict_num=predict_num)
        )

    predicted_data_easy = np.concatenate([known_prefix, *train_predictions, *val_predictions], axis=0)
    predicted_data_hard = np.concatenate([known_prefix, *train_predictions, recursive_predictions], axis=0)

    plot_prediction(total_data, predicted_data_easy, predicted_data_hard, save_fig_name, predict_num=predict_num)
    plot_forSelf(total_data, predicted_data_easy, predicted_data_hard)
if __name__ == '__main__':
    # Raw string: the Windows path contains backslashes that must not be
    # interpreted as escape sequences. The runtime value is unchanged.
    model_path = r"E:\self_example\pytorch_example\RUL\otherIdea/adaRNN\outputs\AdaRNN_tdcLoss(cos)_transferLoss(cos)_dw0.5_lr0.0005\parameters\AdaRNN_hidden24_feature10_predict50_dimList64-64_epoch62_trainLoss0.5115623474121094_valLoss0.12946119904518127.pkl"

    # test() needs a model object (it calls model.parameters(), model.eval()
    # and model.predict()), so the checkpoint has to be loaded first; passing
    # the path itself fails with AttributeError.
    # NOTE(review): assumes the .pkl stores the whole pickled model rather
    # than a state_dict - confirm against the training script. Newer PyTorch
    # releases may additionally require weights_only=False for such files.
    # SECURITY: torch.load unpickles arbitrary objects; only load trusted files.
    load_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    loaded_model = torch.load(model_path, map_location=load_device)
    if not isinstance(loaded_model, torch.nn.Module):
        raise TypeError(
            "checkpoint does not contain a full model (got {}); build the model and "
            "load the state_dict before calling test()".format(type(loaded_model).__name__)
        )

    test(40, 10, 50, 32, loaded_model)