diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java index 9e66a9a..38e50e3 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java @@ -1,10 +1,15 @@ package com.cqu.warehouse.realtime.app.dwd; import com.alibaba.fastjson.JSONObject; +import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; +import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.cqu.warehouse.realtime.app.base.BaseStreamApp; import com.cqu.warehouse.realtime.app.func.DimFunction; import com.cqu.warehouse.realtime.app.func.DimSink; +import com.cqu.warehouse.realtime.app.func.MyDeserializerSchemaFunction; import com.cqu.warehouse.realtime.utils.MyKafkaUtils; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -18,8 +23,9 @@ import org.apache.flink.util.Collector; *@CreateTime: 2023-05-20 15:34 *@Description: * TODO 读取数据库中的配置信息(维度信息): - * 1)读取经过maxwell采集的维度数据 - * 2)通过指定的表插入对应的hbase表中 + * 1)读取经过maxwell采集的维度数据(还需要过滤掉非维度数据) + * 2)通过指定的表插入对应的hbase表中 RichMapFunction + * 需要启动的东西:zk,kafka,maxwell,cluster,hbase *@Version: 1.0 */ public class BaseDBApp extends BaseStreamApp { @@ -41,18 +47,27 @@ public class BaseDBApp extends BaseStreamApp { //TODO 2.转为JSONObject SingleOutputStreamOperator jsonObjectDS = kafkaDS.map(JSONObject::parseObject); - //TODO 3.判断是否需要在Hbase建表,并添加sink_table字段 - 
SingleOutputStreamOperator sinkDS = jsonObjectDS.map( + //TODO 3.只读取指定的数据库的表 + SingleOutputStreamOperator filterDS = jsonObjectDS.filter(jsonObject -> "rt_phm_table".equals(jsonObject.getString("database"))); + + //TODO 4.判断是否需要在Hbase建表,并添加sink_table字段 + SingleOutputStreamOperator sinkDS = filterDS.map( new DimFunction() ); sinkDS.print(">>>"); - //TODO 4.写到Hbase,由于可能需要写到不同的表中,jdbcSink暂时只能写到一张表中,所以需要自己重写SinkFunction + //TODO 5.写到Hbase,由于可能需要写到不同的表中,jdbcSink暂时只能写到一张表中,所以需要自己重写SinkFunction + /* + 删除hbase表中的数据: + delete from PHM_REALTIME.DIM_WIND_LOCATION_INFO; + delete from PHM_REALTIME.DIM_WIND_TYPE_DETAIL; + delete from PHM_REALTIME.DIM_WIND_TYPE_INFO; + //delete from PHM_REALTIME.DIM_GAS_TYPE_INFO; + */ sinkDS.addSink( new DimSink() ); - } } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java index afa90f5..1029d63 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java @@ -32,11 +32,13 @@ import java.util.concurrent.TimeUnit; *@CreateTime: 2023-05-17 20:34 *@Description: * TODO 燃气轮机宽表: - * 1.维度拆分 tagName拆分 - * 2.数据脱敏 tagMapping 动态分流FlinkCDC + * 1.维度拆分 tagName拆分 mapFunction + * 2.数据脱敏 tagMapping 动态分流FlinkCDC+broadcastState * 3.维度关联 - * 1)形成大宽表 "tagName":"LFFlow","value":"7.044725853949785" ->tag1:value1,tag2:value2 + * 1)形成大宽表 状态编程-是否合适有待考虑 "tagName":"LFFlow","value":"7.044725853949785" ->tag1:value1,tag2:value2 * 2)添加额外信息 异步IO+旁路缓存 (燃机每个机器(gt_no)的额定功率,额定运行时长等) + * 需要启动的东西:zk,cluster,kafka,hbase,redis + * BaseDataApp,(BaseDBApp,maxwell),redis *@Version: 1.0 */ public class GasWideApp extends BaseStreamApp { @@ -81,7 +83,7 @@ public class GasWideApp extends BaseStreamApp 
{ .username("root") .password("123456") .databaseList("rt_phm") - .tableList("rt_phm.gas_table_process") + .tableList("rt_phm.gas_code_process") .startupOptions(StartupOptions.initial())//每次都从新开始 .deserializer(new MyDeserializerSchemaFunction()) .build(); @@ -133,20 +135,41 @@ public class GasWideApp extends BaseStreamApp { gasWideDS.print("***"); //TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率,额定运行时长等),地区维度(经纬度等)等 -// SingleOutputStreamOperator gasWidedDS = AsyncDataStream.unorderedWait( -// gasWideDS, -// new DimAsyncFunction(), -// 60, -// TimeUnit.SECONDS -// ); -// -// SingleOutputStreamOperator gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString()); -// -// gasWideStrDS.print(">>>>"); -// -// gasWideStrDS.addSink( -// MyKafkaUtils.getKafkaSink("dwm_gas_wide") -// ); + + //10.1 添加类型维度 + SingleOutputStreamOperator gasWideWithTypeDS = AsyncDataStream.unorderedWait( + gasWideDS, + new DimAsyncFunction("DIM_GAS_TYPE_INFO") { + @Override + public String getKey(JSONObject inObj) { + return inObj.getString("typeId"); + } + + @Override + public void join(JSONObject inObj, JSONObject joinObj) { + inObj.put("rated_temp",joinObj.getString("RATED_TEMP")); + inObj.put("rated_press",joinObj.getString("RATED_PRESS")); + inObj.put("rated_flow_rate",joinObj.getString("RATED_FLOW_RATE")); + inObj.put("rated_speed",joinObj.getString("RATED_SPEED")); + inObj.put("rated_power",joinObj.getString("RATED_POWER")); + inObj.put("rated_load",joinObj.getString("RATED_LOAD")); + inObj.put("rated_duration",joinObj.getString("RATED_DURATION")); + } + }, + 60, + TimeUnit.SECONDS + ); + + //10.2 添加地区纬度 + + //TODO 11.sink到kafka的dwm_gas_wide + SingleOutputStreamOperator gasWideStrDS = gasWideWithTypeDS.map(jsonObject -> jsonObject.toJSONString()); + + gasWideStrDS.print(">>>>"); + + gasWideStrDS.addSink( + MyKafkaUtils.getKafkaSink("dwm_gas_wide") + ); } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/WindCMSApp.java 
b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/WindCMSApp.java
new file mode 100644
index 0000000..b557a55
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/WindCMSApp.java
@@ -0,0 +1,48 @@
+package com.cqu.warehouse.realtime.app.dwm;
+
+import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
+import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-22 19:55
+ *@Description:
+ * TODO DWM layer for wind-turbine CMS data:
+ *  1) stitch together the samples that belong to the same instant - stateful processing?
+ *     Each instant carries about 100,000 points; keeping all of them in memory looks questionable.
+ *  2) extract the boundary shape of the data with the lqtt algorithm
+ * So far the job only reads the source topic and prints every record; neither step above is wired in.
+ *@Version: 1.0
+ */
+public class WindCMSApp extends BaseStreamApp {
+
+    /** Kafka topic consumed by this job. */
+    private static final String SOURCE_TOPIC = "dwd_wind_cms";
+
+    /** Kafka consumer group id used when reading {@link #SOURCE_TOPIC}. */
+    private static final String CONSUMER_GROUP_ID = "dwm_wind_cms_app_group";
+
+    public static void main(String[] args) throws Exception {
+        new WindCMSApp().entry();
+    }
+
+    /**
+     * Attaches the Kafka source to {@code env} and prints each record with the {@code >>>} prefix.
+     *
+     * @param env stream environment the source is registered on
+     */
+    @Override
+    public void execute(StreamExecutionEnvironment env) {
+        // Element type is String: the sibling jobs feed the same MyKafkaUtils source straight into
+        // JSON::parseObject. NOTE(review): confirm against MyKafkaUtils.getKafkaSource.
+        DataStreamSource<String> kafkaDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(SOURCE_TOPIC, CONSUMER_GROUP_ID)
+        );
+
+        kafkaDS.print(">>>");
+    }
+}
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/WindScadaWideApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/WindScadaWideApp.java
new file mode 100644
index 0000000..44df02f
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/WindScadaWideApp.java
@@ -0,0 +1,118 @@
+package com.cqu.warehouse.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import 
com.cqu.warehouse.realtime.app.base.BaseStreamApp;
+import com.cqu.warehouse.realtime.app.func.DimAsyncFunction;
+import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-22 15:54
+ *@Description:
+ * TODO Build the wide table for wind-turbine SCADA data:
+ *  1) add the type id
+ *  2) add the type details
+ *  3) add the location details
+ *
+ *@Version: 1.0
+ */
+public class WindScadaWideApp extends BaseStreamApp {
+
+    public static void main(String[] args) throws Exception {
+        new WindScadaWideApp().entry();
+    }
+
+    @Override
+    public void execute(StreamExecutionEnvironment env) {
+
+        //TODO 1. read the SCADA records from Kafka
+        String topic = "dwd_wind_scada";
+        String groupId = "wind_scada_wide_app";
+        DataStreamSource<String> kafkaDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(topic, groupId)
+        );
+
+        //TODO 2. parse every record into a JSONObject
+        SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
+
+        //TODO 3. dimension joins: async I/O + look-aside cache (note: fields read back from HBase are all upper-case, so take care when reading them from the JSONObject)
+
+        //3.1 add the type id, keyed by "<windfarm>_<wt_no>"
+        SingleOutputStreamOperator<JSONObject> jsonWithTypeIdDS = AsyncDataStream.unorderedWait(
+                jsonObjectDS,
+                new DimAsyncFunction<JSONObject>("DIM_WIND_TYPE_INFO") {
+
+                    @Override
+                    public String getKey(JSONObject inObj) {
+                        return inObj.getString("windfarm") + "_" + inObj.getString("wt_no");
+                    }
+
+                    @Override
+                    public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
+                        // type_id is the lookup key of step 3.2; join is only invoked on a lookup hit,
+                        // so a record without a match here reaches step 3.2 without a type_id.
+                        inObj.put("type_id", joinObj.getString("TYPE_ID"));
+                    }
+                },
+                60, TimeUnit.SECONDS
+        );
+
+        //3.2 add the type dimension details, keyed by type_id
+        SingleOutputStreamOperator<JSONObject> jsonObjWithTypeInfoDS = AsyncDataStream.unorderedWait(
+                jsonWithTypeIdDS,
+                new DimAsyncFunction<JSONObject>("DIM_WIND_TYPE_DETAIL") {
+
+                    @Override
+                    public String getKey(JSONObject inObj) {
+                        return inObj.getString("type_id");
+                    }
+
+                    @Override
+                    public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
+                        inObj.put("rated_air_volume", joinObj.getString("RATED_AIR_VOLUME"));
+                        inObj.put("rated_press", joinObj.getString("RATED_PRESS"));
+                        inObj.put("rated_speed", joinObj.getString("RATED_SPEED"));
+                        inObj.put("rated_power", joinObj.getString("RATED_POWER"));
+                        inObj.put("rated_load", joinObj.getString("RATED_LOAD"));
+                        inObj.put("rated_efficiency", joinObj.getString("RATED_EFFICIENCY"));
+                    }
+                },
+                60, TimeUnit.SECONDS
+        );
+
+        //3.3 add the location details, keyed by windfarm
+        SingleOutputStreamOperator<JSONObject> windSCADAWideJSONDS = AsyncDataStream.unorderedWait(
+                jsonObjWithTypeInfoDS,
+                new DimAsyncFunction<JSONObject>("DIM_WIND_LOCATION_INFO") {
+
+                    @Override
+                    public String getKey(JSONObject inObj) {
+                        return inObj.getString("windfarm");
+                    }
+
+                    @Override
+                    public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
+                        inObj.put("location", joinObj.getString("REAL_NAME"));
+                        inObj.put("production", joinObj.getString("PRODUCTION"));
+                    }
+                },
+                60, TimeUnit.SECONDS
+        );
+
+        SingleOutputStreamOperator<String> windSCADAWideDS = windSCADAWideJSONDS.map(jsonObject -> jsonObject.toJSONString());
+        windSCADAWideDS.print(">>>>");
+        windSCADAWideDS.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_wind_wide") // NOTE(review): DWM-layer output, yet the topic is prefixed "dwd_" (GasWideApp writes "dwm_gas_wide"); confirm the name with the downstream consumers
+        );
+
+    }
+}
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java
index 471eab8..bd8eb54 100644
--- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java
+++ 
b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java
@@ -2,8 +2,16 @@
 package com.cqu.warehouse.realtime.app.func;
 
 import com.alibaba.fastjson.JSONObject;
