# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date   : 2023/5/29 16:00
@Usage  : Wind turbine condition monitoring
@Desc   : Reads dwm_wind_wide records from Kafka and runs condition monitoring
          through a CNN-LSTM model.

Known issue: when records arrive one at a time there is no time dimension, so
sequence models such as LSTM or GRU cannot run; only AE-style models work.
(A sliding-window workaround is sketched later in this file.)
'''

import tensorflow as tf
import numpy as np
import pandas as pd
from utils.KafkaUtils import *
from tensorflow.keras.callbacks import EarlyStopping
import json

# Hyperparameter definitions
timestamp = 100
feature_num = 10
learning_rate = 0.01
EPOCH = 100
batch_size = 32
model_name = "AE"
save_name = "./model/{0}_timestamp{1}_feature_num{2}_batchSize{3}_epoch{4}.h5".format(
    model_name, timestamp, feature_num, batch_size, EPOCH)


# CNN-GRU model: Conv1D feature extraction followed by a GRU over the time axis
def condition_monitoring_model():
    inputs = tf.keras.Input(shape=[timestamp, feature_num])
    conv1 = tf.keras.layers.Conv1D(filters=256, kernel_size=1)(inputs)
    GRU1 = tf.keras.layers.GRU(128, return_sequences=False)(conv1)
    d1 = tf.keras.layers.Dense(300)(GRU1)
    output = tf.keras.layers.Dense(feature_num)(d1)  # predicts the next record
    model = tf.keras.Model(inputs=inputs, outputs=output)

    return model
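

# Illustrative only (not in the original script): a quick shape check that
# feeds random data of the expected [batch, timestamp, feature_num] shape
# through the CNN-GRU model.
def _sanity_check_cnn_gru():
    model = condition_monitoring_model()
    dummy = np.random.rand(2, timestamp, feature_num).astype("float32")
    print(model(dummy).shape)  # expected: (2, feature_num)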


# A small fully-connected autoencoder: feature_num -> feature_num // 2 -> 3
# -> feature_num // 2 -> feature_num. Works on single records (no time axis).
def condition_monitoring_model_AE():
    inputs = tf.keras.Input(shape=[feature_num])
    encoded = tf.keras.layers.Dense(feature_num // 2)(inputs)
    bottleneck = tf.keras.layers.Dense(3)(encoded)
    decoded = tf.keras.layers.Dense(feature_num // 2)(bottleneck)
    output = tf.keras.layers.Dense(feature_num)(decoded)
    model = tf.keras.Model(inputs=inputs, outputs=output)

    return model
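

# The AE monitors condition via reconstruction error: inputs that deviate from
# the training distribution reconstruct poorly. The original script only
# prints actual vs. predicted values; this helper is a hedged sketch (not in
# the original) of the per-sample residual score that would usually follow.
def reconstruction_error(actual, predicted):
    # Mean squared residual per sample; larger values indicate more abnormal input
    return np.mean(np.square(actual - predicted), axis=-1)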


# Read data
def read_data():
    # Raw-string path avoids backslash escapes; keep the first 10000 rows and
    # the ten feature columns (3..12) of the SCADA export
    data = pd.read_csv(r"G:\data\SCADA数据/jb4q_8_delete_all_zero_simple.csv")
    data = data.iloc[:10000, 3:13]
    data = np.array(data)
    return data


# Overlapping (sliding-window) sampling: each sample is time_stamp consecutive
# rows, labeled with the row that immediately follows the window
def get_training_data_overlapping(data, time_stamp=timestamp):
    rows, cols = data.shape
    num_samples = rows - time_stamp
    train_data = np.empty(shape=[num_samples, time_stamp, cols])
    train_label = np.empty(shape=[num_samples, cols])
    for i in range(num_samples):
        train_data[i] = data[i:i + time_stamp]
        train_label[i] = data[i + time_stamp]
    print("After overlapping sampling:")
    print("data:", train_data)
    print("label:", train_label)

    return train_data, train_label
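

# A tiny illustrative check of the windowing above (not in the original
# script): 6 rows x 2 features with a window of 3 yields 3 samples.
def _demo_overlapping():
    toy = np.arange(12, dtype=float).reshape(6, 2)
    windows, labels = get_training_data_overlapping(toy, time_stamp=3)
    print(windows.shape, labels.shape)  # expected: (3, 3, 2) (3, 2)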


# Column-wise min-max normalization
def normalization(data):
    print("Before normalization:", data)
    print(data.shape)
    print("======================")

    # NumPy broadcasting handles the row dimension automatically
    col_max = np.max(data, axis=0)
    col_min = np.min(data, axis=0)

    data = (data - col_min) / (col_max - col_min)
    print("After normalization:", data)
    print(data.shape)

    return data
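

# Min-max scaling a single streaming record is ill-defined (max == min per
# column), which is presumably why normalization is commented out in the
# consumer loop below. A hedged sketch of the usual fix: persist the training
# min/max once and reuse them at inference. The stats file name is
# illustrative, not from the original code.
def save_train_stats(train_data, stats_path="./model/train_minmax.npz"):
    np.savez(stats_path, min=train_data.min(axis=0), max=train_data.max(axis=0))


def normalization_with_train_stats(data, stats_path="./model/train_minmax.npz"):
    stats = np.load(stats_path)
    # The small epsilon guards against constant columns
    return (data - stats["min"]) / (stats["max"] - stats["min"] + 1e-8)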


def kafkaSource():
    pass
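

# kafkaSource above is an unfinished stub; getKafkaConsumer is imported from
# utils.KafkaUtils (not shown here). A hedged sketch of what such a helper
# usually wraps, assuming the kafka-python package and an illustrative
# localhost broker (kept commented so the real import is used):
#
# from kafka import KafkaConsumer
#
# def getKafkaConsumer(topic, group_id):
#     return KafkaConsumer(topic, group_id=group_id,
#                          bootstrap_servers=["localhost:9092"])


# The docstring's known issue: single records carry no time dimension, so
# LSTM/GRU models cannot run. A hedged sketch of the usual workaround (not in
# the original script): buffer the last `timestamp` records in a deque and
# predict only once the window is full. FEATURE_KEYS mirrors the field list
# used in the consumer loop below.
FEATURE_KEYS = ['num_gearbox_sumptemp', 'num_gearbox_inletoiltemp',
                'num_gearbox_inletpress', 'num_gearbox_pumpoutletpress',
                'num_gearbox_coolingwatertemp', 'num_envtemp',
                'num_gearbox_highspeedshaft_front_temp',
                'num_gearbox_highspeedshaft_rear_temp',
                'num_gen_max_windingtemp', 'num_engineroom_temp']


def monitor_with_window(consumer, model):
    from collections import deque
    window = deque(maxlen=timestamp)
    for message in consumer:
        record = json.loads(str(message.value, encoding="utf-8"))
        window.append([record[key] for key in FEATURE_KEYS])
        if len(window) == timestamp:
            batch = np.expand_dims(np.array(window), axis=0)  # [1, timestamp, feature_num]
            print("Predicted:", model.predict(batch))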


if __name__ == '__main__':

    # ## TODO training
    # data = read_data()
    # data = normalization(data=data)
    # # train_data, train_label = get_training_data_overlapping(data)
    # model = condition_monitoring_model_AE()
    # checkpoint = tf.keras.callbacks.ModelCheckpoint(
    #     filepath=save_name,
    #     monitor='val_loss',
    #     verbose=1,
    #     save_best_only=True,
    #     mode='min',
    #     period=1)
    # lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)
    # early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=30, mode='min', verbose=1)
    # model.compile(optimizer=tf.optimizers.Adam(learning_rate=learning_rate), loss=tf.losses.mse)
    # model.summary()
    # model.fit(data, data, batch_size=batch_size, epochs=EPOCH, validation_split=0.1,
    #           callbacks=[checkpoint, lr_scheduler, early_stop])

    ### TODO testing

    consumer = getKafkaConsumer("dwm_wind_wide", group_id="wind_condition_monitoring_group")
    model = tf.keras.models.load_model(save_name)
    for message in consumer:
        record = str(message.value, encoding="utf-8")
        # json.loads is safer than eval for untrusted Kafka payloads
        recordDict = json.loads(record)
        print("Decoded JSON:", record)
        data = np.array([recordDict['num_gearbox_sumptemp'], recordDict['num_gearbox_inletoiltemp'],
                         recordDict['num_gearbox_inletpress'], recordDict['num_gearbox_pumpoutletpress'],
                         recordDict['num_gearbox_coolingwatertemp'], recordDict['num_envtemp'],
                         recordDict['num_gearbox_highspeedshaft_front_temp'],
                         recordDict['num_gearbox_highspeedshaft_rear_temp'],
                         recordDict['num_gen_max_windingtemp'], recordDict['num_engineroom_temp']])
        data = np.expand_dims(data, axis=0)
        # data = normalization(data)
        # data = np.expand_dims(data, axis=0)
        predicted_data = model.predict(data)
        print("Actual values:", data)
        print("Predicted values:", predicted_data)
        # model.predict(message)
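        # An anomaly score could follow here using the hedged helpers above,
        # e.g. reconstruction_error(data, predicted_data); monitor_with_window
        # sketches the windowed variant for sequence models.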