leecode,rt_phm更新更新

This commit is contained in:
markilue 2023-05-22 22:14:49 +08:00
parent 93c37ea883
commit 0db178165f
9 changed files with 331 additions and 28 deletions

View File

@ -1,10 +1,15 @@
package com.cqu.warehouse.realtime.app.dwd; package com.cqu.warehouse.realtime.app.dwd;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp; import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DimFunction; import com.cqu.warehouse.realtime.app.func.DimFunction;
import com.cqu.warehouse.realtime.app.func.DimSink; import com.cqu.warehouse.realtime.app.func.DimSink;
import com.cqu.warehouse.realtime.app.func.MyDeserializerSchemaFunction;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils; import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@ -18,8 +23,9 @@ import org.apache.flink.util.Collector;
*@CreateTime: 2023-05-20 15:34 *@CreateTime: 2023-05-20 15:34
*@Description: *@Description:
* TODO 读取数据库中的配置信息(维度信息): * TODO 读取数据库中的配置信息(维度信息):
* 1)读取经过maxwell采集的维度数据 * 1)读取经过maxwell采集的维度数据(还需要过滤掉非维度数据)
* 2)通过指定的表插入对应的hbase表中 * 2)通过指定的表插入对应的hbase表中 RichMapFunction
* 需要启动的东西:zk,kafka,maxwell,cluster,hbase
*@Version: 1.0 *@Version: 1.0
*/ */
public class BaseDBApp extends BaseStreamApp { public class BaseDBApp extends BaseStreamApp {
@ -41,18 +47,27 @@ public class BaseDBApp extends BaseStreamApp {
//TODO 2.转为JSONObject //TODO 2.转为JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSONObject::parseObject); SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSONObject::parseObject);
//TODO 3.判断是否需要在Hbase建表,并添加sink_table字段 //TODO 3.只读取指定的数据库的表
SingleOutputStreamOperator<JSONObject> sinkDS = jsonObjectDS.map( SingleOutputStreamOperator<JSONObject> filterDS = jsonObjectDS.filter(jsonObject -> "rt_phm_table".equals(jsonObject.getString("database")));
//TODO 4.判断是否需要在Hbase建表,并添加sink_table字段
SingleOutputStreamOperator<JSONObject> sinkDS = filterDS.map(
new DimFunction() new DimFunction()
); );
sinkDS.print(">>>"); sinkDS.print(">>>");
//TODO 4.写到Hbase,由于可能需要写到不同的表中jdbcSink暂时只能写到一张表中,所以需要自己重写SinkFunction //TODO 5.写到Hbase,由于可能需要写到不同的表中jdbcSink暂时只能写到一张表中,所以需要自己重写SinkFunction
/*
删除hbase表中的数据:
delete from PHM_REALTIME.DIM_WIND_LOCATION_INFO;
delete from PHM_REALTIME.DIM_WIND_TYPE_DETAIL;
delete from PHM_REALTIME.DIM_WIND_TYPE_INFO;
//delete from PHM_REALTIME.DIM_GAS_TYPE_INFO;
*/
sinkDS.addSink( sinkDS.addSink(
new DimSink() new DimSink()
); );
} }
} }

View File

