leecode, rt_phm updates

This commit is contained in:
kevinding1125 2023-05-28 13:56:02 +08:00
parent 67bcb4b0bc
commit c9f5e14e63
47 changed files with 1850 additions and 27210 deletions

View File

@ -0,0 +1,184 @@
package com.markilue.leecode.hot100.interviewHot.graph;
import java.util.*;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.interviewHot.graph
*@Author: markilue
*@CreateTime: 2023-05-28 11:21
*@Description: TODO LeetCode 127 Word Ladder
*@Version: 1.0
*/
public class LC_127_LadderLength {
int min = Integer.MAX_VALUE;
int cur = 0;
int[] visited;
//DFS with backtracking: correct but exponential, so the BFS solutions below are preferred
public int ladderLength(String beginWord, String endWord, List<String> wordList) {
List<Integer> beginList = new ArrayList<>();
boolean flag = false;
int diff;
for (int i = 0; i < wordList.size(); i++) {
String s = wordList.get(i);
if (s.equals(endWord)) flag = true;
diff = 0;
for (int j = 0; j < beginWord.length(); j++) {
if (beginWord.charAt(j) != s.charAt(j)) diff++;
}
if (diff == 1) beginList.add(i);
}
visited = new int[wordList.size()];
if (!flag) return 0;
for (Integer next : beginList) {
cur++;
dfs(wordList, next, endWord);
cur--;
}
return min == Integer.MAX_VALUE ? 0 : min + 1;//+1 counts beginWord itself
}
private void dfs(List<String> wordList, int now, String endWord) {
if (wordList.get(now).equals(endWord)) {
min = Math.min(min, cur);
return;//a path never needs to extend past endWord
}
visited[now] = 1;
int diff;
String curString = wordList.get(now);
for (int i = 0; i < wordList.size(); i++) {
if (visited[i] == 0) {
String s = wordList.get(i);
diff = 0;
for (int j = 0; j < curString.length(); j++) {
if (curString.charAt(j) != s.charAt(j)) diff++;
}
if (diff == 1) {
cur++;
dfs(wordList, i, endWord);
cur--;
}
}
}
visited[now] = 0;//backtrack so other paths may reuse this word
}
//Solution from the comment section, BFS: shortest paths are normally found with BFS
public int ladderLength1(String beginWord, String endWord, List<String> wordList) {
//TODO 1. Put wordList into a HashSet so membership checks are O(1)
HashSet<String> wordSet = new HashSet<>(wordList);
if (wordSet.size() == 0 || !wordSet.contains(endWord)) {
return 0;//endWord is absent, so no ladder can exist
}
wordSet.remove(beginWord);//the start word never needs to be revisited
//TODO 2. BFS over the graph requires a queue and a visited set
LinkedList<String> queue = new LinkedList<>();
queue.offerFirst(beginWord);
HashSet<String> visited = new HashSet<>();
visited.add(beginWord);
//TODO 3. Start the BFS; the start word itself counts, so step starts at 1
int step = 1;
while (!queue.isEmpty()) {
int currentSize = queue.size();
for (int i = 0; i < currentSize; i++) {
//process every word currently in the queue
String currentWord = queue.poll();
//if changing a single character reaches endWord, the answer is step + 1
if (changeWordEveryOneLetter(currentWord, endWord, queue, visited, wordSet)) {
return step + 1;
}
}
step++;
}
return 0;
}
private boolean changeWordEveryOneLetter(String currentWord, String endWord, Queue<String> queue, Set<String> visited, Set<String> wordSet) {
char[] charArray = currentWord.toCharArray();
for (int i = 0; i < endWord.length(); i++) {
//save the original char and restore it after trying all substitutions
char originChar = charArray[i];
for (char j = 'a'; j <= 'z'; j++) {
if (j == originChar) {
continue;
}
charArray[i] = j;
String nextWord = String.valueOf(charArray);
if (wordSet.contains(nextWord)) {
if (nextWord.equals(endWord)) {
return true;
}
if (!visited.contains(nextWord)) {
queue.add(nextWord);
//important: mark as visited immediately after enqueueing
visited.add(nextWord);
}
}
}
charArray[i] = originChar;
}
return false;
}
//Solution from the comment section, BFS: generic adjacency-list approach
public int ladderLength2(String beginWord, String endWord, List<String> wordList) {
int endIndex = wordList.indexOf(endWord);
if (endIndex == -1) return 0;
wordList.add(beginWord);
List<List<Integer>> adjacent = new ArrayList<>();
for (int i = 0; i < wordList.size(); ++i)
adjacent.add(new ArrayList<>());
for (int i = 0; i < wordList.size(); ++i) {
String s = wordList.get(i);
for (int j = i + 1; j < wordList.size(); ++j) {
if (judge(s, wordList.get(j))) {
adjacent.get(i).add(j);
adjacent.get(j).add(i);
}
}
}
return bfs(wordList.size() - 1, endIndex, adjacent, new boolean[wordList.size()]);
}
private int bfs(int i, int j, List<List<Integer>> adjacent, boolean[] visited) {
int distance = 0;
ArrayDeque<Integer> queue = new ArrayDeque<>();
queue.addLast(i);
while (!queue.isEmpty()) {
int size = queue.size();
distance++;
for (int k = 0; k < size; ++k) {
int v = queue.pollFirst();
visited[v] = true;
if (v == j) return distance;
List<Integer> edges = adjacent.get(v);
for (int e : edges) {
if (!visited[e]) {
queue.addLast(e);
}
}
}
}
return 0;
}
private boolean judge(String s, String p) {
int distance = 0;
int len = s.length();
for (int i = 0; i < len && distance < 2; ++i) {
if (s.charAt(i) != p.charAt(i)) distance++;
}
return distance < 2;
}
}
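A quick sanity check for the solutions above (illustrative only; JUnit 4 as used elsewhere in this repo, with the canonical LeetCode 127 example):
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class LC_127_LadderLengthTest {
    @Test
    public void test() {
        List<String> wordList = new ArrayList<>(Arrays.asList("hot", "dot", "dog", "lot", "log", "cog"));
        //shortest ladder: hit -> hot -> dot -> dog -> cog, i.e. 5 words
        System.out.println(new LC_127_LadderLength().ladderLength1("hit", "cog", wordList));//expect 5
        //ladderLength2 appends beginWord to its argument, so hand it a fresh copy
        System.out.println(new LC_127_LadderLength().ladderLength2("hit", "cog", new ArrayList<>(wordList)));//expect 5
    }
}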

View File

@ -0,0 +1,65 @@
package com.markilue.leecode.hot100.interviewHot.graph;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.interviewHot.graph
*@Author: markilue
*@CreateTime: 2023-05-28 10:57
*@Description: TODO LeetCode 841 Keys and Rooms
*@Version: 1.0
*/
public class LC_841_CanVisitAllRooms {
@Test
public void test() {
List<List<Integer>> rooms = new ArrayList<>();
rooms.add(Arrays.asList(1));
rooms.add(Arrays.asList(2));
rooms.add(Arrays.asList(3));
rooms.add(new ArrayList<>());
System.out.println(canVisitAllRooms(rooms));
}
@Test
public void test1() {
List<List<Integer>> rooms = new ArrayList<>();
rooms.add(Arrays.asList(1, 3));
rooms.add(Arrays.asList(3, 0, 1));
rooms.add(Arrays.asList(2));
rooms.add(new ArrayList<>(0));
System.out.println(canVisitAllRooms(rooms));
}
int[] visited;
public boolean canVisitAllRooms(List<List<Integer>> rooms) {
visited = new int[rooms.size()];
dfs(rooms, 0);
for (int i : visited) {
if (i != 2) {//every room must have been fully visited
return false;
}
}
return true;
}
public void dfs(List<List<Integer>> rooms, int now) {
if (visited[now] != 0) return;//already visited (in progress or done), skip
visited[now] = 1;
List<Integer> child = rooms.get(now);
for (Integer next : child) {
dfs(rooms, next);
}
visited[now] = 2;//this room is fully processed
}
}
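The recursive DFS above can equally be written iteratively, which avoids deep recursion when key chains are long; a minimal sketch (illustrative, not part of this commit):
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
public class CanVisitAllRoomsIterative {
    public boolean canVisitAllRooms(List<List<Integer>> rooms) {
        boolean[] seen = new boolean[rooms.size()];
        Deque<Integer> stack = new ArrayDeque<>();
        stack.push(0);//room 0 is always open
        seen[0] = true;
        int visitedCount = 1;
        while (!stack.isEmpty()) {
            int room = stack.pop();
            for (int key : rooms.get(room)) {
                if (!seen[key]) {//each room is pushed at most once
                    seen[key] = true;
                    visitedCount++;
                    stack.push(key);
                }
            }
        }
        return visitedCount == rooms.size();
    }
}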

View File

@ -2,6 +2,7 @@ package com.markilue.leecode.hot100.interviewHot.singlestack;
import org.junit.Test;
+import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.LinkedList;
@ -19,8 +20,9 @@ public class LC_503_NextGreaterElements {
public void test() {
// int[] nums = {1, 5, 3, 4, 3};
// int[] nums = {1, 2, 3, 4, 3};
-int[] nums = {1, 2, 3, 2, 1};
-System.out.println(Arrays.toString(nextGreaterElements1(nums)));
+// int[] nums = {1, 2, 3, 2, 1};
+int[] nums = {1, 2, 1};
+System.out.println(Arrays.toString(nextGreaterElements2(nums)));
}
@ -95,4 +97,21 @@ public class LC_503_NextGreaterElements {
return result;
}
+//Naive idea: simulate the circular array by scanning a doubled index range
+public int[] nextGreaterElements2(int[] nums) {
+ArrayDeque<Integer> stack = new ArrayDeque<>();
+int[] result = new int[nums.length];
+for (int i = nums.length * 2 - 2; i >= 0; i--) {
+while (!stack.isEmpty() && stack.peek() <= nums[i % nums.length]) {
+stack.pop();
+}
+if (i < nums.length) result[i] = stack.isEmpty() ? -1 : stack.peek();
+stack.push(nums[i % nums.length]);
+}
+return result;
+}
}

View File

@ -0,0 +1,46 @@
package com.markilue.leecode.hot100.second;
import java.util.ArrayDeque;
import java.util.Deque;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.second
*@Author: markilue
*@CreateTime: 2023-05-28 10:15
*@Description: TODO LeetCode 155 Min Stack, second pass
*@Version: 1.0
*/
public class T58_155_MinStack_1 {
Deque<Integer> deque;
Deque<Integer> minDeque;
//track values in one stack and the running minimum in a second stack
public T58_155_MinStack_1() {
deque = new ArrayDeque<>();
minDeque = new ArrayDeque<>();
minDeque.add(Integer.MAX_VALUE);
}
public void push(int val) {
deque.push(val);
minDeque.push(val < minDeque.peek() ? val : minDeque.peek());
}
public void pop() {
deque.pop();
minDeque.pop();
}
public int top() {
if(deque.isEmpty()){
return -1;
}
return deque.peek();
}
public int getMin() {
return minDeque.peek();
}
}
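Example usage of the class above, following the canonical LeetCode 155 sequence (illustrative only):
public class MinStackDemo {
    public static void main(String[] args) {
        T58_155_MinStack_1 minStack = new T58_155_MinStack_1();
        minStack.push(-2);
        minStack.push(0);
        minStack.push(-3);
        System.out.println(minStack.getMin());//-3
        minStack.pop();
        System.out.println(minStack.top());//0
        System.out.println(minStack.getMin());//-2
    }
}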

View File

@ -16,17 +16,31 @@ import java.util.Arrays;
public class T72_239_MaxSlidingWindow {
@Test
-public void test(){
+public void test() {
int[] nums = {1, 3, -1, -3, 5, 3, 6, 7};
int k = 3;
-System.out.println(Arrays.toString(maxSlidingWindow(nums,k)));
+System.out.println(Arrays.toString(maxSlidingWindow1(nums, k)));
}
@Test
-public void test1(){
-int[] nums = {1,-1};
+public void test1() {
+int[] nums = {1, -1};
int k = 1;
-System.out.println(Arrays.toString(maxSlidingWindow(nums,k)));
+System.out.println(Arrays.toString(maxSlidingWindow(nums, k)));
+}
+@Test
+public void test2() {
+int[] nums = {7, 2, 4};
+int k = 2;
+System.out.println(Arrays.toString(maxSlidingWindow1(nums, k)));
+}
+@Test
+public void test3() {
+int[] nums = {1, 3, 1, 2, 0, 5};
+int k = 3;
+System.out.println(Arrays.toString(maxSlidingWindow1(nums, k)));
}
//Maintain a monotonic deque of candidates: periodically evict elements that left the window
@ -60,4 +74,40 @@ public class T72_239_MaxSlidingWindow {
}
return result;
}
+//Use a monotonically decreasing deque of indices: evict expired indices before each step
+public int[] maxSlidingWindow1(int[] nums, int k) {
+ArrayDeque<Integer> stack = new ArrayDeque<>();//stores indices, not values
+int[] result = new int[nums.length - k + 1];
+//build the first window
+for (int i = 0; i < k; i++) {
+while (!stack.isEmpty() && nums[stack.peek()] <= nums[i]) {
+stack.pop();
+}
+stack.addFirst(i);
+}
+result[0] = nums[stack.peekLast()];
+//each further slide of the window yields one more result entry
+for (int i = k; i < nums.length; i++) {
+//evict indices that fell out of the window
+while (!stack.isEmpty() && i - stack.peekLast() >= k) {
+stack.pollLast();
+}
+//pop smaller values so the deque stays decreasing, then push the current index
+while (!stack.isEmpty() && nums[stack.peek()] <= nums[i]) {
+stack.pop();
+}
+stack.addFirst(i);
+//the window maximum sits at the tail of the deque
+result[i - k + 1] = nums[stack.peekLast()];
+}
+return result;
+}
}

View File

@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>service</artifactId>
<groupId>com.cqu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>service_data_interface</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!--ClickHouse JDBC connector-->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.55</version>
</dependency>
<!--dynamic datasource-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<!-- <version>3.3.2</version>-->
</dependency>
</dependencies>
<!-- When packaging, also include *.xml files under the java source directory -->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.yml</include>
<include>**/*.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>

View File

@ -0,0 +1,25 @@
package com.cqu.dataInterface;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.dataInterface
*@Author: markilue
*@CreateTime: 2023-05-27 20:24
*@Description: TODO Bootstrap class for the data-interface microservice
*@Version: 1.0
*/
@SpringBootApplication
@EnableFeignClients
@MapperScan(basePackages = {"com.cqu.dataInterface"})
public class DataInterfaceApplication {
public static void main(String[] args) {
SpringApplication.run(DataInterfaceApplication.class);
}
}

View File

@ -0,0 +1,47 @@
package com.cqu.dataInterface.controller;
import com.cqu.dataInterface.entity.WindSCADALocationState;
import com.cqu.dataInterface.service.WindSCADAService;
import com.cqu.utils.Result;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.dataInterface.controller
*@Author: markilue
*@CreateTime: 2023-05-27 20:36
*@Description: TODO Controller layer for wind-turbine SCADA data
*@Version: 1.0
*/
@RestController
@RequestMapping("api/dataInterface/windSCADA")
public class WindSCADAController {
@Autowired
private WindSCADAService windSCADAService;
@RequestMapping("/windSCADALocationState")
public Result getWindSCADALocationState(@RequestParam(value = "date", defaultValue = "0") String date, @RequestParam(value = "limit", defaultValue = "5") Integer limit) {
if ("0".equals(date)) {
date = getCurrentDate();
}
List<WindSCADALocationState> locationStateList = windSCADAService.getWindSCADALocationStateByTm(date, limit);
return Result.ok().data("data", locationStateList);
}
public String getCurrentDate() {
return DateFormatUtils.format(new Date(), "yyyyMMdd");
}
}

View File

@ -0,0 +1,49 @@
package com.cqu.dataInterface.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.dataInterface.entity
*@Author: markilue
*@CreateTime: 2023-05-27 20:34
*@Description: TODO Entity for wind SCADA data aggregated by location
*@Version: 1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WindSCADALocationState {
//TODO note: these field names and types must exactly match the table schema; the ts column is BigInt -> Long
private String stt;
private String edt;
private String windfarm;
private String location;
private String longitude;
private String latitude;
private Double production;
private Double avg_torque;
private Double avg_sumptemp;
private Double avg_inletoiltemp;
private Double avg_winddirection;
private Double avg_envtemp;
private Double avg_gen_speed;
private Double avg_pumpoutletpress;
private Double avg_engineroom_temp;
private Double avg_rotorspeed;
private Double avg_activepower;
private Double avg_engineroom_vibration_x;
private Double avg_engineroom_vibration_y;
private Double avg_highspeedshaft_front_temp;
private Double avg_max_windingtemp;
private Double avg_highspeedshaft_rear_temp;
private Double avg_windspeed;
private Double avg_coolingwatertemp;
private Double avg_inletpress;
private Long ts;
}

View File

@ -0,0 +1,29 @@
package com.cqu.dataInterface.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.cqu.dataInterface.entity.WindSCADALocationState;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.dataInterface.mapper
*@Author: markilue
*@CreateTime: 2023-05-27 20:42
*@Description: TODO Mapper for wind SCADA data
*@Version: 1.0
*/
@DS("clickhouse") //本个mapper默认的数据源
public interface WindSCADAMapper {
@Select("select * " +
"from wind_SCADA_location_state " +
"where " +
" toYYYYMMDD(ts) = ${tm} " +
"limit ${limit}")
@DS("clickhouse")
List<WindSCADALocationState> selectWindSCADALocationStateByTm(@Param("tm") String tm,@Param("limit") Integer limit);//query the location-aggregated wind SCADA data for day tm
}
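${tm} and ${limit} are spliced into the SQL as raw text, which works for trusted input but is open to SQL injection; MyBatis's #{...} placeholders bind through PreparedStatement instead. A hedged sketch of the same mapper with bound parameters (assuming the ClickHouse JDBC driver accepts placeholders in these positions, which is worth verifying; tm is passed as an integer so the comparison with toYYYYMMDD stays numeric):
import com.baomidou.dynamic.datasource.annotation.DS;
import com.cqu.dataInterface.entity.WindSCADALocationState;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@DS("clickhouse")
public interface WindSCADAMapperParameterized {
    @Select("select * from wind_SCADA_location_state " +
            "where toYYYYMMDD(ts) = #{tm} " +
            "limit #{limit}")
    List<WindSCADALocationState> selectByDay(@Param("tm") Integer tm, @Param("limit") Integer limit);//bound parameters instead of string splicing
}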

View File

@ -0,0 +1,7 @@
-- Query the location-aggregated wind SCADA results for a given day tm
select *
from wind_SCADA_location_state
where
toYYYYMMDD(ts) = tm
limit 5

View File

@ -0,0 +1,20 @@
package com.cqu.dataInterface.service;
import com.cqu.dataInterface.entity.WindSCADALocationState;
import org.springframework.stereotype.Service;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.dataInterface.service
*@Author: markilue
*@CreateTime: 2023-05-27 20:42
*@Description: TODO Service interface for wind SCADA data
*@Version: 1.0
*/
public interface WindSCADAService {
List<WindSCADALocationState> getWindSCADALocationStateByTm(String tm,Integer limit);
}

View File

@ -0,0 +1,30 @@
package com.cqu.dataInterface.service.impl;
import com.cqu.dataInterface.entity.WindSCADALocationState;
import com.cqu.dataInterface.mapper.WindSCADAMapper;
import com.cqu.dataInterface.service.WindSCADAService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.dataInterface.service.impl
*@Author: markilue
*@CreateTime: 2023-05-27 21:04
*@Description: TODO Service implementation for wind SCADA data
*@Version: 1.0
*/
@Service
public class WindSCADAServiceImpl implements WindSCADAService {
@Autowired
private WindSCADAMapper windSCADAMapper;
@Override
public List<WindSCADALocationState> getWindSCADALocationStateByTm(String tm, Integer limit) {
return windSCADAMapper.selectWindSCADALocationStateByTm(tm, limit);
}
}

View File

@ -195,6 +195,18 @@
<version>0.0.1-SNAPSHOT</version>
</dependency>
+<!--Utility for measuring object sizes-->
+<dependency>
+<groupId>org.apache.lucene</groupId>
+<artifactId>lucene-core</artifactId>
+<version>4.0.0</version>
+</dependency>
+<dependency>
+<groupId>org.apache.commons</groupId>
+<artifactId>commons-lang3</artifactId>
+<version>3.12.0</version>
+</dependency>
</dependencies>

View File

@ -23,13 +23,13 @@ public abstract class BaseStreamApp {
env.setParallelism(4);
//TODO 2. Checkpoint settings
-env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
-env.getCheckpointConfig().setCheckpointTimeout(5000L);
+env.enableCheckpointing(50000L, CheckpointingMode.EXACTLY_ONCE);
+env.getCheckpointConfig().setCheckpointTimeout(50000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//whether checkpoints are retained after the job is cancelled
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
env.setStateBackend(new FsStateBackend(PHMConfig.RT_CHECKPOINT_LOCATION));
System.setProperty("HADOOP_USER_NAME", PHMConfig.HADOOP_USER_NAME);
-System.setProperty("java.vm.name","Java HotSpot(TM) ");//the JDK in use
+// System.setProperty("java.vm.name","Java HotSpot(TM) ");//the JDK in use
//template
execute(env);
env.execute();

View File

@ -69,7 +69,7 @@ public class GasWideApp extends BaseStreamApp {
String[] message = tagArray[0].split("\\.");
jsonObject.put("area", message[0].substring(1));
jsonObject.put("company", message[1]);
-jsonObject.put("typeId", message[2]);
+jsonObject.put("type_id", message[2]);
jsonObject.put("gt_no", message[3]);
return jsonObject;
}
@ -142,7 +142,7 @@ public class GasWideApp extends BaseStreamApp {
new DimAsyncFunction<JSONObject>("DIM_GAS_TYPE_INFO") {
@Override
public String getKey(JSONObject inObj) {
-return inObj.getString("typeId");
+return inObj.getString("type_id");
}
@Override

View File

@ -7,23 +7,17 @@ import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DataSamplingFunction;
import com.cqu.warehouse.realtime.app.func.FFTSamplingFunction;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
-import jdk.nashorn.internal.ir.debug.ObjectSizeCalculator;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.Date;
@ -61,7 +55,7 @@ public class WindCMSApp extends BaseStreamApp {
//TODO 2. Parse to JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
-// jsonObjectDS.print("***");
+// jsonObjectDS.print("first:");
//TODO 3. Async I/O - downsample the data; the cost is unknown, so run it asynchronously
@ -93,36 +87,36 @@ public class WindCMSApp extends BaseStreamApp {
},
60, TimeUnit.SECONDS
);
-// downSamplingDataDS.print(">>>");
+// downSamplingTenDS.print(">>>");
//3.2 100x downsampling
-SingleOutputStreamOperator<JSONObject> downSamplingHundredDS = AsyncDataStream.unorderedWait(
-downSamplingTenDS,
-new DataSamplingFunction<JSONObject>() {
-@Override
-public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
-obj.put("x", downSampleData);
-}
-@Override
-public int getFreq(JSONObject obj) {
-return Integer.parseInt(obj.getString("freq"));
-}
-@Override
-public int getThreshold(JSONObject obj) {
-return obj.getJSONArray("x").size() / 10;
-}
-@Override
-public List<Double> getDownSamplingData(JSONObject obj) {
-JSONArray x = obj.getJSONArray("x");
-return x.toJavaList(Double.class);
-}
-},
-60, TimeUnit.SECONDS
-);
+// SingleOutputStreamOperator<JSONObject> downSamplingHundredDS = AsyncDataStream.unorderedWait(
+// downSamplingTenDS,
+// new DataSamplingFunction<JSONObject>() {
+// @Override
+// public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
+// obj.put("x", downSampleData);
+// }
+//
+// @Override
+// public int getFreq(JSONObject obj) {
+// return Integer.parseInt(obj.getString("freq"));
+// }
+//
+// @Override
+// public int getThreshold(JSONObject obj) {
+// return obj.getJSONArray("x").size() / 10;
+// }
+//
+// @Override
+// public List<Double> getDownSamplingData(JSONObject obj) {
+// JSONArray x = obj.getJSONArray("x");
+// return x.toJavaList(Double.class);
+// }
+// },
+// 60, TimeUnit.SECONDS
+// );
//3.3 1000x downsampling
// SingleOutputStreamOperator<JSONObject> downSamplingThousandDS = AsyncDataStream.unorderedWait(
@ -192,66 +186,37 @@ public class WindCMSApp extends BaseStreamApp {
}
}
);
+// keyedDS.print("keyed:");
-//4.4 Windowed aggregation
+//4.4 Windowed aggregation + FFT
+OutputTag<String> fftTag = new OutputTag<String>("fft_tag") {
+};
SingleOutputStreamOperator<JSONObject> mergeDS = keyedDS
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(
-new ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow>() {
-//fires when the window closes
-@Override
-public void process(Tuple2<String, String> stringStringTuple2, ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow>.Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
-JSONObject obj = null;
-//Reference: https://zhuanlan.zhihu.com/p/534376156
-//Measured: sending 1000 points at a time, iterable ~500M
-//sending 10000 points at a time, iterable ~370M
-System.out.println("iterable_data_size:" + ObjectSizeCalculator.getObjectSize(iterable));
-for (JSONObject jsonObject : iterable) {
-if (obj == null) {
-obj = jsonObject;
-} else {
-JSONArray x = obj.getJSONArray("x");
-x.addAll(jsonObject.getJSONArray("x"));
-}
-}
-collector.collect(obj);
-}
-}
+new FFTSamplingFunction(fftTag)
);
//TODO 6. Compute the spectrum -- duration unknown, so run it asynchronously
-SingleOutputStreamOperator<JSONObject> fftDS = AsyncDataStream.unorderedWait(
-mergeDS,
-new FFTSamplingFunction<JSONObject>() {
-@Override
-protected void setTransformData(JSONObject jsonObject, List<Double> fft) {
-jsonObject.put("x", fft);
-}
-@Override
-protected List<Double> getTransformData(JSONObject jsonObject) {
-return jsonObject.getJSONArray("x").toJavaList(Double.class);
-}
-},
-60, TimeUnit.SECONDS
-);
//TODO the merged result holds roughly 10000 points: check whether that is reasonable
-mergeDS.print(">>>");
+mergeDS.print("merge:");
-// fftDS.print(">>>");
+DataStream<String> fftDS = mergeDS.getSideOutput(fftTag);
+fftDS.print("fft:");
//TODO 5. Write back to Kafka
-// mergeDS
-// .map(jsonObject -> jsonObject.toJSONString())
-// .addSink(
-// MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_downsampling")
-// );
-// fftDS
-// .map(jsonObject -> jsonObject.toJSONString())
-// .addSink(
-// MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_frequency")
-// );
+mergeDS
+.map(jsonObject -> jsonObject.toJSONString())
+.addSink(
+MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_downsampling")
+);
+fftDS
+.addSink(
+MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_frequency")
+);
}
}
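FFTSamplingFunction itself is not included in this diff; the call site above implies a ProcessWindowFunction that merges a window's segments on the main output and emits the spectrum payload on the fft_tag side output. A minimal sketch under those assumptions (the class name, field handling, and the elided FFT step are guesses, not the repo's implementation):
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class FFTWindowFunctionSketch extends ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow> {
    private final OutputTag<String> fftTag;
    public FFTWindowFunctionSketch(OutputTag<String> fftTag) {
        this.fftTag = fftTag;
    }
    @Override
    public void process(Tuple2<String, String> key, Context context, Iterable<JSONObject> elements, Collector<JSONObject> out) {
        JSONObject merged = null;
        for (JSONObject each : elements) {//concatenate the "x" arrays collected in this window
            if (merged == null) merged = each;
            else merged.getJSONArray("x").addAll(each.getJSONArray("x"));
        }
        if (merged == null) return;
        out.collect(merged);//main output: the merged time-domain window
        //the real class would run an FFT over merged "x" before emitting here
        context.output(fftTag, merged.toJSONString());//side output, read back via getSideOutput(fftTag)
    }
}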

View File

@ -0,0 +1,260 @@
package com.cqu.warehouse.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DataSamplingFunction;
import com.cqu.warehouse.realtime.app.func.FFTSamplingAsyncFunction;
import com.cqu.warehouse.realtime.app.func.FFTSamplingFunction;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.lucene.util.RamUsageEstimator;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-22 19:55
*@Description:
* TODO DWM layer for wind-turbine CMS data: async version (now working)
* 1) Merge data from the same instant - state programming? 100k points per instant is too much to keep in memory -> merge after downsampling instead
* 2) lqtt algorithm to extract the boundary shape of the data
* 3) Consider whether multiple downsampling levels are needed
*@Version: 1.0
*/
public class WindCMSAsyncApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new WindCMSAsyncApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1. Read data from Kafka
String topic = "dwd_wind_cms";
String groupId = "dwm_wind_cms_app_group";
DataStreamSource<String> kafkaDS = env.addSource(
MyKafkaUtils.getKafkaSource(topic, groupId)
);
//TODO 2. Parse to JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
//TODO 3. Async I/O - downsample the data; the cost is unknown, so run it asynchronously
//3.1 10x downsampling
SingleOutputStreamOperator<JSONObject> downSamplingTenDS = AsyncDataStream.unorderedWait(
jsonObjectDS,
new DataSamplingFunction<JSONObject>() {
@Override
public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
obj.put("x", downSampleData);
}
@Override
public int getFreq(JSONObject obj) {
return Integer.parseInt(obj.getString("freq"));
}
@Override
public int getThreshold(JSONObject obj) {
return obj.getJSONArray("x").size() / 10;
}
@Override
public List<Double> getDownSamplingData(JSONObject obj) {
JSONArray x = obj.getJSONArray("x");
return x.toJavaList(Double.class);
}
},
60, TimeUnit.SECONDS
);
// downSamplingTenDS.print(">>>");
//3.2 100x downsampling
// SingleOutputStreamOperator<JSONObject> downSamplingHundredDS = AsyncDataStream.unorderedWait(
// downSamplingTenDS,
// new DataSamplingFunction<JSONObject>() {
// @Override
// public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
// obj.put("x", downSampleData);
// }
//
// @Override
// public int getFreq(JSONObject obj) {
// return Integer.parseInt(obj.getString("freq"));
// }
//
// @Override
// public int getThreshold(JSONObject obj) {
// return obj.getJSONArray("x").size() / 10;
// }
//
// @Override
// public List<Double> getDownSamplingData(JSONObject obj) {
// JSONArray x = obj.getJSONArray("x");
// return x.toJavaList(Double.class);
// }
// },
// 60, TimeUnit.SECONDS
// );
//3.3 1000x downsampling
// SingleOutputStreamOperator<JSONObject> downSamplingThousandDS = AsyncDataStream.unorderedWait(
// downSamplingHundredDS,
// new DataSamplingFunction<JSONObject>() {
// @Override
// public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
// obj.put("x", downSampleData);
// }
//
// @Override
// public int getFreq(JSONObject obj) {
// return Integer.parseInt(obj.getString("freq"));
// }
//
// @Override
// public int getThreshold(JSONObject obj) {
// return obj.getJSONArray("x").size() / 10;
// }
//
// @Override
// public List<Double> getDownSamplingData(JSONObject obj) {
// JSONArray x = obj.getJSONArray("x");
// return x.toJavaList(Double.class);
// }
// },
// 60, TimeUnit.SECONDS
// );
//TODO 4. Merge CMS data from the same time window (the 10x downsampled stream)
//4.1 Attach a timestamp field
SingleOutputStreamOperator<JSONObject> tsDS = downSamplingTenDS.map(
new RichMapFunction<JSONObject, JSONObject>() {
SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyyMMddHHmmss");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
String realtime = jsonObject.getString("realtime");
if (realtime.length() < 8) jsonObject.put("ts", new Date().getTime());
else jsonObject.put("ts", sdf.parse(realtime).getTime());
return jsonObject;
}
}
);
//4.2 Register the watermark
SingleOutputStreamOperator<JSONObject> dataWithWatermarkDS = tsDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("ts");
}
})
);
//4.3 Key by wind farm and turbine number
KeyedStream<JSONObject, Tuple2<String, String>> keyedDS = dataWithWatermarkDS.keyBy(
new KeySelector<JSONObject, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(JSONObject jsonObject) throws Exception {
return Tuple2.of(jsonObject.getString("windfarm"), jsonObject.getString("wt_no"));
}
}
);
// keyedDS.print("keyed:");
//4.4 Windowed aggregation + FFT
OutputTag<String> fftTag = new OutputTag<String>("fft_tag") {
};
SingleOutputStreamOperator<JSONObject> mergeDS = keyedDS
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(
new ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow>() {
@Override
public void process(Tuple2<String, String> key, ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow>.Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
JSONObject obj = null;
long start = context.window().getStart();
long end = context.window().getEnd();
System.out.println("key:" + key + " start:" + new Date(start) + " end:" + new Date(end) + " size: " + RamUsageEstimator.humanSizeOf(iterable));
//Reference: https://zhuanlan.zhihu.com/p/534376156
//Measured: sending 1000 points at a time, iterable ~500M
//sending 10000 points at a time, iterable ~370M
for (JSONObject eachObj : iterable) {
if (obj == null) {
obj = eachObj;
} else {
obj.getJSONArray("x").addAll(eachObj.getJSONArray("x"));
}
}
System.out.println("mergeSize:" + RamUsageEstimator.humanSizeOf(obj));
collector.collect(obj);
}
}
);
//TODO 6. Compute the spectrum -- duration unknown, so run it asynchronously
SingleOutputStreamOperator<JSONObject> fftDS = AsyncDataStream.unorderedWait(
mergeDS,
new FFTSamplingAsyncFunction<JSONObject>() {
@Override
protected void setTransformData(JSONObject obj, List<Double> fft) {
obj.put("x", fft);
}
@Override
protected List<Double> getTransformData(JSONObject obj) {
return obj.getJSONArray("x").toJavaList(Double.class);
}
},
60,TimeUnit.SECONDS
);
//TODO the merged result holds roughly 10000 points: check whether that is reasonable
mergeDS.print("merge:");
fftDS.print("fft:");
//TODO 5. Write back to Kafka
mergeDS
.map(jsonObject -> jsonObject.toJSONString())
.addSink(
MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_downsampling")
);
fftDS
.map(jsonObject -> jsonObject.toJSONString())
.addSink(
MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_frequency")
);
}
}
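DownSamplingUtils.downSamplingData is referenced above but not part of this diff; the class comment's "lqtt" reads like LTTB (Largest-Triangle-Three-Buckets), a common choice for keeping a signal's visual envelope while downsampling. A self-contained illustrative sketch of that technique, under the assumption that this is what the utility implements:
import java.util.ArrayList;
import java.util.List;
public final class LttbSketch {
    //Keep the first and last points, split the rest into (threshold - 2) buckets,
    //and from each bucket keep the point forming the largest triangle with the
    //previously kept point and the average of the next bucket.
    public static List<Double> downSample(List<Double> data, int threshold) {
        int n = data.size();
        if (threshold >= n || threshold < 3) return data;//nothing to reduce
        List<Double> sampled = new ArrayList<>(threshold);
        double bucketSize = (double) (n - 2) / (threshold - 2);
        int a = 0;//index of the most recently kept point
        sampled.add(data.get(0));
        for (int i = 0; i < threshold - 2; i++) {
            int start = (int) Math.floor(i * bucketSize) + 1;
            int end = Math.min((int) Math.floor((i + 1) * bucketSize) + 1, n - 1);
            int nextStart = (int) Math.floor((i + 1) * bucketSize) + 1;
            int nextEnd = Math.min((int) Math.floor((i + 2) * bucketSize) + 1, n);
            double avgX = 0, avgY = 0;//centroid of the following bucket
            for (int j = nextStart; j < nextEnd; j++) { avgX += j; avgY += data.get(j); }
            int cnt = Math.max(nextEnd - nextStart, 1);
            avgX /= cnt; avgY /= cnt;
            double maxArea = -1; int maxIdx = start;
            for (int j = start; j < end; j++) {//x-coordinates are the sample indices
                double area = Math.abs((a - avgX) * (data.get(j) - data.get(a))
                        - (a - j) * (avgY - data.get(a))) / 2.0;
                if (area > maxArea) { maxArea = area; maxIdx = j; }
            }
            sampled.add(data.get(maxIdx));
            a = maxIdx;
        }
        sampled.add(data.get(n - 1));
        return sampled;
    }
}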

View File

@ -103,6 +103,8 @@ public class WindScadaWideApp extends BaseStreamApp {
public void join(JSONObject inObj, JSONObject joinObj) throws Exception {
inObj.put("location", joinObj.getString("REAL_NAME"));
inObj.put("production", joinObj.getString("PRODUCTION"));
+inObj.put("longitude",joinObj.getString("LONGITUDE"));
+inObj.put("latitude",joinObj.getString("LATITUDE"));
}
},
60, TimeUnit.SECONDS
@ -111,7 +113,7 @@ public class WindScadaWideApp extends BaseStreamApp {
SingleOutputStreamOperator<String> windSCADAWideDS = windSCADAWideJSONDS.map(jsonObject -> jsonObject.toJSONString());
windSCADAWideDS.print(">>>>");
windSCADAWideDS.addSink(
-MyKafkaUtils.getKafkaSink("dwd_wind_wide")
+MyKafkaUtils.getKafkaSink("dwm_wind_wide")
);
}

View File

@ -0,0 +1,105 @@
package com.cqu.warehouse.realtime.app.dws;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.dwm.GasWideApp;
import com.cqu.warehouse.realtime.entity.GasTypeState;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dws
*@Author: markilue
*@CreateTime: 2023-05-26 16:58
*@Description: TODO Aggregate gas-turbine data by type
*@Version: 1.0
*/
public class GasTypeStateApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new GasTypeStateApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1. Build the stream table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//TODO 2. Create a dynamic table backed by Kafka
String topic = "dwm_gas_wide";
String groupId = "gas_type_state_app";
String sql = "create table gas_scada_type " +
"( " +
" type_id String, " +
" rated_temp BIGINT, " +
" rated_press BIGINT, " +
" rated_flow_rate BIGINT, " +
" rated_speed BIGINT, " +
" rated_power BIGINT, " +
" rated_load BIGINT, " +
" rated_duration BIGINT, " +
" LubeReturnT2 DOUBLE, " +
" T5TC1 DOUBLE, " +
" GFFlow DOUBLE, " +
" LFFlow DOUBLE, " +
" NHP_1 DOUBLE, " +
" AirInletDP_1 DOUBLE, " +
" T1_1 DOUBLE, " +
" LubeHaderP DOUBLE, " +
" LubeFilterDP DOUBLE, " +
" TankT DOUBLE, " +
" GrBxAccel DOUBLE, " +
" realtime STRING, " +
" row_time as TO_TIMESTAMP(realtime), " +
" WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND " +
") " +
"with(" + MyKafkaUtils.getKafkaSourceByDDL(topic, groupId) + ")";
tableEnv.executeSql(sql);
//TODO 3. Grouped windowed aggregation
String selectSQL = "select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt, " +
" DATE_FORMAT(TUMBLE_END(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as edt, " +
" type_id, " +
" rated_temp, " +
" rated_press, " +
" rated_flow_rate, " +
" rated_speed, " +
" rated_power, " +
" rated_load, " +
" rated_duration, " +
" AVG(LubeReturnT2) as avg_LubeReturnT2, " +
" AVG(T5TC1) as avg_T5TC1, " +
" AVG(GFFlow) as avg_GFFlow, " +
" AVG(LFFlow) as avg_LFFlow, " +
" AVG(NHP_1) as avg_NHP_1, " +
" AVG(AirInletDP_1) as avg_AirInletDP_1, " +
" AVG(T1_1) as avg_T1_1, " +
" AVG(LubeHaderP) as avg_LubeHaderP, " +
" AVG(LubeFilterDP) as avg_LubeFilterDP, " +
" AVG(TankT) as avg_TankT, " +
" AVG(GrBxAccel) as avg_GrBxAccel " +
"from gas_scada_type " +
"group by TUMBLE(row_time, INTERVAL '3' MINUTE), " +
" type_id, " +
" rated_temp, " +
" rated_press, " +
" rated_flow_rate, " +
" rated_speed, " +
" rated_power, " +
" rated_load, " +
" rated_duration ";
Table table = tableEnv.sqlQuery(selectSQL);
DataStream<GasTypeState> windowDS = tableEnv.toAppendStream(table, GasTypeState.class);
windowDS.print(">>>");
}
}

View File

@ -0,0 +1,118 @@
package com.cqu.warehouse.realtime.app.dws;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.entity.WindSCADALocationState;
import com.cqu.warehouse.realtime.utils.ClickhouseUtils;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dws
*@Author: markilue
*@CreateTime: 2023-05-25 20:17
*@Description:
* TODO DWS app aggregating wind SCADA data by location
* Implemented with Flink SQL
*@Version: 1.0
*/
public class WindSCADALocationStateApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new WindSCADALocationStateApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1. Create the table environment
EnvironmentSettings setting = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
//TODO 2. Read from Kafka and create the corresponding dynamic table
String topic = "dwm_wind_wide";
String groupId = "wind_scada_type_state_app";
String sql = "CREATE TABLE wind_scada_location " +
"( " +
" windfarm STRING, " +
" location STRING, " +
" longitude STRING, " +
" latitude STRING, " +
" production DOUBLE, " +
" num_gen_torque DOUBLE, " +
" num_gearbox_sumptemp DOUBLE, " +
" num_gearbox_inletoiltemp DOUBLE, " +
" degree_winddirection DOUBLE, " +
" num_envtemp DOUBLE, " +
" num_gen_speed DOUBLE, " +
" num_gearbox_pumpoutletpress DOUBLE, " +
" num_engineroom_temp DOUBLE, " +
" num_rotorspeed DOUBLE, " +
" num_activepower DOUBLE, " +
" num_engineroom_vibration_x DOUBLE, " +
" num_engineroom_vibration_y DOUBLE, " +
" num_gearbox_highspeedshaft_front_temp DOUBLE, " +
" num_gen_max_windingtemp DOUBLE, " +
" num_gearbox_highspeedshaft_rear_temp DOUBLE, " +
" num_windspeed DOUBLE, " +
" num_gearbox_coolingwatertemp DOUBLE, " +
" num_gearbox_inletpress DOUBLE, " +
" realtime STRING, " +
" row_time as TO_TIMESTAMP(realtime), " +
" WATERMARK FOR row_time as row_time - INTERVAL '1' MINUTE " +
")" +
"WITH (" + MyKafkaUtils.getKafkaSourceByDDL(topic, groupId) + ")";
System.out.println("createSQL:"+sql);
tableEnv.executeSql(sql);
//TODO 3. Windowed aggregation
String selectSQL = "select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '10' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt, " +
" DATE_FORMAT(TUMBLE_END(row_time, INTERVAL '10' MINUTE), 'yyyy-MM-dd HH:mm:ss') as edt, " +
" windfarm, " +
" location, " +
" longitude, " +
" latitude, " +
" production, " +
" AVG(num_gen_torque) avg_torque, " +
" AVG(num_gearbox_sumptemp) avg_sumptemp, " +
" AVG(num_gearbox_inletoiltemp) avg_inletoiltemp, " +
" AVG(degree_winddirection) avg_winddirection, " +
" AVG(num_envtemp) avg_envtemp, " +
" AVG(num_gen_speed) avg_gen_speed, " +
" AVG(num_gearbox_pumpoutletpress) avg_pumpoutletpress, " +
" AVG(num_engineroom_temp) avg_engineroom_temp, " +
" AVG(num_rotorspeed) avg_rotorspeed, " +
" AVG(num_activepower) avg_activepower, " +
" AVG(num_engineroom_vibration_x) avg_engineroom_vibration_x, " +
" AVG(num_engineroom_vibration_y) avg_engineroom_vibration_y, " +
" AVG(num_gearbox_highspeedshaft_front_temp) avg_highspeedshaft_front_temp, " +
" AVG(num_gen_max_windingtemp) avg_max_windingtemp, " +
" AVG(num_gearbox_highspeedshaft_rear_temp) avg_highspeedshaft_rear_temp, " +
" AVG(num_windspeed) avg_windspeed, " +
" AVG(num_gearbox_coolingwatertemp) avg_coolingwatertemp, " +
" AVG(num_gearbox_inletpress) avg_inletpress, " +
" UNIX_TIMESTAMP() * 1000 ts " +
"from wind_scada_location " +
"group by " +
" TUMBLE(row_time, INTERVAL '10' MINUTE), " +
" windfarm, " +
" location, " +
" longitude, " +
" latitude, " +
" production";
Table table = tableEnv.sqlQuery(selectSQL);
DataStream<WindSCADALocationState> windowDS = tableEnv.toAppendStream(table, WindSCADALocationState.class);
windowDS.print(">>>");
windowDS.addSink(
ClickhouseUtils.getJDBCSink("insert into wind_SCADA_location_state values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
);
}
}

View File

@ -0,0 +1,121 @@
package com.cqu.warehouse.realtime.app.dws;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.entity.WindSCADATypeState;
import com.cqu.warehouse.realtime.utils.ClickhouseUtils;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dws
*@Author: markilue
*@CreateTime: 2023-05-26 12:53
*@Description: TODO Aggregate wind SCADA data by turbine type
*@Version: 1.0
*/
public class WindSCADATypeStateApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new WindSCADATypeStateApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1. Create the table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//TODO 2. Create the dynamic table
String topic = "dwm_wind_wide";
String groupId = "wind_scada_typeState_app";
String sql = "create table wind_scada_type " +
"( " +
" type_id STRING, " +
" rated_efficiency BIGINT, " +
" rated_load BIGINT, " +
" rated_power BIGINT, " +
" rated_speed BIGINT, " +
" rated_press BIGINT, " +
" rated_air_volume BIGINT, " +
" num_gen_torque DOUBLE, " +
" num_gearbox_sumptemp DOUBLE, " +
" num_gearbox_inletoiltemp DOUBLE, " +
" degree_winddirection DOUBLE, " +
" num_envtemp DOUBLE, " +
" num_gen_speed DOUBLE, " +
" num_gearbox_pumpoutletpress DOUBLE, " +
" num_engineroom_temp DOUBLE, " +
" num_rotorspeed DOUBLE, " +
" num_activepower DOUBLE, " +
" num_engineroom_vibration_x DOUBLE, " +
" num_engineroom_vibration_y DOUBLE, " +
" num_gearbox_highspeedshaft_front_temp DOUBLE, " +
" num_gen_max_windingtemp DOUBLE, " +
" num_gearbox_highspeedshaft_rear_temp DOUBLE, " +
" num_windspeed DOUBLE, " +
" num_gearbox_coolingwatertemp DOUBLE, " +
" num_gearbox_inletpress DOUBLE, " +
" realtime STRING, " +
" row_time as TO_TIMESTAMP(realtime), " +
" WATERMARK for row_time as row_time - INTERVAL '1' MINUTE " +
")" +
" WITH (" + MyKafkaUtils.getKafkaSourceByDDL(topic, groupId) + " )";
tableEnv.executeSql(sql);
String selectSQL = "select DATE_FORMAT(TUMBLE_START(row_time , INTERVAL '10' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt, " +
" DATE_FORMAT(TUMBLE_END(row_time , INTERVAL '10' MINUTE), 'yyyy-MM-dd HH:mm:ss') as edt, " +
" type_id, " +
" rated_efficiency, " +
" rated_load, " +
" rated_power, " +
" rated_speed, " +
" rated_air_volume, " +
" AVG(num_gen_torque) avg_torque, " +
" AVG(num_gearbox_sumptemp) avg_sumptemp, " +
" AVG(num_gearbox_inletoiltemp) avg_inletoiltemp, " +
" AVG(degree_winddirection) avg_winddirection, " +
" AVG(num_envtemp) avg_envtemp, " +
" AVG(num_gen_speed) avg_gen_speed, " +
" AVG(num_gearbox_pumpoutletpress) avg_pumpoutletpress, " +
" AVG(num_engineroom_temp) avg_engineroom_temp, " +
" AVG(num_rotorspeed) avg_rotorspeed, " +
" AVG(num_activepower) avg_activepower, " +
" AVG(num_engineroom_vibration_x) avg_engineroom_vibration_x, " +
" AVG(num_engineroom_vibration_y) avg_engineroom_vibration_y, " +
" AVG(num_gearbox_highspeedshaft_front_temp) avg_highspeedshaft_front_temp, " +
" AVG(num_gen_max_windingtemp) avg_max_windingtemp, " +
" AVG(num_gearbox_highspeedshaft_rear_temp) avg_highspeedshaft_rear_temp, " +
" AVG(num_windspeed) avg_windspeed, " +
" AVG(num_gearbox_coolingwatertemp) avg_coolingwatertemp, " +
" AVG(num_gearbox_inletpress) avg_inletpress, " +
" UNIX_TIMESTAMP() * 1000 ts " +
"from wind_scada_type " +
"group by TUMBLE(row_time, INTERVAL '10' MINUTE), " +
" type_id, " +
" rated_efficiency, " +
" rated_load, " +
" rated_power, " +
" rated_speed," +
" rated_air_volume";
Table table = tableEnv.sqlQuery(selectSQL);
DataStream<WindSCADATypeState> windowDS = tableEnv.toAppendStream(table, WindSCADATypeState.class);
windowDS.print(">>>");
windowDS.addSink(
ClickhouseUtils.getJDBCSink(
"insert into wind_SCADA_type_state values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
)
);
}
}

View File

@ -0,0 +1,67 @@
-- ClickHouse table for wind SCADA data aggregated by location
-- column order must match the insert statement
create table wind_SCADA_location_state
(
stt DateTime,
edt DateTime,
windfarm String,
location String,
longitude String,
latitude String,
production Decimal64(3),
avg_torque Decimal64(3),
avg_sumptemp Decimal64(3),
avg_inletoiltemp Decimal64(3),
avg_winddirection Decimal64(3),
avg_envtemp Decimal64(3),
avg_gen_speed Decimal64(3),
avg_pumpoutletpress Decimal64(3),
avg_engineroom_temp Decimal64(3),
avg_rotorspeed Decimal64(3),
avg_activepower Decimal64(3),
avg_engineroom_vibration_x Decimal64(3),
avg_engineroom_vibration_y Decimal64(3),
avg_highspeedshaft_front_temp Decimal64(3),
avg_max_windingtemp Decimal64(3),
avg_highspeedshaft_rear_temp Decimal64(3),
avg_windspeed Decimal64(3),
avg_coolingwatertemp Decimal64(3),
avg_inletpress Decimal64(3),
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,windfarm);
-- ClickHouse table for wind SCADA data aggregated by type
create table wind_SCADA_type_state
(
stt DateTime,
edt DateTime,
type_id String,
rated_efficiency UInt64,
rated_load UInt64,
rated_power UInt64,
rated_speed UInt64,
rated_air_volume UInt64,
avg_torque Decimal64(3),
avg_sumptemp Decimal64(3),
avg_inletoiltemp Decimal64(3),
avg_winddirection Decimal64(3),
avg_envtemp Decimal64(3),
avg_gen_speed Decimal64(3),
avg_pumpoutletpress Decimal64(3),
avg_engineroom_temp Decimal64(3),
avg_rotorspeed Decimal64(3),
avg_activepower Decimal64(3),
avg_engineroom_vibration_x Decimal64(3),
avg_engineroom_vibration_y Decimal64(3),
avg_highspeedshaft_front_temp Decimal64(3),
avg_max_windingtemp Decimal64(3),
avg_highspeedshaft_rear_temp Decimal64(3),
avg_windspeed Decimal64(3),
avg_coolingwatertemp Decimal64(3),
avg_inletpress Decimal64(3),
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,type_id);

View File

@ -0,0 +1,220 @@
-- Wide table for wind SCADA data aggregated by location
-- the watermark bounds how late events may arrive
create table wind_scada_location
(
windfarm STRING,
location STRING,
longitude STRING,
latitude STRING,
production DOUBLE,
num_gen_torque DOUBLE,
num_gearbox_sumptemp DOUBLE,
num_gearbox_inletoiltemp DOUBLE,
degree_winddirection DOUBLE,
num_envtemp DOUBLE,
num_gen_speed DOUBLE,
num_gearbox_pumpoutletpress DOUBLE,
num_engineroom_temp DOUBLE,
num_rotorspeed DOUBLE,
num_activepower DOUBLE,
num_engineroom_vibration_x DOUBLE,
num_engineroom_vibration_y DOUBLE,
num_gearbox_highspeedshaft_front_temp DOUBLE,
num_gen_max_windingtemp DOUBLE,
num_gearbox_highspeedshaft_rear_temp DOUBLE,
num_windspeed DOUBLE,
num_gearbox_coolingwatertemp DOUBLE,
num_gearbox_inletpress DOUBLE,
realtime STRING,
row_time as TO_TIMESTAMP(realtime),
WATERMARK FOR row_time as row_time - INTERVAL '1' MINUTE
)
WITH (
'connector' = 'kafka',
'topic' = 'dwd_wind_wide',
'properties.bootstrap.servers' = 'Ding202:9092',
'properties.group.id' = 'wind_scada_location_state_app',
'scan.startup.mode' = 'latest-offset',
'format' = 'json')
-- Windowed aggregation of the wide table, grouped by location
select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt,
DATE_FORMAT(TUMBLE_END(row_time, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') as edt,
windfarm,
location,
longitude,
latitude,
production,
AVG(num_gen_torque) avg_torque,
AVG(num_gearbox_sumptemp) avg_sumptemp,
AVG(num_gearbox_inletoiltemp) avg_inletoiltemp,
AVG(degree_winddirection) avg_winddirection,
AVG(num_envtemp) avg_envtemp,
AVG(num_gen_speed) avg_gen_speed,
AVG(num_gearbox_pumpoutletpress) avg_pumpoutletpress,
AVG(num_engineroom_temp) avg_engineroom_temp,
AVG(num_rotorspeed) avg_rotorspeed,
AVG(num_activepower) avg_activepower,
AVG(num_engineroom_vibration_x) avg_engineroom_vibration_x,
AVG(num_engineroom_vibration_y) avg_engineroom_vibration_y,
AVG(num_gearbox_highspeedshaft_front_temp) avg_highspeedshaft_front_temp,
AVG(num_gen_max_windingtemp) avg_max_windingtemp,
AVG(num_gearbox_highspeedshaft_rear_temp) avg_highspeedshaft_rear_temp,
AVG(num_windspeed) avg_windspeed,
AVG(num_gearbox_coolingwatertemp) avg_coolingwatertemp,
AVG(num_gearbox_inletpress) avg_inletpress,
UNIX_TIMESTAMP() * 1000 ts
from wind_scada_location
group by TUMBLE(row_time, INTERVAL '5' MINUTE),
windfarm,
location,
longitude,
latitude,
production
-- Wide table for wind SCADA data aggregated by type
create table wind_scada_type
(
type_id STRING,
rated_efficiency BIGINT,
rated_load BIGINT,
rated_power BIGINT,
rated_speed BIGINT,
rated_press BIGINT,
rated_air_volume BIGINT,
num_gen_torque DOUBLE,
num_gearbox_sumptemp DOUBLE,
num_gearbox_inletoiltemp DOUBLE,
degree_winddirection DOUBLE,
num_envtemp DOUBLE,
num_gen_speed DOUBLE,
num_gearbox_pumpoutletpress DOUBLE,
num_engineroom_temp DOUBLE,
num_rotorspeed DOUBLE,
num_activepower DOUBLE,
num_engineroom_vibration_x DOUBLE,
num_engineroom_vibration_y DOUBLE,
num_gearbox_highspeedshaft_front_temp DOUBLE,
num_gen_max_windingtemp DOUBLE,
num_gearbox_highspeedshaft_rear_temp DOUBLE,
num_windspeed DOUBLE,
num_gearbox_coolingwatertemp DOUBLE,
num_gearbox_inletpress DOUBLE,
realtime STRING,
row_time as TO_TIMESTAMP(realtime),
WATERMARK for row_time as row_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'dwd_wind_wide',
'properties.bootstrap.servers' = 'Ding202:9092',
'properties.group.id' = 'wind_scada_type_state_app',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
-- Windowed aggregation of the wide table, grouped by type
select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '10' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt,
DATE_FORMAT(TUMBLE_END(row_time, INTERVAL '10' MINUTE), 'yyyy-MM-dd HH:mm:ss') as edt,
type_id,
rated_efficiency,
rated_load,
rated_power,
rated_speed,
rated_air_volume,
AVG(num_gen_torque) avg_torque,
AVG(num_gearbox_sumptemp) avg_sumptemp,
AVG(num_gearbox_inletoiltemp) avg_inletoiltemp,
AVG(degree_winddirection) avg_winddirection,
AVG(num_envtemp) avg_envtemp,
AVG(num_gen_speed) avg_gen_speed,
AVG(num_gearbox_pumpoutletpress) avg_pumpoutletpress,
AVG(num_engineroom_temp) avg_engineroom_temp,
AVG(num_rotorspeed) avg_rotorspeed,
AVG(num_activepower) avg_activepower,
AVG(num_engineroom_vibration_x) avg_engineroom_vibration_x,
AVG(num_engineroom_vibration_y) avg_engineroom_vibration_y,
AVG(num_gearbox_highspeedshaft_front_temp) avg_highspeedshaft_front_temp,
AVG(num_gen_max_windingtemp) avg_max_windingtemp,
AVG(num_gearbox_highspeedshaft_rear_temp) avg_highspeedshaft_rear_temp,
AVG(num_windspeed) avg_windspeed,
AVG(num_gearbox_coolingwatertemp) avg_coolingwatertemp,
AVG(num_gearbox_inletpress) avg_inletpress,
UNIX_TIMESTAMP() * 1000 ts
from wind_scada_type
group by TUMBLE(row_time, INTERVAL '10' MINUTE),
type_id,
rated_efficiency,
rated_load,
rated_power,
rated_speed,
rated_air_volume
-- Wide table for gas-turbine data aggregated by type
create table gas_scada_type
(
type_id String,
rated_temp BIGINT,
rated_press BIGINT,
rated_flow_rate BIGINT,
rated_speed BIGINT,
rated_power BIGINT,
rated_load BIGINT,
rated_duration BIGINT,
LubeReturnT2 DOUBLE,
T5TC1 DOUBLE,
GFFlow DOUBLE,
LFFlow DOUBLE,
NHP_1 DOUBLE,
AirInletDP_1 DOUBLE,
T1_1 DOUBLE,
LubeHaderP DOUBLE,
LubeFilterDP DOUBLE,
TankT DOUBLE,
GrBxAccel DOUBLE,
realtime STRING,
row_time as TO_TIMESTAMP(realtime),
WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND
)
with (
'connector' = 'kafka',
'topic' = 'dwm_gas_wide',
'properties.bootstrap.servers' = 'Ding202:9092',
'properties.group.id' = 'gas_type_state_app',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
-- Windowed aggregation by type; there are too many columns, so only a subset is aggregated
select DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as stt,
DATE_FORMAT(TUMBLE_END(row_time, INTERVAL '3' MINUTE), 'yyyy-MM-dd HH:mm:ss') as edt,
type_id,
rated_temp,
rated_press,
rated_flow_rate,
rated_speed,
rated_power,
rated_load,
rated_duration,
AVG(LubeReturnT2) as avg_LubeReturnT2,
AVG(T5TC1) as avg_T5TC1,
AVG(GFFlow) as avg_GFFlow,
AVG(LFFlow) as avg_LFFlow,
AVG(NHP_1) as avg_NHP_1,
AVG(AirInletDP_1) as avg_AirInletDP_1,
AVG(T1_1) as avg_T1_1,
AVG(LubeHaderP) as avg_LubeHaderP,
AVG(LubeFilterDP) as avg_LubeFilterDP,
AVG(TankT) as avg_TankT,
AVG(GrBxAccel) as avg_GrBxAccel
from gas_scada_type
group by TUMBLE(row_time, INTERVAL '3' MINUTE),
type_id,
rated_temp,
rated_press,
rated_flow_rate,
rated_speed,
rated_power,
rated_load,
rated_duration

View File

@ -1,29 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.cqu.warehouse.realtime.utils.DownSamplingUtils;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import java.util.Collections;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-23 16:21
*@Description: TODO Async function for downsampling data
*@Version: 1.0
*/
public abstract class DataSamplingFunction<T> implements AsyncFunction<T, T>, DownSampleFunction<T> {
@Override
public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
List<Double> rawData = getDownSamplingData(obj);
int freq = getFreq(obj);
int threshold = getThreshold(obj);
List<Double> downSampleData = DownSamplingUtils.downSamplingData(rawData, freq, threshold);
setDownSamplingData(obj,downSampleData);
resultFuture.complete(Collections.singleton(obj));
}
}

View File

@ -1,74 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.utils.DimUtils;
import com.cqu.warehouse.realtime.utils.ThreadPoolUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-18 20:15
*@Description: TODO Async dimension-join function
*@Version: 1.0
*/
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
ExecutorService threadPool;
String tableName;
public DimAsyncFunction(String tableName) {
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
threadPool = ThreadPoolUtils.getThreadPool();
}
//main entry point of the async call
@Override
public void asyncInvoke(T obj, ResultFuture<T> resultFuture) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
try {
long start = System.currentTimeMillis();
//extract the join key from the record
String key = getKey(obj);
//look up the dimension row to join by that key
JSONObject joinObj = DimUtils.getDimInfoWithCache(tableName, key);
//merge the queried dimension row into the current record
if (joinObj != null) {
join(obj, joinObj);
}
// System.out.println(joinObj);
long end = System.currentTimeMillis();
System.out.println("维度异步查询耗时:" + (end - start) + "毫秒");
resultFuture.complete(Collections.singleton(obj));
} catch (Exception e) {
e.printStackTrace();
System.out.println("维度查询发生异常");
}
}
}
);
}
}
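A sketch of a concrete subclass joining a wide-record stream against the DIM_GAS_TYPE_INFO table that the Phoenix layer maintains; the stream name and JSON field names are illustrative, and imports are omitted.
AsyncDataStream.unorderedWait(
gasWideStream,//hypothetical wide-record stream
new DimAsyncFunction<JSONObject>("DIM_GAS_TYPE_INFO") {
@Override
public String getKey(JSONObject obj) {
return obj.getString("type_id");//dimension lookup key (illustrative field)
}
@Override
public void join(JSONObject obj, JSONObject dimInfo) {
obj.put("rated_power", dimInfo.getString("RATED_POWER"));//copy the dim columns needed downstream
}
},
60, TimeUnit.SECONDS);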

View File

@ -1,95 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.common.PHMConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-20 18:18
*@Description: TODO Prepares gas-turbine dimension records (type fix-up, table routing and creation)
*@Version: 1.0
*/
public class DimFunction extends RichMapFunction<JSONObject, JSONObject> {
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER);
connection.setSchema(PHMConfig.HBASE_SCHEMA);
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
String dimTable = "dim_" + jsonObject.getString("table");
String type = jsonObject.getString("type");
//Note: Maxwell can replay historical data, in which case type is bootstrap-insert and must be normalized
if (type.equals("bootstrap-insert")) {
type = "insert";
jsonObject.put("type", type);
}
JSONObject data = jsonObject.getJSONObject("data");
Set<String> colName = data.keySet();
//If the record is dimension data, create the dimension table in HBase ahead of time
if ("insert".equals(type)) {
//only needed for inserts; an update never requires creating the table
existTable(dimTable, colName);
}
jsonObject.put("sinkTable", dimTable);
return jsonObject;
}
private void existTable(String dimTable, Set<String> colName) {
StringBuilder sql = new StringBuilder("create table if not exists " + dimTable + "(");
int i = 0;
for (String col : colName) {
if ("id".equals(col)) {
sql.append(col + " varchar primary key");
} else {
sql.append(col + " varchar");
}
if (i++ < colName.size() - 1) {
sql.append(",");
}
}
sql.append(")");
PreparedStatement preparedStatement = null;
String executeSQL = sql.toString();
System.out.println(executeSQL);
try {
preparedStatement = connection.prepareStatement(executeSQL);
preparedStatement.execute();
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (preparedStatement != null) {//fix: close only if the statement was actually created
try {
preparedStatement.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
}
}

View File

@ -1,21 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-22 14:40
*@Description:
* TODO Dimension-join interface: declares the hooks a dimension join needs
* 1) extract the join key
* 2) merge in the dimension row
*@Version: 1.0
*/
public interface DimJoinFunction<T> {
public String getKey(T inObj);
public void join(T inObj, JSONObject joinObj) throws Exception;
}

View File

@ -1,83 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.common.PHMConfig;
import com.cqu.warehouse.realtime.utils.DimUtils;
import com.cqu.warehouse.realtime.utils.RedisUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-20 19:09
*@Description: TODO Sink that writes dimension records back to HBase
*@Version: 1.0
*/
public class DimSink extends RichSinkFunction<JSONObject> {
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER);
connection.setSchema(PHMConfig.HBASE_SCHEMA);
}
@Override
public void invoke(JSONObject jsonObject, Context context) {
String sinkTable = jsonObject.getString("sinkTable");
JSONObject data = jsonObject.getJSONObject("data");
String sql = generateSQL(sinkTable, data);
String id = data.getString("id");
try (PreparedStatement preparedStatement = connection.prepareStatement(sql);){
preparedStatement.executeUpdate();
//TODO Note: commit explicitly (MySQL auto-commits by default, but Phoenix defaults to manual commit)
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("向phoenix中插入维度数据失败");
}
String type = jsonObject.getString("sinkTable");
if("update".equals(type)||"delete".equals(type)){
DimUtils.deleteCache(sinkTable,id);
}
}
public String generateSQL(String sinkTable, JSONObject data) {
List<String> col = new ArrayList<>();
List<Object> value = new ArrayList<>();
for (Map.Entry<String, Object> entry : data.entrySet()) {
col.add(entry.getKey());
value.add(entry.getValue());
}
String sql = "upsert into " + sinkTable + "(" + StringUtils.join(col, ",") + ") values ('" + StringUtils.join(value, "','") + "')";
System.out.println(sql);
return sql;
}
}

View File

@ -1,22 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-23 16:31
*@Description: TODO Hooks that a down-sampling implementation must provide
*@Version: 1.0
*/
public interface DownSampleFunction<T> {
void setDownSamplingData(T obj, List<Double> downSampleData);
int getFreq(T obj);
int getThreshold(T obj);
List<Double> getDownSamplingData(T obj);
}

View File

@ -1,34 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.cqu.warehouse.realtime.utils.TransformUtils;
import jdk.nashorn.internal.ir.debug.ObjectSizeCalculator;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import java.util.Collections;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-24 17:46
*@Description: TODO Computes the FFT of the data
*@Version: 1.0
*/
public abstract class FFTSamplingFunction<T> implements AsyncFunction<T, T> {
@Override
public void asyncInvoke(T t, ResultFuture<T> resultFuture) throws Exception {
List<Double> transformData = getTransformData(t);
List<Double> fft = TransformUtils.fft(transformData);
setTransformData(t,fft);
System.out.println("fftDataSize:"+ObjectSizeCalculator.getObjectSize(t));
resultFuture.complete(Collections.singleton(t));
}
protected abstract void setTransformData(T t, List<Double> fft);
protected abstract List<Double> getTransformData(T t);
}

View File

@ -1,46 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.func
*@Author: markilue
*@CreateTime: 2023-05-18 17:26
*@Description: TODO Window function that folds gas-turbine tag/value records into a single wide row
*@Version: 1.0
*/
public class GasWideProcessFunction extends ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow> {
@Override
public void process(String key, ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>.Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
//Fired when the window closes; the iterable holds every element of the window in memory, so windows must stay small
JSONObject json = null;
for (JSONObject jsonObject : iterable) {
String tagName = jsonObject.getString("tagName");
Double value = jsonObject.getDouble("value");
if (json == null) {
jsonObject.remove("tagName");
jsonObject.remove("value");
jsonObject.put(tagName, value);
json = jsonObject;
} else {
json.put(tagName, value);
}
}
long start = context.window().getStart();
long end = context.window().getEnd();
System.out.println("key:"+key+" start:"+new Date(start)+" end:"+new Date(end)+" size: "+json.size());
collector.collect(json);
}
}
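How this window function is presumably wired: key by device, open a short event-time window, fold the tag/value records into one wide row, and write it to the dwm_gas_wide topic read by the gas_scada_type DDL above. The key field and the MyKafkaUtils sink factory name are assumptions; imports are omitted.
tagStream
.keyBy(json -> json.getString("gas_id"))//hypothetical device key
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(new GasWideProcessFunction())
.map(JSONObject::toJSONString)
.addSink(MyKafkaUtils.getKafkaSink("dwm_gas_wide"));//sink factory method name assumed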

View File

@ -1,57 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.func
*@Author: markilue
*@CreateTime: 2023-05-18 14:03
*@Description: TODO Custom Debezium deserialization schema
*@Version: 1.0
*/
public class MyDeserializerSchemaFunction implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
Struct value = (Struct) sourceRecord.value();
Struct source = value.getStruct("source");
Struct afterData = value.getStruct("after");
JSONObject jsonObject = new JSONObject();
jsonObject.put("database", source.getString("db"));
jsonObject.put("table", source.getString("table"));
//operation type
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();//the Envelope enum maps the op code to its string form
if (type.equals("create")) {
type = "insert";
}
jsonObject.put("type", type);
JSONObject dataObject = new JSONObject();
if (afterData != null) {
for (Field field : afterData.schema().fields()) {
String fieldName = field.name();
Object fieldValue = afterData.get(fieldName);
dataObject.put(fieldName, fieldValue);
}
}
jsonObject.put("data", dataObject);
collector.collect(jsonObject.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
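A sketch of plugging this schema into a Flink CDC source. The builder follows the com.alibaba.ververica 1.x API implied by the import above, and all connection values are placeholders.
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("Ding202")//placeholder host
.port(3306)
.databaseList("phm_realtime")//hypothetical config database
.tableList("phm_realtime.gas_code_process")//hypothetical config table
.username("root")
.password("******")
.deserializer(new MyDeserializerSchemaFunction())
.build();
DataStreamSource<String> configStream = env.addSource(mysqlSource);//env assumed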

View File

@ -1,90 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.entity.GasCodeProcess;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.func
*@Author: markilue
*@CreateTime: 2023-05-18 14:32
*@Description: TODO Broadcast function that de-masks (decodes) tag names using the broadcast config
*@Version: 1.0
*/
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private MapStateDescriptor<String, GasCodeProcess> mapStateDescriptor;
private OutputTag<String> outTag;
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
public TableProcessFunction(OutputTag<String> outTag, MapStateDescriptor<String, GasCodeProcess> mapStateDescriptor) {
this.mapStateDescriptor = mapStateDescriptor;
this.outTag = outTag;
}
@Override
public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {
ReadOnlyBroadcastState<String, GasCodeProcess> gasTableState = readOnlyContext.getBroadcastState(mapStateDescriptor);
String area = jsonObject.getString("area");
String tagName = jsonObject.getString("tagName");
GasCodeProcess truth1 = gasTableState.get(area);
if (truth1 != null) {
jsonObject.put("area", truth1.getTag_desc());
}
GasCodeProcess truth = gasTableState.get(tagName);
if (truth != null) {
// jsonObject.put("originaltagName", truth.getDecode_value());
jsonObject.put("tagName", truth.getDecode_value());
// jsonObject.put("tag_desc", truth.getTag_desc());
// jsonObject.put("unit", truth.getUnit());
// jsonObject.put("is_calculated", truth.getIs_calculated());
// readOnlyContext.output(outTag, jsonObject.toJSONString());
}
//attach the epoch timestamp so a watermark can be registered downstream
String time = jsonObject.getString("realtime");
long ts = sdf.parse(time).getTime();
jsonObject.put("ts",ts);
collector.collect(jsonObject);
}
@Override
public void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {
BroadcastState<String, GasCodeProcess> gasTableProcessState = context.getBroadcastState(mapStateDescriptor);
JSONObject jsonObject = JSON.parseObject(s);
JSONObject data = jsonObject.getJSONObject("data");
GasCodeProcess gasTableProcess = JSON.parseObject(jsonObject.getString("data"), GasCodeProcess.class);
if (data != null) {
String encode_value = data.getString("encode_value");
if (encode_value != null) {
gasTableProcessState.put(encode_value, gasTableProcess);//<encode,GasTableProcess>
}
}
}
}
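A sketch of the usual broadcast wiring for this function; the descriptor name, both streams, and the side-output tag are assumptions.
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;
MapStateDescriptor<String, GasCodeProcess> mapStateDescriptor =
new MapStateDescriptor<>("gas-code-process-state", String.class, GasCodeProcess.class);
OutputTag<String> dirtyTag = new OutputTag<String>("dirty") {};//anonymous subclass keeps type info
BroadcastStream<String> broadcastStream = configStream.broadcast(mapStateDescriptor);//configStream: CDC stream of GasCodeProcess rows
SingleOutputStreamOperator<JSONObject> decodedStream = dataStream//dataStream: raw SCADA records
.connect(broadcastStream)
.process(new TableProcessFunction(dirtyTag, mapStateDescriptor));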

View File

@ -1,46 +0,0 @@
create table if not exists dim_gas_type_info
(
rated_duration varchar,
rated_load varchar,
rated_power varchar,
rated_speed varchar,
rated_flow_rate varchar,
type_id varchar,
rated_press varchar,
rated_temp varchar,
id varchar primary key
)

upsert into dim_gas_type_info
(rated_duration, rated_load, rated_power, rated_speed, rated_flow_rate, type_id, rated_press, rated_temp, id)
values
(rated_duration, rated_load, rated_power, rated_speed, rated_flow_rate, type_id, rated_press, rated_temp, id)

View File

@ -1,21 +0,0 @@
package com.cqu.warehouse.realtime.app.func;
import java.util.Date;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.func
*@Author: markilue
*@CreateTime: 2023-05-18 17:42
*@Description: TODO
*@Version: 1.0
*/
public class test {
public static void main(String[] args) {
Long ts = 1672502465000L;
Long ts1 =1672502405000L;
System.out.println(new Date(ts));
System.out.println(new Date(ts1));
}
}

View File

@ -12,7 +12,7 @@ public class PHMConfig {
public static final String HBASE_SCHEMA = "PHM_REALTIME";//HBase schema name
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix connection string
-public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//Phoenix connection string
+public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//ClickHouse connection string
public static final String RT_CHECKPOINT_LOCATION = "hdfs://Ding202:8020/phm_warehouse/rt/ck";//checkpoint save location
public static final String HADOOP_USER_NAME = "dingjiawen";//Hadoop user name
}

View File

@ -1,6 +1,8 @@
package com.cqu.warehouse.realtime.entity;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
/**
*@BelongsProject: phm_parent
@ -11,6 +13,8 @@ import lombok.Data;
*@Version: 1.0
*/
@Data
+@AllArgsConstructor
+@NoArgsConstructor
public class GasCodeProcess {
String encode_value;//masked tag value

View File

@ -0,0 +1,41 @@
package com.cqu.warehouse.realtime.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.entity
*@Author: markilue
*@CreateTime: 2023-05-26 17:39
*@Description: TODO Gas-turbine metrics aggregated by type
*@Version: 1.0
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class GasTypeState {
String stt;
String edt;
String type_id;
Long rated_temp;
Long rated_press;
Long rated_flow_rate;
Long rated_speed;
Long rated_power;
Long rated_load;
Long rated_duration;
Double avg_LubeReturnT2;
Double avg_T5TC1;
Double avg_GFFlow;
Double avg_LFFlow;
Double avg_NHP_1;
Double avg_AirInletDP_1;
Double avg_T1_1;
Double avg_LubeHaderP;
Double avg_LubeFilterDP;
Double avg_TankT;
Double avg_GrBxAccel;
}

View File

@ -0,0 +1,21 @@
package com.cqu.warehouse.realtime.entity;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-26 19:23
*@Description: TODO Annotation: fields marked with it are not sunk to ClickHouse
*@Version: 1.0
*/
@Target(ElementType.FIELD)//where the annotation may be placed
@Retention(RetentionPolicy.RUNTIME)//retained at runtime so reflection can read it
public @interface TransientSink {
}
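Illustrative use on an entity: the marked field is skipped by ClickhouseUtils when binding the insert statement (see the annotation check there). The entity below is hypothetical, assuming Lombok as elsewhere in this commit.
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ExampleTypeState {//hypothetical entity
private String stt;
private String edt;
@TransientSink
private Integer debugCount;//skipped by the ClickHouse sink
private Long ts;
}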

View File

@ -0,0 +1,47 @@
package com.cqu.warehouse.realtime.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.entity
*@Author: markilue
*@CreateTime: 2023-05-25 21:39
*@Description: TODO Wind-power SCADA data aggregated by location
*@Version: 1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WindSCADALocationState {
//TODO Note: these field names and types must match the sink table definition exactly; the ts column is BIGINT -> Long
private String stt;
private String edt;
private String windfarm;
private String location;
private String longitude;
private String latitude;
private Double production;
private Double avg_torque;
private Double avg_sumptemp;
private Double avg_inletoiltemp;
private Double avg_winddirection;
private Double avg_envtemp;
private Double avg_gen_speed;
private Double avg_pumpoutletpress;
private Double avg_engineroom_temp;
private Double avg_rotorspeed;
private Double avg_activepower;
private Double avg_engineroom_vibration_x;
private Double avg_engineroom_vibration_y;
private Double avg_highspeedshaft_front_temp;
private Double avg_max_windingtemp;
private Double avg_highspeedshaft_rear_temp;
private Double avg_windspeed;
private Double avg_coolingwatertemp;
private Double avg_inletpress;
private Long ts;
}

View File

@ -0,0 +1,48 @@
package com.cqu.warehouse.realtime.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.entity
*@Author: markilue
*@CreateTime: 2023-05-25 21:39
*@Description: TODO Wind-power SCADA data aggregated by turbine type
*@Version: 1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WindSCADATypeState {
//TODO Note: these field names and types must match the sink table definition exactly; the ts column is BIGINT -> Long
private String stt;
private String edt;
private String type_id;
private Long rated_efficiency;
private Long rated_load;
private Long rated_power;
private Long rated_speed;
private Long rated_air_volume;
private Double avg_torque;
private Double avg_sumptemp;
private Double avg_inletoiltemp;
private Double avg_winddirection;
private Double avg_envtemp;
private Double avg_gen_speed;
private Double avg_pumpoutletpress;
private Double avg_engineroom_temp;
private Double avg_rotorspeed;
private Double avg_activepower;
private Double avg_engineroom_vibration_x;
private Double avg_engineroom_vibration_y;
private Double avg_highspeedshaft_front_temp;
private Double avg_max_windingtemp;
private Double avg_highspeedshaft_rear_temp;
private Double avg_windspeed;
private Double avg_coolingwatertemp;
private Double avg_inletpress;
private Long ts;
}

View File

@ -0,0 +1,66 @@
package com.cqu.warehouse.realtime.utils;
import com.cqu.warehouse.realtime.common.PHMConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.cqu.warehouse.realtime.entity.TransientSink;//fix: use the custom annotation; java.beans.Transient cannot be placed on fields
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-26 19:06
*@Description: TODO ClickHouse sink utility
*@Version: 1.0
*/
public class ClickhouseUtils {
public static <T> SinkFunction<T> getJDBCSink(String sql) {
SinkFunction<T> clickhouseSink = JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>() {
@Override
public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
Field[] fields = t.getClass().getDeclaredFields();
int index = 1;
for (Field field : fields) {
TransientSink annotation = field.getAnnotation(TransientSink.class);//fix: java.beans.Transient only targets methods, so the custom @TransientSink annotation is the one meant here
if (annotation != null) {
continue;//skip fields marked @TransientSink
}
field.setAccessible(true);
try {
Object fieldValue = field.get(t);
preparedStatement.setObject(index++, fieldValue);
} catch (IllegalAccessException e) {
e.printStackTrace();
System.out.println("sink失败");
}
}
}
},
new JdbcExecutionOptions.Builder()
// .withMaxRetries(3)
// .withBatchIntervalMs()
.withBatchSize(5)//flush to ClickHouse every 5 records
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl(PHMConfig.CLICKHOUSE_URL)
.build()
);
return clickhouseSink;
}
}
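Illustrative call site, assuming a ClickHouse table gas_type_state whose 21 columns line up one-to-one with the fields of GasTypeState (none of which are marked @TransientSink); the stream and table name are assumptions.
gasTypeStateStream.addSink(ClickhouseUtils.getJDBCSink(
"insert into gas_type_state values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));//21 placeholders, one per field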

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
+import scala.collection.parallel.immutable.ParRange;
import java.util.List;
@ -73,7 +74,6 @@ public class DimUtils {
System.out.println("------关闭redis连接-------"); System.out.println("------关闭redis连接-------");
} }
return jsonObject; return jsonObject;
} }

View File

@ -45,10 +45,21 @@ public class MyKafkaUtils {
return new FlinkKafkaProducer<String>(topic, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
-return new ProducerRecord<>(topic,s.getBytes());
+return new ProducerRecord<>(topic, s.getBytes());
}
}, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
+public static String getKafkaSourceByDDL(String topic, String groupId) {
+String connection =
+" 'connector' = 'kafka', " +
+" 'topic' = '" + topic + "', " +
+" 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
+" 'properties.group.id' = '" + groupId + "', " +
+" 'scan.startup.mode' = 'latest-offset', " +
+" 'format' = 'json'";
+return connection;
+}
}
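Illustrative use of the new helper when registering a Kafka-backed table via DDL; tableEnv and the abbreviated column list are assumptions, with the full schema shown in the gas_scada_type DDL earlier in this commit.
tableEnv.executeSql("create table gas_scada_type ( " +
"type_id String, " +
"realtime STRING, " +
"row_time as TO_TIMESTAMP(realtime), " +
"WATERMARK FOR row_time as row_time - INTERVAL '3' SECOND " +
") with (" + MyKafkaUtils.getKafkaSourceByDDL("dwm_gas_wide", "gas_type_state_app") + ")");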

View File

@ -40,7 +40,7 @@ public class PhoenixUtils {
//process the result set
while (resultSet.next()) {
T obj = clazz.newInstance();
-for (int i = 1; i < metaData.getColumnCount(); i++) {//starts at 1
+for (int i = 1; i <= metaData.getColumnCount(); i++) {//TODO Note: JDBC column indexes start at 1 and run through getColumnCount() inclusive
String columnName = metaData.getColumnName(i);
BeanUtils.setProperty(obj, columnName, resultSet.getObject(i));
}