+import com.cqu.warehouse.realtime.utils.DimUtils;
+import com.cqu.warehouse.realtime.utils.ThreadPoolUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.hadoop.tracing.SpanReceiverInfo;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  *@BelongsProject: phm_parent
@@ -13,14 +21,72 @@ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
  *@Description: TODO 维度关联方法
+ *  Asynchronous dimension join: each incoming record is handed to a worker thread, which looks the
+ *  record's key up in {@code tableName} through {@code DimUtils.getDimInfoWithCache} and, on a hit,
+ *  lets the subclass merge the dimension row into the record. Subclasses supply
+ *  {@link DimJoinFunction#getKey} and {@link DimJoinFunction#join}.
  *@Version: 1.0
  */
-public class DimAsyncFunction extends RichAsyncFunction {
+public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
+
+    /** Pool that runs the blocking lookups; assigned in {@link #open}, hence transient. */
+    private transient ExecutorService threadPool;
+
+    /** Dimension table passed to {@code DimUtils.getDimInfoWithCache}. */
+    private final String tableName;
+
+    /**
+     * @param tableName name of the dimension table to join against
+     */
+    public DimAsyncFunction(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /** Obtains the lookup thread pool from {@code ThreadPoolUtils}. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        threadPool = ThreadPoolUtils.getThreadPool();
+    }
 
+    /**
+     * Schedules the lookup for {@code obj} on the thread pool and returns immediately.
+     *
+     * @param obj          record to enrich; it is passed to {@link #join} and emitted afterwards
+     * @param resultFuture completed with {@code obj} on success or with the failure cause on error
+     */
     //异步调用主方法
     @Override
-    public void asyncInvoke(T t, ResultFuture resultFuture) throws Exception {
+    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) {
+        threadPool.submit(() -> lookUpAndJoin(obj, resultFuture));
+    }
+
+    /**
+     * Performs the blocking lookup for one record and completes {@code resultFuture}.
+     *
+     * <p>The future is completed on every path. If it were left open after a failure, the record
+     * would sit in the async operator until the timeout configured on
+     * {@code AsyncDataStream.unorderedWait} expires, and by default the job would then fail with a
+     * timeout error instead of the real cause.
+     */
+    private void lookUpAndJoin(T obj, ResultFuture<T> resultFuture) {
+        try {
+            long start = System.currentTimeMillis();
+            String key = getKey(obj);
+            // No key means nothing to look up; the record is emitted unchanged, the same outcome as a
+            // lookup miss. NOTE(review): DimUtils is not visible here - confirm it has no special
+            // handling for a null key.
+            JSONObject joinObj = (key == null) ? null : DimUtils.getDimInfoWithCache(tableName, key);
+            if (joinObj != null) {
+                join(obj, joinObj);
+            }
+            long end = System.currentTimeMillis();
+            System.out.println("维度异步查询耗时:" + (end - start) + "毫秒");
+            resultFuture.complete(Collections.singleton(obj));
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("维度查询发生异常");
+            resultFuture.completeExceptionally(e);
+        }
 
 
 
     }
-
 }
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimJoinFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimJoinFunction.java
new file mode 100644
index 0000000..06d3b0c
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimJoinFunction.java
@@ -0,0 +1,36 @@
+package com.cqu.warehouse.realtime.app.func;
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.app.func
+ *@Author: markilue
+ *@CreateTime: 2023-05-22 14:40
+ *@Description:
+ * TODO Dimension-join contract: the two steps a dimension join needs
+ *  1) obtain the join key
+ *  2) merge the dimension row into the record
+ *@Version: 1.0
+ *
+ * @param <T> type of the record being enriched
+ */
+public interface DimJoinFunction<T> {
+
+    /**
+     * Returns the key used to look {@code inObj} up in the dimension table.
+     *
+     * @param inObj record being enriched
+     * @return the lookup key
+     */
+    String getKey(T inObj);
+
+    /**
+     * Copies the wanted fields of {@code joinObj} into {@code inObj}.
+     *
+     * @param inObj   record being enriched
+     * @param joinObj dimension row found for the record's key
+     * @throws Exception if the merge fails
+     */
+    void join(T inObj, JSONObject joinObj) throws Exception;
+}
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java
index 8e92b49..d6d650c 100644
--- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java
@@ -23,6 +23,7 @@ public class GasWideProcessFunction extends ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
+        // Runs when the window fires; iterable holds every element of the window in memory, so the window must not get too large
         JSONObject json = null;
         for (JSONObject jsonObject : iterable) {
             String tagName = jsonObject.getString("tagName");
diff --git 
a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json index 5a7b390..6350111 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json @@ -231,5 +231,31 @@ "realtime": "2023-01-01 00:00:05+00:00", "gt_no": "PL19-3CEPB_C", "company": "PENGBO", - "time": "20230518171903", + "time": "20230518171903" } + +//风电原始数据 +{ + "wt_no": 8, + "realtime": "2018-07-30 00:00:00", + "num_windspeed": 7.318091, + "num_gen_torque": 36.32, + "num_gearbox_sumptemp": 50.035297, + "num_gearbox_inletoiltemp": 47.73672, + "degree_winddirection": 172.95117, + "num_gearbox_coolingwatertemp": 42.06111, + "num_envtemp": 23.717888, + "num_gen_speed": 1516.0, + "num_gearbox_inletpress": 2.621296, + "num_gearbox_pumpoutletpress": 5.004181, + "num_engineroom_temp": 32.029892, + "num_rotorspeed": 11.877784, + "num_activepower": 678.0, + "num_engineroom_vibration_x": 0.007827997, + "num_engineroom_vibration_y": -0.0063325884, + "time": "20230522160208", + "windfarm": "jingbian_siqi", + "num_gearbox_highspeedshaft_front_temp": 60.51593, + "num_gen_max_windingtemp": 78.60004, + "num_gearbox_highspeedshaft_rear_temp": 62.51814 +} \ No newline at end of file diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/WindTableProcess.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/WindTableProcess.java new file mode 100644 index 0000000..8dce322 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/WindTableProcess.java @@ -0,0 +1,12 @@ +package 
com.cqu.warehouse.realtime.entity;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.entity
+ *@Author: markilue
+ *@CreateTime: 2023-05-22 17:37
+ *@Description: TODO wind-turbine configuration-table record; still an empty placeholder with no fields
+ *@Version: 1.0
+ */
+public class WindTableProcess {
+}