@ -32,11 +32,13 @@ import java.util.concurrent.TimeUnit;
*@CreateTime: 2023-05-17 20:34 *@CreateTime: 2023-05-17 20:34
*@Description: *@Description:
* TODO 燃气轮机宽表: * TODO 燃气轮机宽表:
* 1.维度拆分 tagName拆分 * 1.维度拆分 tagName拆分 mapFunction
* 2.数据脱敏 tagMapping 动态分流FlinkCDC * 2.数据脱敏 tagMapping 动态分流FlinkCDC+broadcastState
* 3.维度关联 * 3.维度关联
* 1)形成大宽表 "tagName":"LFFlow","value":"7.044725853949785" ->tag1:value1,tag2:value2 * 1)形成大宽表 状态编程-是否合适有待考虑 "tagName":"LFFlow","value":"7.044725853949785" ->tag1:value1,tag2:value2
* 2)添加额外信息 异步IO+旁路缓存 (燃机每个机器(gt_no)的额定功率额定运行时长等) * 2)添加额外信息 异步IO+旁路缓存 (燃机每个机器(gt_no)的额定功率额定运行时长等)
* 需要启动的东西:zk,cluster,kafka,hbase,redis
* BaseDataApp,(BaseDBApp,maxwell),redis
*@Version: 1.0 *@Version: 1.0
*/ */
public class GasWideApp extends BaseStreamApp { public class GasWideApp extends BaseStreamApp {
@ -81,7 +83,7 @@ public class GasWideApp extends BaseStreamApp {
.username("root") .username("root")
.password("123456") .password("123456")
.databaseList("rt_phm") .databaseList("rt_phm")
.tableList("rt_phm.gas_table_process") .tableList("rt_phm.gas_code_process")
.startupOptions(StartupOptions.initial())//每次都从新开始 .startupOptions(StartupOptions.initial())//每次都从新开始
.deserializer(new MyDeserializerSchemaFunction()) .deserializer(new MyDeserializerSchemaFunction())
.build(); .build();
@ -133,20 +135,41 @@ public class GasWideApp extends BaseStreamApp {
gasWideDS.print("***"); gasWideDS.print("***");
//TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率额定运行时长等),地区维度(经纬度等) //TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率额定运行时长等),地区维度(经纬度等)
// SingleOutputStreamOperator<JSONObject> gasWidedDS = AsyncDataStream.unorderedWait(
// gasWideDS, //10.1 添加类型维度
// new DimAsyncFunction<JSONObject>(), SingleOutputStreamOperator<JSONObject> gasWideWithTypeDS = AsyncDataStream.unorderedWait(
// 60, gasWideDS,
// TimeUnit.SECONDS new DimAsyncFunction<JSONObject>("DIM_GAS_TYPE_INFO") {
// ); @Override
// public String getKey(JSONObject inObj) {
// SingleOutputStreamOperator<String> gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString()); return inObj.getString("typeId");
// }
// gasWideStrDS.print(">>>>");
// @Override
// gasWideStrDS.addSink( public void join(JSONObject inObj, JSONObject joinObj) {
// MyKafkaUtils.getKafkaSink("dwm_gas_wide") inObj.put("rated_temp",joinObj.getString("RATED_TEMP"));
// ); inObj.put("rated_press",joinObj.getString("RATED_PRESS"));
inObj.put("rated_flow_rate",joinObj.getString("RATED_FLOW_RATE"));
inObj.put("rated_speed",joinObj.getString("RATED_SPEED"));
inObj.put("rated_power",joinObj.getString("RATED_POWER"));
inObj.put("rated_load",joinObj.getString("RATED_LOAD"));
inObj.put("rated_duration",joinObj.getString("RATED_DURATION"));
}
},
60,
TimeUnit.SECONDS
);
//10.2 添加地区纬度
//TODO 11.sink到kafka的dwm_gas_wide
SingleOutputStreamOperator<String> gasWideStrDS = gasWideWithTypeDS.map(jsonObject -> jsonObject.toJSONString());
gasWideStrDS.print(">>>>");
gasWideStrDS.addSink(
MyKafkaUtils.getKafkaSink("dwm_gas_wide")
);
} }

View File

@ -0,0 +1,39 @@
package com.cqu.warehouse.realtime.app.dwm;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-22 19:55
*@Description:
* TODO 风电cms数据DWM层:
* 1)同一时刻数据拼接 -状态编程? 每一个时刻10万个点,全都放在内存当中不太合理?
* 2)lqtt算法提取数据的边界形状
*@Version: 1.0
*/
public class WindCMSApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new WindCMSApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1.从kafka中读取数据
String topic = "dwd_wind_cms";
String groupId = "dwm_wind_cms_app_group";
DataStreamSource<String> kafkaDS = env.addSource(
MyKafkaUtils.getKafkaSource(topic, groupId)
);
kafkaDS.print(">>>");
}
}

