# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date   : 2023/5/29 16:00
@Usage  : Wind turbine condition monitoring
@Desc   : Reads dwm_wind_wide records from Kafka and performs condition
          monitoring with a CNN-LSTM model.
          Known issue: when records arrive one at a time there is no time
          dimension, so sequence models (LSTM, GRU, ...) cannot run; only
          AE-type models work.
'''
import tensorflow as tf
import numpy as np
import pandas as pd
from utils.KafkaUtils import *
from keras.callbacks import EarlyStopping
import json

# Hyperparameters
timestamp = 100
feature_num = 10
learning_rate = 0.01
EPOCH = 100
batch_size = 32
model_name = "AE"
save_name = "./model/{0}_timestamp{1}_feature_num{2}_batchSize{3}_epoch{4}.h5".format(
    model_name, timestamp, feature_num, batch_size, EPOCH)


def condition_monitoring_model():
    # Conv1D feature extraction followed by a GRU over the time dimension
    inputs = tf.keras.Input(shape=[timestamp, feature_num])
    conv1 = tf.keras.layers.Conv1D(filters=256, kernel_size=1)(inputs)
    gru1 = tf.keras.layers.GRU(128, return_sequences=False)(conv1)
    d1 = tf.keras.layers.Dense(300)(gru1)
    output = tf.keras.layers.Dense(10)(d1)
    model = tf.keras.Model(inputs=inputs, outputs=output)
    return model


def condition_monitoring_model_AE():
    # Dense autoencoder: feature_num -> feature_num // 2 -> 3 -> feature_num // 2 -> feature_num
    inputs = tf.keras.Input(shape=[feature_num])
    encoded = tf.keras.layers.Dense(feature_num // 2)(inputs)
    bottleneck = tf.keras.layers.Dense(3)(encoded)
    decoded = tf.keras.layers.Dense(feature_num // 2)(bottleneck)
    output = tf.keras.layers.Dense(feature_num)(decoded)
    model = tf.keras.Model(inputs=inputs, outputs=output)
    return model


# Read data
def read_data():
    data = pd.read_csv(r"G:\data\SCADA数据/jb4q_8_delete_all_zero_simple.csv")
    data = data.iloc[:10000, 3:13]
    data = np.array(data)
    return data


# Overlapping sampling: each window of time_stamp rows is paired with the row
# that immediately follows it as the prediction target.
def get_training_data_overlapping(data, time_stamp=timestamp):
    rows, cols = data.shape
    num_samples = rows - time_stamp - 1
    train_data = np.empty(shape=[num_samples, time_stamp, cols])
    train_label = np.empty(shape=[num_samples, cols])
    for i in range(num_samples):
        train_data[i] = data[i:i + time_stamp]
        train_label[i] = data[i + time_stamp]
    print("After overlapping sampling:")
    print("data:", train_data)
    print("label:", train_label)
    return train_data, train_label


# Column-wise min-max normalization
def normalization(data):
    print("Before normalization:", data)
    print(data.shape)
    print("======================")
    col_max = np.max(data, axis=0)
    col_min = np.min(data, axis=0)
    # Note: a constant column makes col_max == col_min and divides by zero here
    data = (data - col_min) / (col_max - col_min)
    print("After normalization:", data)
    print(data.shape)
    return data


def kafkaSource():
    pass
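
# A minimal sketch (not part of the original pipeline) addressing the issue
# noted in the header: sequence models such as LSTM/GRU need a full window of
# `timestamp` records, so single Kafka records must be buffered first. The name
# WindowBuffer and its add() method are illustrative, not an existing API.
from collections import deque


class WindowBuffer:
    """Collects streaming feature vectors until a full window is available."""

    def __init__(self, window=timestamp):
        self.records = deque(maxlen=window)

    def add(self, features):
        # Append one feature vector; return a model-ready batch once the
        # window is full, otherwise None.
        self.records.append(features)
        if len(self.records) == self.records.maxlen:
            # Shape [1, timestamp, feature_num], as expected by condition_monitoring_model()
            return np.expand_dims(np.array(self.records), axis=0)
        return None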

if __name__ == '__main__':
    # ## TODO training
    # data = read_data()
    # data = normalization(data=data)
    #
    # train_data, train_label = get_training_data_overlapping(data)
    # model = condition_monitoring_model_AE()
    # checkpoint = tf.keras.callbacks.ModelCheckpoint(
    #     filepath=save_name,
    #     monitor='val_loss',
    #     verbose=1,
    #     save_best_only=True,
    #     mode='min',
    #     period=1)
    # lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)
    # early_stop = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=30, mode='min', verbose=1)
    # model.compile(optimizer=tf.optimizers.Adam(learning_rate=learning_rate), loss=tf.losses.mse)
    # model.summary()
    # model.fit(data, data, batch_size=batch_size, epochs=EPOCH, validation_split=0.1,
    #           callbacks=[checkpoint, lr_scheduler, early_stop])

    # ## TODO testing
    consumer = getKafkaConsumer("dwm_wind_wide", group_id="wind_condition_monitoring_group")
    model = tf.keras.models.load_model(save_name)
    for message in consumer:
        record = str(message.value, encoding="utf-8")
        # json.loads is safer than eval() for parsing untrusted payloads
        recordDict = json.loads(record)
        print("Parsed JSON:", record)
        data = np.array([recordDict['num_gearbox_sumptemp'],
                         recordDict['num_gearbox_inletoiltemp'],
                         recordDict['num_gearbox_inletpress'],
                         recordDict['num_gearbox_pumpoutletpress'],
                         recordDict['num_gearbox_coolingwatertemp'],
                         recordDict['num_envtemp'],
                         recordDict['num_gearbox_highspeedshaft_front_temp'],
                         recordDict['num_gearbox_highspeedshaft_rear_temp'],
                         recordDict['num_gen_max_windingtemp'],
                         recordDict['num_engineroom_temp']])
        data = np.expand_dims(data, axis=0)
        # data = normalization(data)
        # data = np.expand_dims(data, axis=0)
        predicted_data = model.predict(data)
        print("Actual values:", data)
        print("Predicted values:", predicted_data)
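        # A minimal sketch (not part of the original script): for an AE model,
        # the per-record reconstruction error is the usual health indicator.
        # `threshold` is a hypothetical value that would have to be calibrated
        # on reconstruction errors from healthy training data.
        error = float(np.mean(np.square(data - predicted_data)))
        print("Reconstruction error:", error)
        # threshold = ...  # e.g. a high percentile of training errors
        # if error > threshold:
        #     print("Possible fault, reconstruction error:", error)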