# -*- coding: utf-8 -*-

'''
@Author : dingjiawen
@Date   : 2023/5/16 19:46
@Usage  : Send wind-power data to Kafka
@Desc   : Read wind-turbine gearbox data and replay it as Kafka messages
'''

import os
import sys
import json
import traceback
from datetime import datetime

import numpy as np
import pandas as pd
import pytz
from apscheduler.schedulers.blocking import BlockingScheduler

from utils.KafkaUtils import *  # expected to provide getKafkaProducer()
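
# NOTE: utils.KafkaUtils is not shown in this file. A minimal sketch of the
# helper it is expected to provide, assuming it wraps kafka-python (the
# bootstrap server address below is an illustrative assumption, not from
# this repo):
#
#     from kafka import KafkaProducer
#
#     def getKafkaProducer():
#         return KafkaProducer(bootstrap_servers="localhost:9092")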

# SCADA data source (GBK-encoded CSV)
scada_data_path = r"G:\data\SCADA数据\jb4q_8_delete_total_zero.csv"
scada_data_df = pd.read_csv(scada_data_path, encoding='gbk')

cms_send_data = []        # (message dict, raw samples) tuples loaded from disk
cms_total_send_data = []  # message dicts, one per CMS file
cms_total_data = []       # raw sample arrays, one per CMS file

index = 0            # current SCADA row
indexCMS = 0         # current offset inside the active CMS file
cmsInterval = 10000  # number of CMS samples sent per message
i = 0                # index of the active CMS file
now_time = datetime.now()

# directory holding the CMS demo files
root = r"E:\data\cmsDemo/"


def loadData():
    """Read the CMS demo files under root and build one Kafka message per file."""
    file_name = os.listdir(root)
    dataList = []
    for file in file_name:
        if file.startswith("华能三塘湖"):
            read_name = os.path.join(root, file)
            data = pd.read_csv(read_name, encoding='utf-8', header=None)
            total_data = data.values[0]
            name = file.split("_")
            data = {
                "windfarm": name[0],
                "wt_no": 5,
                "realtime": file.split(".csv")[0].split("_")[-1],
                "location": name[2] + "_" + name[3],
                "g": 3,
                "RPM": 14.41,
                "freq": name[4],
                "x": [],  # filled with sample windows by sent_CMS_data()
                "time": now_time.strftime("%Y%m%d%H%M%S")
            }
            dataList.append((data, total_data))
    print("load cms data is completed")
    global cms_send_data
    cms_send_data = dataList

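
# Expected CMS file name layout, inferred from the parsing in loadData()
# (the concrete example is hypothetical; only the fields actually read are
# meaningful):
#
#     <windfarm>_<?>_<loc1>_<loc2>_<freq>_..._<realtime>.csv
#
# e.g. a file "华能三塘湖_xx_gearbox_hss_25600_20230516.csv" would yield
# windfarm="华能三塘湖", location="gearbox_hss", freq="25600", realtime="20230516"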

def sent_SCADA_data():
    """Send the next SCADA row to Kafka as a GBK-encoded JSON message."""
    data = None
    try:
        global index
        data = scada_data_df.iloc[index].to_json()
        data = json.loads(data)
        data["time"] = now_time.strftime("%Y%m%d%H%M%S")
        data_send = json.dumps(data).encode("gbk")
        # send the message to the ods_base_data topic
        producer = getKafkaProducer()
        producer.send(
            'ods_base_data',
            value=data_send
        )
        print("send {}".format(data_send))
        # TODO: monitor whether the send succeeded
        index = index + 1
    except Exception:
        traceback.print_exc(file=sys.stdout)
    finally:
        print(data)
        print("end send data")


def sent_CMS_data():
    """Send the next window of cmsInterval CMS samples to Kafka."""
    data = None
    try:
        global indexCMS
        global i
        limit = indexCMS + cmsInterval
        if limit >= len(cms_total_data[i]):
            # either switch to the next file or send the remaining samples:
            if limit == len(cms_total_data[i]) + cmsInterval:
                # the current file is exhausted; move on to the next one
                # (wrap around so i never runs past the end of the list)
                indexCMS = 0
                i = (i + 1) % len(cms_total_data)
                limit = indexCMS + cmsInterval
            else:
                # send whatever is left of the current file
                limit = len(cms_total_data[i])
        cms_sent_data = cms_total_send_data[i]
        cms_sent_data["x"] = list(cms_total_data[i][indexCMS:limit])
        data = cms_sent_data
        data_send = json.dumps(data).encode("gbk")
        # send the message to the ods_base_data topic
        producer = getKafkaProducer()
        producer.send(
            'ods_base_data',
            value=data_send
        )
        # TODO: monitor whether the send succeeded
        indexCMS = limit
    except Exception:
        traceback.print_exc(file=sys.stdout)
    finally:
        print("data", data)
        print("end send data")


def send_cms_data_before():
    """Split the loaded (message, samples) tuples into two parallel lists."""
    for send_data, total_data in cms_send_data:
        cms_total_send_data.append(send_data)
        cms_total_data.append(total_data)


if __name__ == '__main__':
    loadData()
    send_cms_data_before()
    timez = pytz.timezone('Asia/Shanghai')
    scheduler = BlockingScheduler(timezone=timez)
    # send one CMS window every 0.5 s
    scheduler.add_job(sent_CMS_data, 'interval', seconds=0.5, next_run_time=datetime.now(), max_instances=10)
    # scheduler.add_job(sent_SCADA_data, 'interval', seconds=0.5, next_run_time=datetime.now(), max_instances=10)
    scheduler.start()
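
# For local verification, a minimal consumer sketch (assumes kafka-python and
# the same illustrative broker address as the producer sketch above; both are
# assumptions, not part of this repo):
#
#     from kafka import KafkaConsumer
#
#     consumer = KafkaConsumer('ods_base_data', bootstrap_servers="localhost:9092")
#     for msg in consumer:
#         print(json.loads(msg.value.decode("gbk")))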