View File

@ -0,0 +1,118 @@
package com.cqu.warehouse.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DimAsyncFunction;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-22 15:54
*@Description:
* TODO 处理风电scada数据宽表:
* 1)添加类型id
* 2)添加类型信息
* 3)添加位置信息
*
*@Version: 1.0
*/
public class WindScadaWideApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new WindScadaWideApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1.从kafka中读取数据
String topic = "dwd_wind_scada";
String groupId = "wind_scada_wide_app";
DataStreamSource<String> kafkaDS = env.addSource(
MyKafkaUtils.getKafkaSource(topic, groupId)
);
//TODO 2.转为jsonObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
//TODO 3.维度关联 异步IO+旁路缓存(注意:从hbase取出来的字段都是大写的使用jsonObject取的时候需要尤为注意)
//3.1 添加类型信息
SingleOutputStreamOperator<JSONObject> jsonWithTypeIdDS = AsyncDataStream.unorderedWait(
jsonObjectDS,
new DimAsyncFunction<JSONObject>("DIM_WIND_TYPE_INFO") {
@Override
public String getKey(JSONObject inObj) {
return inObj.getString("windfarm") + "_" + inObj.getString("wt_no");
}
@Override
public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
// System.out.println("in:"+inObj);
// System.out.println("join:"+joinObj);
inObj.put("type_id", joinObj.getString("TYPE_ID"));
}
},
60, TimeUnit.SECONDS
);
//3.2 添加类型维度信息
SingleOutputStreamOperator<JSONObject> jsonObjWithTypeInfoDS = AsyncDataStream.unorderedWait(
jsonWithTypeIdDS,
new DimAsyncFunction<JSONObject>("DIM_WIND_TYPE_DETAIL") {
@Override
public String getKey(JSONObject inObj) {
return inObj.getString("type_id");
}
@Override
public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
inObj.put("rated_air_volume", joinObj.getString("RATED_AIR_VOLUME"));
inObj.put("rated_press", joinObj.getString("RATED_PRESS"));
inObj.put("rated_speed", joinObj.getString("RATED_SPEED"));
inObj.put("rated_power", joinObj.getString("RATED_POWER"));
inObj.put("rated_load", joinObj.getString("RATED_LOAD"));
inObj.put("rated_efficiency", joinObj.getString("RATED_EFFICIENCY"));
}
},
60, TimeUnit.SECONDS
);
//3.3 添加地区信息
SingleOutputStreamOperator<JSONObject> windSCADAWideJSONDS = AsyncDataStream.unorderedWait(
jsonObjWithTypeInfoDS,
new DimAsyncFunction<JSONObject>("DIM_WIND_LOCATION_INFO") {
@Override
public String getKey(JSONObject inObj) {
return inObj.getString("windfarm");
}
@Override
public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
inObj.put("location", joinObj.getString("REAL_NAME"));
inObj.put("production", joinObj.getString("PRODUCTION"));
}
},
60, TimeUnit.SECONDS
);
SingleOutputStreamOperator<String> windSCADAWideDS = windSCADAWideJSONDS.map(jsonObject -> jsonObject.toJSONString());
windSCADAWideDS.print(">>>>");
windSCADAWideDS.addSink(
MyKafkaUtils.getKafkaSink("dwd_wind_wide")
);
}
}

View File

