leecode,rt_phm更新更新
This commit is contained in:
parent
db735dbbc3
commit
597ed80123
|
|
@ -9,6 +9,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
import org.apache.flink.table.api.EnvironmentSettings;
|
import org.apache.flink.table.api.EnvironmentSettings;
|
||||||
import org.apache.flink.table.api.Table;
|
import org.apache.flink.table.api.Table;
|
||||||
|
import org.apache.flink.table.api.TableResult;
|
||||||
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -54,17 +55,20 @@ public class GasTypeStateApp extends BaseStreamApp {
|
||||||
" AirInletDP_1 DOUBLE, " +
|
" AirInletDP_1 DOUBLE, " +
|
||||||
" T1_1 DOUBLE, " +
|
" T1_1 DOUBLE, " +
|
||||||
" LubeHaderP DOUBLE, " +
|
" LubeHaderP DOUBLE, " +
|
||||||
" LubeFilterDP DOUBLE, " +
|
" T5TC9 DOUBLE, " +
|
||||||
" TankT DOUBLE, " +
|
" TankT DOUBLE, " +
|
||||||
" GrBxAccel DOUBLE, " +
|
" LFT DOUBLE, " +
|
||||||
" realtime STRING, " +
|
" realtime STRING, " +
|
||||||
" row_time as TO_TIMESTAMP(realtime), " +
|
" row_time as TO_TIMESTAMP(SUBSTRING(realtime from 0 for 19), 'yyyy-MM-dd HH:mm:ss'), " +
|
||||||
" WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND " +
|
" WATERMARK FOR row_time as row_time - INTERVAL '5' SECOND" +
|
||||||
") " +
|
") " +
|
||||||
"with(" + MyKafkaUtils.getKafkaSourceByDDL(topic, groupId) + ")";
|
"with(" + MyKafkaUtils.getKafkaSourceByDDL(topic, groupId) + ")";
|
||||||
|
|
||||||
tableEnv.executeSql(sql);
|
tableEnv.executeSql(sql);
|
||||||
|
|
||||||
|
// tableEnv.executeSql("select type_id,rated_temp,T5TC1,realtime, row_time from gas_scada_type").print();
|
||||||
|
// tableEnv.executeSql("select * from gas_scada_type").print();
|
||||||
|
|
||||||
//TODO 3.分组开窗聚合
|
//TODO 3.分组开窗聚合
|
||||||
|
|
||||||
String selectSQL = "select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt, " +
|
String selectSQL = "select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt, " +
|
||||||
|
|
@ -77,18 +81,18 @@ public class GasTypeStateApp extends BaseStreamApp {
|
||||||
" rated_power, " +
|
" rated_power, " +
|
||||||
" rated_load, " +
|
" rated_load, " +
|
||||||
" rated_duration, " +
|
" rated_duration, " +
|
||||||
" AVG(LubeReturnT2) as avg_LubeReturnT2, " +
|
" AVG(LubeReturnT2) avg_LubeReturnT2, " +
|
||||||
" AVG(T5TC1) as avg_T5TC1, " +
|
" AVG(T5TC1) avg_T5TC1, " +
|
||||||
" AVG(GFFlow) as avg_GFFlow, " +
|
" AVG(GFFlow) avg_GFFlow, " +
|
||||||
" AVG(LFFlow) as avg_LFFlow, " +
|
" AVG(LFFlow) avg_LFFlow, " +
|
||||||
" AVG(NHP_1) as avg_NHP_1, " +
|
" AVG(NHP_1) avg_NHP_1, " +
|
||||||
" AVG(AirInletDP_1) as avg_AirInletDP_1, " +
|
" AVG(AirInletDP_1) avg_AirInletDP_1, " +
|
||||||
" AVG(T1_1) as avg_T1_1, " +
|
" AVG(T1_1) avg_T1_1, " +
|
||||||
" AVG(LubeHaderP) as avg_LubeHaderP, " +
|
" AVG(LubeHaderP) avg_LubeHaderP, " +
|
||||||
" AVG(LubeFilterDP) as avg_LubeFilterDP, " +
|
" AVG(T5TC9) avg_T5TC9, " +
|
||||||
" AVG(TankT) as avg_TankT, " +
|
" AVG(TankT) avg_TankT, " +
|
||||||
" AVG(GrBxAccel) as avg_GrBxAccel, " +
|
" AVG(LFT) avg_LFT, " +
|
||||||
" UNIX_TIMESTAMP() * 1000 ts " +
|
" UNIX_TIMESTAMP() * 1000 ts " +
|
||||||
"from gas_scada_type " +
|
"from gas_scada_type " +
|
||||||
"group by TUMBLE(row_time, INTERVAL '3' MINUTE), " +
|
"group by TUMBLE(row_time, INTERVAL '3' MINUTE), " +
|
||||||
" type_id, " +
|
" type_id, " +
|
||||||
|
|
@ -100,7 +104,7 @@ public class GasTypeStateApp extends BaseStreamApp {
|
||||||
" rated_load, " +
|
" rated_load, " +
|
||||||
" rated_duration";
|
" rated_duration";
|
||||||
Table table = tableEnv.sqlQuery(selectSQL);
|
Table table = tableEnv.sqlQuery(selectSQL);
|
||||||
|
// tableEnv.executeSql(selectSQL).print();
|
||||||
DataStream<GasTypeState> windowDS = tableEnv.toAppendStream(table, GasTypeState.class);
|
DataStream<GasTypeState> windowDS = tableEnv.toAppendStream(table, GasTypeState.class);
|
||||||
windowDS.print(">>>");
|
windowDS.print(">>>");
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -169,9 +169,9 @@ create table gas_scada_type
|
||||||
AirInletDP_1 DOUBLE,
|
AirInletDP_1 DOUBLE,
|
||||||
T1_1 DOUBLE,
|
T1_1 DOUBLE,
|
||||||
LubeHaderP DOUBLE,
|
LubeHaderP DOUBLE,
|
||||||
LubeFilterDP DOUBLE,
|
T5TC9 DOUBLE,
|
||||||
TankT DOUBLE,
|
TankT DOUBLE,
|
||||||
GrBxAccel DOUBLE,
|
LFT DOUBLE,
|
||||||
realtime STRING,
|
realtime STRING,
|
||||||
row_time as TO_TIMESTAMP(realtime),
|
row_time as TO_TIMESTAMP(realtime),
|
||||||
WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND
|
WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND
|
||||||
|
|
@ -196,18 +196,18 @@ select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:m
|
||||||
rated_power,
|
rated_power,
|
||||||
rated_load,
|
rated_load,
|
||||||
rated_duration,
|
rated_duration,
|
||||||
AVG(LubeReturnT2) as avg_LubeReturnT2,
|
AVG(LubeReturnT2) avg_LubeReturnT2,
|
||||||
AVG(T5TC1) as avg_T5TC1,
|
AVG(T5TC1) avg_T5TC1,
|
||||||
AVG(GFFlow) as avg_GFFlow,
|
AVG(GFFlow) avg_GFFlow,
|
||||||
AVG(LFFlow) as avg_LFFlow,
|
AVG(LFFlow) avg_LFFlow,
|
||||||
AVG(NHP_1) as avg_NHP_1,
|
AVG(NHP_1) avg_NHP_1,
|
||||||
AVG(AirInletDP_1) as avg_AirInletDP_1,
|
AVG(AirInletDP_1) avg_AirInletDP_1,
|
||||||
AVG(T1_1) as avg_T1_1,
|
AVG(T1_1) avg_T1_1,
|
||||||
AVG(LubeHaderP) as avg_LubeHaderP,
|
AVG(LubeHaderP) avg_LubeHaderP,
|
||||||
AVG(LubeFilterDP) as avg_LubeFilterDP,
|
AVG(T5TC9) avg_T5TC9,
|
||||||
AVG(TankT) as avg_TankT,
|
AVG(TankT) avg_TankT,
|
||||||
AVG(GrBxAccel) as avg_GrBxAccel,
|
AVG(LFT) avg_LFT,
|
||||||
UNIX_TIMESTAMP() * 1000 ts
|
UNIX_TIMESTAMP() * 1000 ts
|
||||||
from gas_scada_type
|
from gas_scada_type
|
||||||
group by TUMBLE(row_time, INTERVAL '3' MINUTE),
|
group by TUMBLE(row_time, INTERVAL '3' MINUTE),
|
||||||
type_id,
|
type_id,
|
||||||
|
|
|
||||||
|
|
@ -35,8 +35,8 @@ public class GasTypeState {
|
||||||
private Double avg_AirInletDP_1;
|
private Double avg_AirInletDP_1;
|
||||||
private Double avg_T1_1;
|
private Double avg_T1_1;
|
||||||
private Double avg_LubeHaderP;
|
private Double avg_LubeHaderP;
|
||||||
private Double avg_LubeFilterDP;
|
private Double avg_T5TC9;
|
||||||
private Double avg_TankT;
|
private Double avg_TankT;
|
||||||
private Double avg_GrBxAccel;
|
private Double avg_LFT;
|
||||||
private Long ts;
|
private Long ts;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ public class ClickhouseUtils {
|
||||||
new JdbcExecutionOptions.Builder()
|
new JdbcExecutionOptions.Builder()
|
||||||
// .withMaxRetries(3)
|
// .withMaxRetries(3)
|
||||||
// .withBatchIntervalMs()
|
// .withBatchIntervalMs()
|
||||||
.withBatchSize(5)//每5次输出到clickhouse
|
.withBatchSize(1)//每5次输出到clickhouse
|
||||||
.build(),
|
.build(),
|
||||||
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
|
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
|
||||||
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
|
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue