diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/dfs/LC_1254_ClosedIsland.java b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/dfs/LC_1254_ClosedIsland.java new file mode 100644 index 0000000..fed8db5 --- /dev/null +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/dfs/LC_1254_ClosedIsland.java @@ -0,0 +1,112 @@ +package com.markilue.leecode.hot100.interviewHot.dfs; + +import org.junit.Test; + +/** + *@BelongsProject: Leecode + *@BelongsPackage: com.markilue.leecode.hot100.interviewHot.dfs + *@Author: markilue + *@CreateTime: 2023-05-19 11:33 + *@Description: TODO 力扣1254 统计封闭岛屿的数量 + *@Version: 1.0 + */ +public class LC_1254_ClosedIsland { + + @Test + public void test() { + int[][] grid = { + {1, 1, 1, 1, 1, 1, 1, 0}, + {1, 0, 0, 0, 0, 1, 1, 0}, + {1, 0, 1, 0, 1, 1, 1, 0}, + {1, 0, 0, 0, 0, 1, 0, 1}, + {1, 1, 1, 1, 1, 1, 1, 0} + }; + System.out.println(closedIsland(grid)); + } + + @Test + public void test1() { + int[][] grid = { + {0, 0, 1, 1, 0, 1, 0, 0, 1, 0}, + {1, 1, 0, 1, 1, 0, 1, 1, 1, 0}, + {1, 0, 1, 1, 1, 0, 0, 1, 1, 0}, + {0, 1, 1, 0, 0, 0, 0, 1, 0, 1}, + {0, 0, 0, 0, 0, 0, 1, 1, 1, 0}, + {0, 1, 0, 1, 0, 1, 0, 1, 1, 1}, + {1, 0, 1, 0, 1, 1, 0, 0, 0, 1}, + {1, 1, 1, 1, 1, 1, 0, 0, 0, 0}, + {1, 1, 1, 0, 0, 1, 0, 1, 0, 1}, + {1, 1, 1, 0, 1, 1, 0, 1, 1, 0} + }; + System.out.println(closedIsland(grid)); + } + + + public int closedIsland(int[][] grid) { + + int result = 0; + boolean[][] used = new boolean[grid.length][grid[0].length]; + for (int i = 0; i < grid.length; i++) { + for (int j = 0; j < grid[0].length; j++) { + if (grid[i][j] == 0 && dfs(grid, i, j, used)) { + result++; + } + } + } + return result; + } + + public boolean dfs(int[][] grid, int i, int j, boolean[][] used) { + //判断当前位置是否到达边界 + if (i < 0 || j < 0 || i >= grid.length || j >= grid[0].length) { + return false; + } + //是封闭的情况 + if (grid[i][j] == 1 || used[i][j]) { + return true; + } + if (grid[i][j] == 0) { + //0的情况,需要进一步进行判断 + grid[i][j] = 2; + used[i][j] = true; + boolean flag1 = dfs(grid, i, j + 1, used); + boolean flag2 = dfs(grid, i, j - 1, used); + boolean flag3 = dfs(grid, i + 1, j, used); + boolean flag4 = dfs(grid, i - 1, j, used); + if (flag1 && flag2 && flag3 && flag4) { + return true; + } + } + return false; + + } + + + public int closedIsland1(int[][] grid) { + + int result = 0; + for (int i = 0; i < grid.length; i++) { + for (int j = 0; j < grid[0].length; j++) { + if (grid[i][j] == 0 && dfs1(grid, i, j)) { + result++; + } + } + } + return result; + } + + public boolean dfs1(int[][] grid, int i, int j) { + //判断当前位置是否到达边界 + if (i < 0 || j < 0 || i >= grid.length || j >= grid[0].length) { + return false; + } + //是封闭的情况 + if (grid[i][j] == 1) { + return true; + } + //0的情况,需要进一步进行判断 + grid[i][j] = 1; + //&&是短路与,只要有一个不满足就不会判断后面的;&是逻辑与,会判断完后面的 + return dfs1(grid, i, j + 1) & dfs1(grid, i, j - 1) & dfs1(grid, i + 1, j) & dfs1(grid, i - 1, j); + } +} diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/dfs/LC_200_NumIslands.java b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/dfs/LC_200_NumIslands.java new file mode 100644 index 0000000..b1531c8 --- /dev/null +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/dfs/LC_200_NumIslands.java @@ -0,0 +1,39 @@ +package com.markilue.leecode.hot100.interviewHot.dfs; + +/** + *@BelongsProject: Leecode + *@BelongsPackage: com.markilue.leecode.hot100.interviewHot.dfs + *@Author: markilue + *@CreateTime: 2023-05-19 11:51 + *@Description: TODO + *@Version: 1.0 + */ +public class LC_200_NumIslands { + + //只要连通就是一个岛屿,所以可以dfs遇上一个1就把连通的所有1消除 + public int numIslands(char[][] grid) { + int result =0; + for (int i = 0; i < grid.length; i++) { + for (int j = 0; j < grid[0].length; j++) { + if(grid[i][j]=='1'){ + result++; + dfs(grid,i,j); + } + } + } + return result; + } + + public void dfs(char[][] grid, int i, int j) { + if (i < 0 || j < 0 || i >= grid.length || j >= grid[0].length) { + return; + } + if (grid[i][j] == '1') { + grid[i][j] = '0'; + dfs(grid, i + 1, j); + dfs(grid, i - 1, j); + dfs(grid, i, j + 1); + dfs(grid, i, j - 1); + } + } +} diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/listnode/LC_25_ReverseKGroup.java b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/listnode/LC_25_ReverseKGroup.java index 978c69d..cab6f28 100644 --- a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/listnode/LC_25_ReverseKGroup.java +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/listnode/LC_25_ReverseKGroup.java @@ -102,4 +102,48 @@ public class LC_25_ReverseKGroup { } + public ListNode reverseKGroup2(ListNode head, int k) { + ListNode temp = head; + + for (int i = 0; i < k; i++) { + if (temp == null) return head;//不够k个 + temp = temp.next; + } + + //够k个,开始翻转 + ListNode root = reverse(head, temp); + head.next = reverseKGroup2(temp, k);//temp是下一次的开始 + return root; + } + + public ListNode reverse(ListNode start, ListNode end) { + if (start == null) return start; + + ListNode fake = new ListNode(); + ListNode temp = start; + ListNode tempNext; + while (temp != end) { + tempNext = temp.next; + temp.next = fake.next; + fake.next = temp; + temp = tempNext; + } + return fake.next; + } + + public ListNode reverse1(ListNode start, ListNode end) { + if (start == null) return start; + + ListNode fake = new ListNode(); + + ListNode tempNext; + while (start != end) { + tempNext = start.next; + start.next = fake.next; + fake.next = start; + start = tempNext; + } + return fake.next; + } + } diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T52_139_WordBreak.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T52_139_WordBreak.java index 7a86d7c..85fedf8 100644 --- a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T52_139_WordBreak.java +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T52_139_WordBreak.java @@ -20,7 +20,7 @@ public class T52_139_WordBreak { public void test() { String s = "leetcode"; List wordDict = Arrays.asList("leet", "code"); - System.out.println(wordBreak(s, wordDict)); + System.out.println(wordBreak3(s, wordDict)); } @Test @@ -92,7 +92,7 @@ public class T52_139_WordBreak { boolean flag = false; for (String s1 : wordDict) { int length = s1.length(); - if(i>=length)flag |= dfs(s, wordDict, i - length)&&s1.equals(s.substring(i-length,i)); + if (i >= length) flag |= dfs(s, wordDict, i - length) && s1.equals(s.substring(i - length, i)); if (flag) { memo[i] = 1; return flag; @@ -104,4 +104,36 @@ public class T52_139_WordBreak { } + //回溯+记忆化搜搜 + public boolean wordBreak3(String s, List wordDict) { + memo = new int[s.length()]; + Arrays.fill(memo, -1); + return dfs(s, 0, wordDict); + } + + public boolean dfs(String s, int start, List wordDict) { + if (start == s.length()) { + return true; + } + if (start > s.length()) { + return false; + } + if (memo[start] != -1) { + return memo[start] == 1; + } + for (String word : wordDict) { + if (s.length() - start >= word.length() && s.substring(start, start + word.length()).equals(word)) { + if (dfs(s, start + word.length(), wordDict)) { + memo[start] = 1; + return true; + } + + } + } + memo[start] = 0; + return false; + + } + + } diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java index c5995a1..2f6ef32 100644 --- a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java @@ -26,4 +26,17 @@ public class T59_160_GetIntersectionNode { return curA; } + + + public ListNode getIntersectionNode1(ListNode headA, ListNode headB) { + + ListNode tempA = headA; + ListNode tempB = headB; + + while (tempA != tempB) {//经典之处在于:不管是有没有相交,最后都会因为两者相等出来,因为两者走了同样的距离,都是两条链表长度之和 + tempA = tempA == null ? headB : tempA.next; + tempB = tempB == null ? headA : tempB.next; + } + return tempA; + } } diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_MajorityElement.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_169_MajorityElement.java similarity index 94% rename from Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_MajorityElement.java rename to Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_169_MajorityElement.java index 17d2957..2a321df 100644 --- a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_MajorityElement.java +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_169_MajorityElement.java @@ -8,7 +8,7 @@ package com.markilue.leecode.hot100.second; *@Description: TODO 力扣169 多数元素 *@Version: 1.0 */ -public class T60_MajorityElement { +public class T60_169_MajorityElement { //莫斯投票法:当当前的数的count数就替换最大数 public int majorityElement(int[] nums) { diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T61_198_Rob.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T61_198_Rob.java new file mode 100644 index 0000000..39c0fd8 --- /dev/null +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T61_198_Rob.java @@ -0,0 +1,33 @@ +package com.markilue.leecode.hot100.second; + +import org.junit.Test; + +/** + *@BelongsProject: Leecode + *@BelongsPackage: com.markilue.leecode.hot100.second + *@Author: markilue + *@CreateTime: 2023-05-19 11:34 + *@Description: TODO 力扣198 打家劫舍 + *@Version: 1.0 + */ +public class T61_198_Rob { + + @Test + public void test(){ + int[] nums = {2,7,9,3,1}; + System.out.println(rob(nums)); + } + + + //动态规划:当前位置偷不偷可以取决于前一个位置偷不偷 + public int rob(int[] nums) { + int[][] dp = new int[nums.length][2]; + dp[0][0] = 0; + dp[0][1] = nums[0]; + for (int i = 1; i < nums.length; i++) { + dp[i][0] = Math.max(dp[i - 1][0], dp[i - 1][1]);//昨天偷没偷 + dp[i][1] = dp[i - 1][0] + nums[i];//今天一定要偷 + } + return Math.max(dp[nums.length - 1][0], dp[nums.length - 1][1]); + } +} diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T62_200_NumIslands.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T62_200_NumIslands.java new file mode 100644 index 0000000..8da047c --- /dev/null +++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T62_200_NumIslands.java @@ -0,0 +1,39 @@ +package com.markilue.leecode.hot100.second; + +/** + *@BelongsProject: Leecode + *@BelongsPackage: com.markilue.leecode.hot100.second + *@Author: markilue + *@CreateTime: 2023-05-19 11:43 + *@Description: TODO 力扣200 岛屿数量 + *@Version: 1.0 + */ +public class T62_200_NumIslands { + + //只要连通就是一个岛屿,所以可以dfs遇上一个1就把连通的所有1消除 + public int numIslands(char[][] grid) { + int result =0; + for (int i = 0; i < grid.length; i++) { + for (int j = 0; j < grid[0].length; j++) { + if(grid[i][j]=='1'){ + result++; + dfs(grid,i,j); + } + } + } + return result; + } + + public void dfs(char[][] grid, int i, int j) { + if (i < 0 || j < 0 || i >= grid.length || j >= grid[0].length) { + return; + } + if (grid[i][j] == '1') { + grid[i][j] = '0'; + dfs(grid, i + 1, j); + dfs(grid, i - 1, j); + dfs(grid, i, j + 1); + dfs(grid, i, j - 1); + } + } +} diff --git a/interview/MiHayo/src/main/java/com/markilue/interview/Question1.java b/interview/MiHayo/src/main/java/com/markilue/interview/Question1.java index f6e38c4..c4af8de 100644 --- a/interview/MiHayo/src/main/java/com/markilue/interview/Question1.java +++ b/interview/MiHayo/src/main/java/com/markilue/interview/Question1.java @@ -90,8 +90,6 @@ public class Question1 { } return result; - - } public void dfs(char[][] chars, int i, int j, char last) { diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java index bf48777..5b9fa55 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java @@ -2,12 +2,28 @@ package com.cqu.warehouse.realtime.app.dwm; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; +import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.cqu.warehouse.realtime.app.base.BaseStreamApp; +import com.cqu.warehouse.realtime.app.func.DimAsyncFunction; +import com.cqu.warehouse.realtime.entity.GasTableProcess; +import com.cqu.warehouse.realtime.app.func.GasWideProcessFunction; +import com.cqu.warehouse.realtime.app.func.MyDeserializerSchemaFunction; +import com.cqu.warehouse.realtime.app.func.TableProcessFunction; import com.cqu.warehouse.realtime.utils.MyKafkaUtils; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.OutputTag; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; /** *@BelongsProject: phm_parent @@ -16,9 +32,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; *@CreateTime: 2023-05-17 20:34 *@Description: * TODO 燃气轮机宽表: - * 1.维度拆分 - * 2.数据脱敏 + * 1.维度拆分 tagName拆分 + * 2.数据脱敏 tagMapping 动态分流FlinkCDC * 3.维度关联 + * 1)形成大宽表 "tagName":"LFFlow","value":"7.044725853949785" ->tag1:value1,tag2:value2 + * 2)添加额外信息 异步IO+旁路缓存 (燃机每个机器(gt_no)的额定功率,额定运行时长等) *@Version: 1.0 */ public class GasWideApp extends BaseStreamApp { @@ -45,18 +63,90 @@ public class GasWideApp extends BaseStreamApp { JSONObject jsonObject = JSON.parseObject(s); String tagName = jsonObject.getString("tagName"); String[] tagArray = tagName.split("]."); - jsonObject.put("tagName",tagArray[1]); + jsonObject.put("tagName", tagArray[1]); String[] message = tagArray[0].split("\\."); - jsonObject.put("area",message[0].substring(1)); - jsonObject.put("company",message[1]); - jsonObject.put("typeId",message[2]); - jsonObject.put("gt_no",message[3]); + jsonObject.put("area", message[0].substring(1)); + jsonObject.put("company", message[1]); + jsonObject.put("typeId", message[2]); + jsonObject.put("gt_no", message[3]); return jsonObject; } } ); - jsonObjectDS.print(">>>"); + //TODO 3.使用FLinkCDC读取mysql中的配置表gas_table_process + DebeziumSourceFunction gasTableProcessSource = MySQLSource.builder() + .hostname("Ding202") + .port(3306) + .username("root") + .password("123456") + .databaseList("rt_phm") + .tableList("rt_phm.gas_table_process") + .startupOptions(StartupOptions.initial())//每次都从新开始 + .deserializer(new MyDeserializerSchemaFunction()) + .build(); + + DataStreamSource gasTableProcessDS = env.addSource(gasTableProcessSource); +// gasTableProcessDS.print(); + + //TODO 4.将配置流广播 + MapStateDescriptor gasTableProcessMapStateDescriptor = new MapStateDescriptor("gasTableState", String.class, GasTableProcess.class); + BroadcastStream gasBroadDS = gasTableProcessDS.broadcast(gasTableProcessMapStateDescriptor); + + //TODO 5.原流join配置流,并进行广播 + BroadcastConnectedStream connectDS = jsonObjectDS.connect(gasBroadDS); + + //TODO 6.数据脱敏 双流connect+广播配置 = 动态脱敏 + OutputTag desensitizationTag = new OutputTag("desensitizationTag") { + }; + SingleOutputStreamOperator realDS = connectDS.process( + new TableProcessFunction(desensitizationTag, gasTableProcessMapStateDescriptor) + ); +// realDS.getSideOutput(desensitizationTag).print("&&&&"); +// realDS.print(">>>>"); + + //TODO 7.注册水位线 + SingleOutputStreamOperator realWithWatermarkDS = realDS.assignTimestampsAndWatermarks( + WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) + .withTimestampAssigner( + new SerializableTimestampAssigner() { + @Override + public long extractTimestamp(JSONObject jsonObject, long l) { + return jsonObject.getLong("ts"); + } + } + ) + ); + + //TODO 8.按照维度分区 ts和燃机号 + KeyedStream keyedDS = realWithWatermarkDS.keyBy( + jsonObject -> jsonObject.getString("gt_no") + ); + + //TODO 9.相同维度进统一分区进行维度关联 形成大宽表 + SingleOutputStreamOperator gasWideDS = keyedDS + .window(TumblingEventTimeWindows.of(Time.minutes(1))) + .process( + new GasWideProcessFunction() + ); + + gasWideDS.print("***"); + + //TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率,额定运行时长等),地区维度(经纬度等)等 + SingleOutputStreamOperator gasWidedDS = AsyncDataStream.unorderedWait( + gasWideDS, + new DimAsyncFunction(), + 60, + TimeUnit.SECONDS + ); + + SingleOutputStreamOperator gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString()); + + gasWideStrDS.print(">>>>"); + + gasWideStrDS.addSink( + MyKafkaUtils.getKafkaSink("dwm_gas_wide") + ); } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java new file mode 100644 index 0000000..d26649f --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java @@ -0,0 +1,27 @@ +package com.cqu.warehouse.realtime.app.func; + + +import com.alibaba.fastjson.JSONObject; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.app.func + *@Author: markilue + *@CreateTime: 2023-05-18 20:15 + *@Description: TODO 维度关联方法 + *@Version: 1.0 + */ +public class DimAsyncFunction extends RichAsyncFunction { + + + //异步调用主方法 + @Override + public void asyncInvoke(T t, ResultFuture resultFuture) throws Exception { + + } + + + +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java new file mode 100644 index 0000000..8e92b49 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/GasWideProcessFunction.java @@ -0,0 +1,45 @@ +package com.cqu.warehouse.realtime.app.func; + +import com.alibaba.fastjson.JSONObject; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.Date; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.func + *@Author: markilue + *@CreateTime: 2023-05-18 17:26 + *@Description: TODO 燃机的维度关联 大宽表的function + *@Version: 1.0 + */ +public class GasWideProcessFunction extends ProcessWindowFunction { + + @Override + public void process(String key, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception { + JSONObject json = null; + for (JSONObject jsonObject : iterable) { + String tagName = jsonObject.getString("tagName"); + Double value = jsonObject.getDouble("value"); + if (json == null) { + jsonObject.remove("tagName"); + jsonObject.remove("value"); + jsonObject.put(tagName, value); + json = jsonObject; + } else { + json.put(tagName, value); + } + } + long start = context.window().getStart(); + long end = context.window().getEnd(); + System.out.println("key:"+key+" start:"+new Date(start)+" end:"+new Date(end)+" size: "+json.size()); + + collector.collect(json); + } +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/MyDeserializerSchemaFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/MyDeserializerSchemaFunction.java new file mode 100644 index 0000000..ca89582 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/MyDeserializerSchemaFunction.java @@ -0,0 +1,57 @@ +package com.cqu.warehouse.realtime.app.func; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; +import io.debezium.data.Envelope; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.func + *@Author: markilue + *@CreateTime: 2023-05-18 14:03 + *@Description: TODO 自定义反序列化器 + *@Version: 1.0 + */ +public class MyDeserializerSchemaFunction implements DebeziumDeserializationSchema { + @Override + public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception { + Struct value = (Struct) sourceRecord.value(); + Struct source = value.getStruct("source"); + Struct afterData = value.getStruct("after"); + + JSONObject jsonObject = new JSONObject(); + + jsonObject.put("database", source.getString("db")); + jsonObject.put("table", source.getString("table")); + + //类型 + String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();//内部通过枚举类,将op转为对应的string + if (type.equals("create")) { + type = "insert"; + } + jsonObject.put("type", type); + + JSONObject dataObject = new JSONObject(); + if (afterData != null) { + for (Field field : afterData.schema().fields()) { + String fieldName = field.name(); + Object fieldValue = afterData.get(fieldName); + dataObject.put(fieldName, fieldValue); + } + } + jsonObject.put("data", dataObject); + collector.collect(jsonObject.toJSONString()); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } + + +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java new file mode 100644 index 0000000..ead1e98 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java @@ -0,0 +1,90 @@ +package com.cqu.warehouse.realtime.app.func; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.cqu.warehouse.realtime.entity.GasTableProcess; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.text.SimpleDateFormat; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.func + *@Author: markilue + *@CreateTime: 2023-05-18 14:32 + *@Description: TODO 数据脱敏function实现类 + *@Version: 1.0 + */ +public class TableProcessFunction extends BroadcastProcessFunction { + + private MapStateDescriptor mapStateDescriptor; + + private OutputTag outTag; + + private SimpleDateFormat sdf; + + @Override + public void open(Configuration parameters) throws Exception { + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + } + + public TableProcessFunction(OutputTag outTag, MapStateDescriptor mapStateDescriptor) { + this.mapStateDescriptor = mapStateDescriptor; + this.outTag = outTag; + } + + + @Override + public void processElement(JSONObject jsonObject, BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception { + ReadOnlyBroadcastState gasTableState = readOnlyContext.getBroadcastState(mapStateDescriptor); + + String area = jsonObject.getString("area"); + String tagName = jsonObject.getString("tagName"); + + + GasTableProcess truth1 = gasTableState.get(area); + if (truth1 != null) { + jsonObject.put("area", truth1.getTag_desc()); + } + GasTableProcess truth = gasTableState.get(tagName); + if (truth != null) { +// jsonObject.put("originaltagName", truth.getDecode_value()); + jsonObject.put("tagName", truth.getDecode_value()); +// jsonObject.put("tag_desc", truth.getTag_desc()); +// jsonObject.put("unit", truth.getUnit()); +// jsonObject.put("is_calculated", truth.getIs_calculated()); +// readOnlyContext.output(outTag, jsonObject.toJSONString()); + } + + //添加上时间戳,方便后面注册水位线 + String time = jsonObject.getString("realtime"); + long ts = sdf.parse(time).getTime(); + jsonObject.put("ts",ts); + + collector.collect(jsonObject); + + } + + @Override + public void processBroadcastElement(String s, BroadcastProcessFunction.Context context, Collector collector) throws Exception { + BroadcastState gasTableProcessState = context.getBroadcastState(mapStateDescriptor); + + JSONObject jsonObject = JSON.parseObject(s); + + JSONObject data = jsonObject.getJSONObject("data"); + GasTableProcess gasTableProcess = JSON.parseObject(jsonObject.getString("data"), GasTableProcess.class); + if (data != null) { + String encode_value = data.getString("encode_value"); + if (encode_value != null) { + gasTableProcessState.put(encode_value, gasTableProcess);// + } + } + + } +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json new file mode 100644 index 0000000..5a7b390 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/format.json @@ -0,0 +1,235 @@ +{ + "TC-004-5A": 16.388888465085376, + "TC-004-3A": 16.174404180794955, + "TC-004-1A": 17.399999618530273, + "IPT.NLP": 6798.819, + "T5TC8": 14.574999809265137, + "IPT.NHPS": 1.33200000000033, + "NPT": 2999.291, + "TC-004-12A": 15.495934943823764, + "TC-004-10A": 14.625051892983416, + "TC-004-16A": 18.715925347122052, + "company": "PENGBO", + "TC-004-14A": 18.189884806858995, + "TC-002A": 16.580334984418005, + "TC-002B": 14.649947725546857, + "T5TC6": 15.720535523506502, + "area": "TJ", + "realtime": "2023-01-01 00:00:05+00:00", + "TC-004-4A": 15.381883676474294, + "NHP_2": 0.0, + "NHP_1": 0.0, + "TE-405": 17.30650138501854, + "TC-004-2A": 16.914492427371442, + "TE-406": 12.75, + "IPT.NLPS": 1.62100000000009, + "gt_no": "PL19-3CEPB_C", + "TC-004-13A": 17.3510875862247, + "NPT_1": 0.0, + "NPT_2": 0.0, + "TC-004-11A": 14.899999618530273, + "NPT_3": 0.0, + "NPT_4": 0.0, + "SE-003E": 0.0, + "typeId": "PL19-3CEPB", + "TC-004-15A": 18.601334706973283, + "NLPCOR": 6964.0, + "time": "20230518171144", + "FI-7003A": 0.0, + "ts": 1672502405000 +} + + +{ + "GenExYAxisVib": 1.3521160539358485, + "GenExXAxisVib": 0.030277005878220582, + "T5TC9": 13.762499809265137, + "T5TC8": 14.574999809265137, + "PT-541": 0.2893644869327545, + "PT-542": 0.2544423043727875, + "PT-301": 3.7197840213775635, + "PT-302": 0.01945525035262108, + "PT-303": 0.013618679717183113, + "LubeDP_1": 0.4795, + "PT-305": 0.002509268932044506, + "T5TC1": 17.399999618530273, + "NHPS": 1.33200000000033, + "T5TC3": 16.174404180794955, + "T5TC2": 16.914492427371442, + "LFT": 13.777970314025879, + "T5TC5": 16.388888465085376, + "T5TC4": 15.381883676474294, + "PT-141": 0.0, + "T5TC7": 15.287857060382764, + "T5TC6": 15.720535523506502, + "TE-543": 25.590229885187, + "LFDP3": 0.0, + "AirInletDP": 0.4699, + "GFVlvPos": 0.03891034422752758, + "LFDP2": 0.6417040824890137, + "LFDP1": 0.1681240051984787, + "TE-142": 3.9642447746203593, + "TE-143": 6.919320775708184, + "TE-144": 3.6937499046325684, + "TE-541": 31.399999618530273, + "TE-542": 29.30980319622904, + "P2_2": 3.8415757444454357E-4, + "P2_1": 0.0, + "PT-651": 9.765625E-4, + "PT-652": 0.0015206909738481045, + "TankLevel": 552.1400756835938, + "typeId": "PL19-3CEPB", + "P2COR": 1.743, + "XT-004": 0.0, + "XT-001": 0.0, + "T5AVGCOR": 635.9, + "TankT": 39.29999923706055, + "PT-201": 0.041828788816928864, + "NLP": 6798.819, + "PT-202": 0.022629080340266228, + "GFT": 17.674999237060547, + "PT-203": 0.02175096981227398, + "TI-1535-AI1": 61.50103712081909, + "PT-601": 103.04540252685547, + "GenDrXAxisVib": 0.7065324326356252, + "LubeHaderPS": 0.0, + "PTB1XAxisVib": 0.034702058881521225, + "T1": 1.5, + "T7": 16.325344161037357, + "TE-601": 20.5, + "NHP_2": 0.0, + "LFFlow": 7.538909912109375, + "TE-602": 19.203675782556335, + "NLPS": 1.62100000000009, + "NHP_1": 0.0, + "HT-401": 0.0, + "T5AVg": 606.11875, + "POWER": 1.4162535526896438, + "EQURunningTime": 0.0, + "GPB1XAxisVib": 0.021722519971262955, + "PI-1501": 1726.3428649902344, + "TT-241": 14.210359573364258, + "TI-1535": 0.0, + "NPT_1": 0.0, + "NPT_2": 0.0, + "P15": 5.350193969206885E-4, + "NPT_3": 0.0, + "NPT_4": 0.0, + "NPT_5": 0.0, + "T15": 16.580334984418005, + "LFVlvPos": 0.019789973254470777, + "ZI-541": 0.527961231706043, + "LubeHaderP1": 0.009722140384837985, + "ZI-141": 5.949376106262207, + "ts": 1672502405000, + "XT-042": 230.22590306599935, + "XT-041": 222.02859895564615, + "HT-655": 37.097961738784214, + "TT-655": 8.921201508585364, + "TT-656": 2.641325143769306, + "HT-656": 48.680594845861194, + "GFFlow": 0.0, + "PT-342": 0.7079765796661377, + "TS1": 47.8, + "PT-103": 1.5402074131998233E-4, + "TC-002B": 14.649947725546857, + "area": "天津", + "realtime": "2023-01-01 00:00:05+00:00", + "GPB2XAxisVib": 0.13213856075384076, + "TT-301": 19.75046900291927, + "gt_no": "PL19-3CEPB_C", + "LubeReturnT4": 13.774999618530273, + "POWERCOR": 17.57, + "LubeReturnT2": 16.399999618530273, + "LubeReturnT3": 14.474655076023192, + "VE-401": 0.25278837216707567, + "LubeReturnT1": 16.799999237060547, + "XT-602": 10.64977370897929, + "XT-601": 7.845129648844401, + "NLPCOR": 6964.0, + "GFNozzleP": 0.01750973053276539, + "VE-402": 0.3067261383946364, + "FI-7003A": 0.0, + "XT-444": 1.6750426267584164, + "FI-7003B": 4665.285481770833, + "XT-443": 45.00442378882211, + "NU-STA-A": 0.0, + "XT-442": 0.21687513142824172, + "TE-653": 2.5999999046325684, + "TE-411": 16.889347404032016, + "XT-142": 0.003361384260157744, + "TE-654": 2.8260775385114054, + "TE-412": 16.174999237060547, + "XT-141": 5.064921343767613, + "TE-413": 16.46385730447558, + "T5TC10": 14.625051892983416, + "LT-541": 374.06939697265625, + "PI-1533": 10488.949951171875, + "TI-1501-AI1": 60.09032201766968, + "TE-651": 5.929999951521555, + "TE-652": 7.1476561546325685, + "PT-243": 0.8881018757820129, + "PT-002": 9.366126732857083E-5, + "NPT": 2999.291, + "PT-004": 0.010308037512004375, + "TI-1501": 0.0, + "AirInletDP_1": 0.0, + "PT-006": 0.0, + "AirInletDP_2": 0.0, + "PT-007": 0.008062498643994331, + "NHP": 8899.316, + "T5TC12": 15.495934943823764, + "TE-408": 12.582088929688325, + "T5TC11": 14.899999618530273, + "company": "PENGBO", + "T5TC14": 18.189884806858995, + "T5TC13": 17.3510875862247, + "LubeHaderP": 0.5440661, + "T5TC16": 18.715925347122052, + "NU-STO-FA": 0.0, + "T5TC15": 18.601334706973283, + "GFP0": 3.881516933441162, + "GenDrYAxisVib": 1.7166104729287326, + "LFNozzleP": 0.01945525035262108, + "P0": 0.0, + "TE-401": 33.90503839768159, + "P1": 0.0, + "LubeDT1": 44.3, + "TE-402": 33.09032756898087, + "P2": 1.767317, + "LubeDT2": 45.8, + "TE-403": 16.0, + "LubeDT3": 30.3, + "TE-404": 17.174999237060547, + "LubeReturnP": 0.0029182881116867065, + "TE-405": 17.30650138501854, + "NHPCOR": 9115.0, + "P5": 2.841948880814016E-4, + "NPTS": 0.122000000000298, + "TE-406": 12.75, + "TE-407": 12.274999618530273, + "T1_1": 10.0, + "NLP_1": 0.0, + "NLP_2": 0.0, + "LubeDT4": 27.8, + "GFDP1": 0.07133592665195465, + "T1_2": 10.585208142641932, + "T1_3": 10.274999618530273, + "LFP1": 0.006744488142430782, + "LFP0": 0.002677770098671317, + "time": "20230518171903", + "NPTCOR": 3072.0, + "RunningTime": 0.0, + "XT-542": 17.40686043103536, + "XT-541": 4.760690505238987 +} + +{ + "typeId": "PL19-3CEPB", + "ts": 1672502405000, + "area": "天津", + "realtime": "2023-01-01 00:00:05+00:00", + "gt_no": "PL19-3CEPB_C", + "company": "PENGBO", + "time": "20230518171903", +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/test.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/test.java new file mode 100644 index 0000000..5b7ed47 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/test.java @@ -0,0 +1,21 @@ +package com.cqu.warehouse.realtime.app.func; + +import java.util.Date; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.func + *@Author: markilue + *@CreateTime: 2023-05-18 17:42 + *@Description: TODO + *@Version: 1.0 + */ +public class test { + + public static void main(String[] args) { + Long ts = 1672502465000L; + Long ts1 =1672502405000L; + System.out.println(new Date(ts)); + System.out.println(new Date(ts1)); + } +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTableProcess.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTableProcess.java new file mode 100644 index 0000000..bc4d2a4 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTableProcess.java @@ -0,0 +1,24 @@ +package com.cqu.warehouse.realtime.entity; + +import lombok.Data; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.entity + *@Author: markilue + *@CreateTime: 2023-05-18 13:46 + *@Description: TODO 燃气轮机配置表实体类 + *@Version: 1.0 + */ +@Data +public class GasTableProcess { + + String encode_value;//加密数据 + String decode_value;//解密数据 + String tag_desc;//描述信息 + String unit;//单位 + String standard_unit;//标准单位 + Integer scale_factor;//尺度 + Integer is_calculated;//是否为计算数据 + String extend_info;//额外信息 +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java new file mode 100644 index 0000000..227b7cb --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java @@ -0,0 +1,17 @@ +package com.cqu.warehouse.realtime.utils; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-18 20:19 + *@Description: TODO phoenix工具类 实现sql查询 + *@Version: 1.0 + */ +public class PhoenixUtils { + + + + + +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ThreadPoolUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ThreadPoolUtils.java new file mode 100644 index 0000000..f5d31b3 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/ThreadPoolUtils.java @@ -0,0 +1,44 @@ +package com.cqu.warehouse.realtime.utils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-18 20:05 + *@Description: TODO 创建线程池的工具类 + *@Version: 1.0 + */ +public class ThreadPoolUtils { + + //单例懒汉模式创建线程池 + private static volatile ThreadPoolExecutor pool; + private static final int corePoolSize = 8; + private static final int maximumPoolSize = 16; + private static final long keepAliveTime = 5000L; + private static final TimeUnit unit = TimeUnit.SECONDS; + private static final BlockingQueue workQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE); + + public static ThreadPoolExecutor getThreadPool() { + if (pool == null) { + synchronized (ThreadPoolExecutor.class) { + if (pool == null) { + System.out.println("---开辟线程池---"); + pool = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, workQueue + ); + } + } + } + return pool; + } + + +}