diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java index e4e6ff3..444e25e 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/GasTypeStateApp.java @@ -9,6 +9,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** @@ -54,17 +55,20 @@ public class GasTypeStateApp extends BaseStreamApp { " AirInletDP_1 DOUBLE, " + " T1_1 DOUBLE, " + " LubeHaderP DOUBLE, " + - " LubeFilterDP DOUBLE, " + + " T5TC9 DOUBLE, " + " TankT DOUBLE, " + - " GrBxAccel DOUBLE, " + + " LFT DOUBLE, " + " realtime STRING, " + - " row_time as TO_TIMESTAMP(realtime), " + - " WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND " + + " row_time as TO_TIMESTAMP(SUBSTRING(realtime from 0 for 19), 'yyyy-MM-dd HH:mm:ss'), " + + " WATERMARK FOR row_time as row_time - INTERVAL '5' SECOND" + ") " + "with(" + MyKafkaUtils.getKafkaSourceByDDL(topic, groupId) + ")"; tableEnv.executeSql(sql); +// tableEnv.executeSql("select type_id,rated_temp,T5TC1,realtime, row_time from gas_scada_type").print(); +// tableEnv.executeSql("select * from gas_scada_type").print(); + //TODO 3.分组开窗聚合 String selectSQL = "select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt, " + @@ -77,18 +81,18 @@ public class GasTypeStateApp extends BaseStreamApp { " rated_power, " + " rated_load, " + " rated_duration, " + - " AVG(LubeReturnT2) as avg_LubeReturnT2, " + - " AVG(T5TC1) as avg_T5TC1, " + - " AVG(GFFlow) as avg_GFFlow, " + - " AVG(LFFlow) as avg_LFFlow, " + - " AVG(NHP_1) as avg_NHP_1, " + - " AVG(AirInletDP_1) as avg_AirInletDP_1, " + - " AVG(T1_1) as avg_T1_1, " + - " AVG(LubeHaderP) as avg_LubeHaderP, " + - " AVG(LubeFilterDP) as avg_LubeFilterDP, " + - " AVG(TankT) as avg_TankT, " + - " AVG(GrBxAccel) as avg_GrBxAccel, " + - " UNIX_TIMESTAMP() * 1000 ts " + + " AVG(LubeReturnT2) avg_LubeReturnT2, " + + " AVG(T5TC1) avg_T5TC1, " + + " AVG(GFFlow) avg_GFFlow, " + + " AVG(LFFlow) avg_LFFlow, " + + " AVG(NHP_1) avg_NHP_1, " + + " AVG(AirInletDP_1) avg_AirInletDP_1, " + + " AVG(T1_1) avg_T1_1, " + + " AVG(LubeHaderP) avg_LubeHaderP, " + + " AVG(T5TC9) avg_T5TC9, " + + " AVG(TankT) avg_TankT, " + + " AVG(LFT) avg_LFT, " + + " UNIX_TIMESTAMP() * 1000 ts " + "from gas_scada_type " + "group by TUMBLE(row_time, INTERVAL '3' MINUTE), " + " type_id, " + @@ -100,7 +104,7 @@ public class GasTypeStateApp extends BaseStreamApp { " rated_load, " + " rated_duration"; Table table = tableEnv.sqlQuery(selectSQL); - +// tableEnv.executeSql(selectSQL).print(); DataStream windowDS = tableEnv.toAppendStream(table, GasTypeState.class); windowDS.print(">>>"); diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql index 4eb71b0..1bff73d 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dws/tableSQL.sql @@ -169,9 +169,9 @@ create table gas_scada_type AirInletDP_1 DOUBLE, T1_1 DOUBLE, LubeHaderP DOUBLE, - LubeFilterDP DOUBLE, + T5TC9 DOUBLE, TankT DOUBLE, - GrBxAccel DOUBLE, + LFT DOUBLE, realtime STRING, row_time as TO_TIMESTAMP(realtime), WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND @@ -196,18 +196,18 @@ select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:m rated_power, rated_load, rated_duration, - AVG(LubeReturnT2) as avg_LubeReturnT2, - AVG(T5TC1) as avg_T5TC1, - AVG(GFFlow) as avg_GFFlow, - AVG(LFFlow) as avg_LFFlow, - AVG(NHP_1) as avg_NHP_1, - AVG(AirInletDP_1) as avg_AirInletDP_1, - AVG(T1_1) as avg_T1_1, - AVG(LubeHaderP) as avg_LubeHaderP, - AVG(LubeFilterDP) as avg_LubeFilterDP, - AVG(TankT) as avg_TankT, - AVG(GrBxAccel) as avg_GrBxAccel, - UNIX_TIMESTAMP() * 1000 ts + AVG(LubeReturnT2) avg_LubeReturnT2, + AVG(T5TC1) avg_T5TC1, + AVG(GFFlow) avg_GFFlow, + AVG(LFFlow) avg_LFFlow, + AVG(NHP_1) avg_NHP_1, + AVG(AirInletDP_1) avg_AirInletDP_1, + AVG(T1_1) avg_T1_1, + AVG(LubeHaderP) avg_LubeHaderP, + AVG(T5TC9) avg_T5TC9, + AVG(TankT) avg_TankT, + AVG(LFT) avg_LFT, + UNIX_TIMESTAMP() * 1000 ts from gas_scada_type group by TUMBLE(row_time, INTERVAL '3' MINUTE), type_id, diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java index ad0b246..0685758 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTypeState.java @@ -35,8 +35,8 @@ public class GasTypeState { private Double avg_AirInletDP_1; private Double avg_T1_1; private Double avg_LubeHaderP; - private Double avg_LubeFilterDP; + private Double avg_T5TC9; private Double avg_TankT; - private Double avg_GrBxAccel; + private Double avg_LFT; private Long ts; } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ClickhouseUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ClickhouseUtils.java index b44d147..de6bb02 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ClickhouseUtils.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ClickhouseUtils.java @@ -51,7 +51,7 @@ public class ClickhouseUtils { new JdbcExecutionOptions.Builder() // .withMaxRetries(3) // .withBatchIntervalMs() - .withBatchSize(5)//每5次输出到clickhouse + .withBatchSize(1)//每5次输出到clickhouse .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")