@ -2,8 +2,16 @@ package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.utils.DimUtils;
import com.cqu.warehouse.realtime.utils.ThreadPoolUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
*@BelongsProject: phm_parent *@BelongsProject: phm_parent
@ -13,14 +21,54 @@ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
*@Description: TODO 维度关联方法 *@Description: TODO 维度关联方法
*@Version: 1.0 *@Version: 1.0
*/ */
public class DimAsyncFunction <T> extends RichAsyncFunction<T,T> { public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
ExecutorService threadPool;
String tableName;
public DimAsyncFunction(String tableName) {
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
threadPool = ThreadPoolUtils.getThreadPool();
}
//异步调用主方法 //异步调用主方法
@Override @Override
public void asyncInvoke(T t, ResultFuture<T> resultFuture) throws Exception { public void asyncInvoke(T obj, ResultFuture<T> resultFuture) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
try {
long start = System.currentTimeMillis();
//选取指定字段
String key = getKey(obj);
//根据对应的id查询出需要关联的对象
JSONObject joinObj = DimUtils.getDimInfoWithCache(tableName, key);
//让当前对象与查询出的对象进行异步关联
if (joinObj != null) {
join(obj, joinObj);
}
// System.out.println(joinObj);
long end = System.currentTimeMillis();
System.out.println("维度异步查询耗时:" + (end - start) + "毫秒");
resultFuture.complete(Collections.singleton(obj));
} catch (Exception e) {
e.printStackTrace();
System.out.println("维度查询发生异常");
}
}
}
);
} }
} }

View File

@ -0,0 +1,21 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-22 14:40
*@Description:
* TODO 维度关联接口:定义好维度关联需要的方法
* 1)获取关联字段
* 2)进行维度关联
*@Version: 1.0
*/
public interface DimJoinFunction<T> {
public String getKey(T inObj);
public void join(T inObj, JSONObject joinObj) throws Exception;
}

View File

@ -23,6 +23,7 @@ public class GasWideProcessFunction extends ProcessWindowFunction<JSONObject, JS
@Override @Override
public void process(String key, ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>.Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception { public void process(String key, ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>.Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
//窗口结束触发方法,iterable里面存放窗口里的所有数据存放在内存中所以不适合太多
JSONObject json = null; JSONObject json = null;
for (JSONObject jsonObject : iterable) { for (JSONObject jsonObject : iterable) {
String tagName = jsonObject.getString("tagName"); String tagName = jsonObject.getString("tagName");

View File

@ -231,5 +231,31 @@
"realtime": "2023-01-01 00:00:05+00:00", "realtime": "2023-01-01 00:00:05+00:00",
"gt_no": "PL19-3CEPB_C", "gt_no": "PL19-3CEPB_C",
"company": "PENGBO", "company": "PENGBO",
"time": "20230518171903", "time": "20230518171903"
}
//
{
"wt_no": 8,
"realtime": "2018-07-30 00:00:00",
"num_windspeed": 7.318091,
"num_gen_torque": 36.32,
"num_gearbox_sumptemp": 50.035297,
"num_gearbox_inletoiltemp": 47.73672,
"degree_winddirection": 172.95117,
"num_gearbox_coolingwatertemp": 42.06111,
"num_envtemp": 23.717888,
"num_gen_speed": 1516.0,
"num_gearbox_inletpress": 2.621296,
"num_gearbox_pumpoutletpress": 5.004181,
"num_engineroom_temp": 32.029892,
"num_rotorspeed": 11.877784,
"num_activepower": 678.0,
"num_engineroom_vibration_x": 0.007827997,
"num_engineroom_vibration_y": -0.0063325884,
"time": "20230522160208",
"windfarm": "jingbian_siqi",
"num_gearbox_highspeedshaft_front_temp": 60.51593,
"num_gen_max_windingtemp": 78.60004,
"num_gearbox_highspeedshaft_rear_temp": 62.51814
} }

View File

@ -0,0 +1,12 @@
package com.cqu.warehouse.realtime.entity;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.entity
*@Author: markilue
*@CreateTime: 2023-05-22 17:37
*@Description: TODO 风电配置表信息
*@Version: 1.0
*/
public class WindTableProcess {
}