leecode,rt_phm更新更新

This commit is contained in:
kevinding1125 2023-05-30 13:26:56 +08:00
parent 1df4dce36e
commit af7485f4af
6 changed files with 62 additions and 28 deletions

View File

@ -49,8 +49,4 @@ public class WindSCADAController {
return Result.ok().data("data", locationStateList);
}
}

View File

@ -38,4 +38,5 @@ public class GasTypeState {
Double avg_LubeFilterDP;
Double avg_TankT;
Double avg_GrBxAccel;
Long ts;
}

View File

@ -3,6 +3,7 @@ package com.cqu.warehouse.realtime.app.dws;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.dwm.GasWideApp;
import com.cqu.warehouse.realtime.entity.GasTypeState;
import com.cqu.warehouse.realtime.utils.ClickhouseUtils;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@ -86,7 +87,8 @@ public class GasTypeStateApp extends BaseStreamApp {
" AVG(LubeHaderP) as avg_LubeHaderP, " +
" AVG(LubeFilterDP) as avg_LubeFilterDP, " +
" AVG(TankT) as avg_TankT, " +
" AVG(GrBxAccel) as avg_GrBxAccel " +
" AVG(GrBxAccel) as avg_GrBxAccel, " +
" UNIX_TIMESTAMP() * 1000 ts " +
"from gas_scada_type " +
"group by TUMBLE(row_time, INTERVAL '3' MINUTE), " +
" type_id, " +
@ -96,10 +98,14 @@ public class GasTypeStateApp extends BaseStreamApp {
" rated_speed, " +
" rated_power, " +
" rated_load, " +
" rated_duration ";
" rated_duration";
Table table = tableEnv.sqlQuery(selectSQL);
DataStream<GasTypeState> windowDS = tableEnv.toAppendStream(table, GasTypeState.class);
windowDS.print(">>>");
windowDS.addSink(
ClickhouseUtils.getJDBCSink("insert into gas_type_state values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
);
}
}

View File

@ -65,3 +65,32 @@ create table wind_SCADA_type_state
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,type_id);
-- 创建燃机按类型聚合的clickhouse表
create table gas_type_state
(
stt DateTime,
edt DateTime,
type_id String,
rated_temp UInt64,
rated_press UInt64,
rated_flow_rate UInt64,
rated_speed UInt64,
rated_power UInt64,
rated_load UInt64,
rated_duration UInt64,
avg_LubeReturnT2 Decimal64(3),
avg_T5TC1 Decimal64(3),
avg_GFFlow Decimal64(3),
avg_LFFlow Decimal64(3),
avg_NHP_1 Decimal64(3),
avg_AirInletDP_1 Decimal64(3),
avg_T1_1 Decimal64(3),
avg_LubeHaderP Decimal64(3),
avg_LubeFilterDP Decimal64(3),
avg_TankT Decimal64(3),
avg_GrBxAccel Decimal64(3),
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,type_id);

View File

@ -206,7 +206,8 @@ select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:m
AVG(LubeHaderP) as avg_LubeHaderP,
AVG(LubeFilterDP) as avg_LubeFilterDP,
AVG(TankT) as avg_TankT,
AVG(GrBxAccel) as avg_GrBxAccel
AVG(GrBxAccel) as avg_GrBxAccel,
UNIX_TIMESTAMP() * 1000 ts
from gas_scada_type
group by TUMBLE(row_time, INTERVAL '3' MINUTE),
type_id,

View File

@ -17,25 +17,26 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class GasTypeState {
String stt;
String edt;
String type_id;
Long rated_temp;
Long rated_press;
Long rated_flow_rate;
Long rated_speed;
Long rated_power;
Long rated_load;
Long rated_duration;
Double avg_LubeReturnT2;
Double avg_T5TC1;
Double avg_GFFlow;
Double avg_LFFlow;
Double avg_NHP_1;
Double avg_AirInletDP_1;
Double avg_T1_1;
Double avg_LubeHaderP;
Double avg_LubeFilterDP;
Double avg_TankT;
Double avg_GrBxAccel;
private String stt;
private String edt;
private String type_id;
private Long rated_temp;
private Long rated_press;
private Long rated_flow_rate;
private Long rated_speed;
private Long rated_power;
private Long rated_load;
private Long rated_duration;
private Double avg_LubeReturnT2;
private Double avg_T5TC1;
private Double avg_GFFlow;
private Double avg_LFFlow;
private Double avg_NHP_1;
private Double avg_AirInletDP_1;
private Double avg_T1_1;
private Double avg_LubeHaderP;
private Double avg_LubeFilterDP;
private Double avg_TankT;
private Double avg_GrBxAccel;
private Long ts;
}