From af7485f4afb70711b9166b3ce6b25f36a9efa4b1 Mon Sep 17 00:00:00 2001 From: kevinding1125 <745518019@qq.com> Date: Tue, 30 May 2023 13:26:56 +0800 Subject: [PATCH] =?UTF-8?q?leecode=EF=BC=8Crt=5Fphm=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/WindSCADAController.java | 4 -- .../dataInterface/entity/GasTypeState.java | 1 + .../realtime/app/dws/GasTypeStateApp.java | 10 ++++- .../realtime/app/dws/clickhouseTable.sql | 29 +++++++++++++ .../warehouse/realtime/app/dws/tableSQL.sql | 3 +- .../realtime/entity/GasTypeState.java | 43 ++++++++++--------- 6 files changed, 62 insertions(+), 28 deletions(-) diff --git a/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/controller/WindSCADAController.java b/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/controller/WindSCADAController.java index 78b57a1..aa37ba9 100644 --- a/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/controller/WindSCADAController.java +++ b/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/controller/WindSCADAController.java @@ -49,8 +49,4 @@ public class WindSCADAController { return Result.ok().data("data", locationStateList); } - - - - } diff --git a/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/entity/GasTypeState.java b/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/entity/GasTypeState.java index a9d5cf1..3ff6ad0 100644 --- a/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/entity/GasTypeState.java +++ b/phm_rotate/backend/phm_backend/service/service_data_interface/src/main/java/com/cqu/dataInterface/entity/GasTypeState.java @@ -38,4 +38,5 @@ public class GasTypeState { Double avg_LubeFilterDP; Double avg_TankT; Double avg_GrBxAccel; + Long ts; } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java index 7a59574..e4e6ff3 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java @@ -3,6 +3,7 @@ package com.cqu.warehouse.realtime.app.dws; import com.cqu.warehouse.realtime.app.base.BaseStreamApp; import com.cqu.warehouse.realtime.app.dwm.GasWideApp; import com.cqu.warehouse.realtime.entity.GasTypeState; +import com.cqu.warehouse.realtime.utils.ClickhouseUtils; import com.cqu.warehouse.realtime.utils.MyKafkaUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -86,7 +87,8 @@ public class GasTypeStateApp extends BaseStreamApp { " AVG(LubeHaderP) as avg_LubeHaderP, " + " AVG(LubeFilterDP) as avg_LubeFilterDP, " + " AVG(TankT) as avg_TankT, " + - " AVG(GrBxAccel) as avg_GrBxAccel " + + " AVG(GrBxAccel) as avg_GrBxAccel, " + + " UNIX_TIMESTAMP() * 1000 ts " + "from gas_scada_type " + "group by TUMBLE(row_time, INTERVAL '3' MINUTE), " + " type_id, " + @@ -96,10 +98,14 @@ public class GasTypeStateApp extends BaseStreamApp { " rated_speed, " + " rated_power, " + " rated_load, " + - " rated_duration "; + " rated_duration"; Table table = tableEnv.sqlQuery(selectSQL); DataStream windowDS = tableEnv.toAppendStream(table, GasTypeState.class); windowDS.print(">>>"); + + windowDS.addSink( + ClickhouseUtils.getJDBCSink("insert into gas_type_state values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + ); } } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/clickhouseTable.sql b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/clickhouseTable.sql index 9d72a51..51daf92 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/clickhouseTable.sql +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/clickhouseTable.sql @@ -64,4 +64,33 @@ create table wind_SCADA_type_state ts UInt64 ) engine = ReplacingMergeTree(ts) partition by toYYYYMMDD(stt) +order by (stt,edt,type_id); + +-- 创建燃机按类型聚合的clickhouse表 +create table gas_type_state +( + stt DateTime, + edt DateTime, + type_id String, + rated_temp UInt64, + rated_press UInt64, + rated_flow_rate UInt64, + rated_speed UInt64, + rated_power UInt64, + rated_load UInt64, + rated_duration UInt64, + avg_LubeReturnT2 Decimal64(3), + avg_T5TC1 Decimal64(3), + avg_GFFlow Decimal64(3), + avg_LFFlow Decimal64(3), + avg_NHP_1 Decimal64(3), + avg_AirInletDP_1 Decimal64(3), + avg_T1_1 Decimal64(3), + avg_LubeHaderP Decimal64(3), + avg_LubeFilterDP Decimal64(3), + avg_TankT Decimal64(3), + avg_GrBxAccel Decimal64(3), + ts UInt64 +) engine = ReplacingMergeTree(ts) +partition by toYYYYMMDD(stt) order by (stt,edt,type_id); \ No newline at end of file diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql index 52038df..4eb71b0 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql @@ -206,7 +206,8 @@ select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:m AVG(LubeHaderP) as avg_LubeHaderP, AVG(LubeFilterDP) as avg_LubeFilterDP, AVG(TankT) as avg_TankT, - AVG(GrBxAccel) as avg_GrBxAccel + AVG(GrBxAccel) as avg_GrBxAccel, + UNIX_TIMESTAMP() * 1000 ts from gas_scada_type group by TUMBLE(row_time, INTERVAL '3' MINUTE), type_id, diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java index bfae625..ad0b246 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java @@ -17,25 +17,26 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class GasTypeState { - String stt; - String edt; - String type_id; - Long rated_temp; - Long rated_press; - Long rated_flow_rate; - Long rated_speed; - Long rated_power; - Long rated_load; - Long rated_duration; - Double avg_LubeReturnT2; - Double avg_T5TC1; - Double avg_GFFlow; - Double avg_LFFlow; - Double avg_NHP_1; - Double avg_AirInletDP_1; - Double avg_T1_1; - Double avg_LubeHaderP; - Double avg_LubeFilterDP; - Double avg_TankT; - Double avg_GrBxAccel; + private String stt; + private String edt; + private String type_id; + private Long rated_temp; + private Long rated_press; + private Long rated_flow_rate; + private Long rated_speed; + private Long rated_power; + private Long rated_load; + private Long rated_duration; + private Double avg_LubeReturnT2; + private Double avg_T5TC1; + private Double avg_GFFlow; + private Double avg_LFFlow; + private Double avg_NHP_1; + private Double avg_AirInletDP_1; + private Double avg_T1_1; + private Double avg_LubeHaderP; + private Double avg_LubeFilterDP; + private Double avg_TankT; + private Double avg_GrBxAccel; + private Long ts; }