leecode,rt_phm更新更新

This commit is contained in:
markilue 2023-05-24 13:31:51 +08:00
parent f7bc35b4bc
commit 00f5d9ebd1
59 changed files with 4413606 additions and 15 deletions

View File

@ -0,0 +1,121 @@
package com.markilue.leecode.hot100.interviewHot.listnode;
import com.markilue.leecode.listnode.ListNode;
import com.markilue.leecode.listnode.ListNodeUtils;
import org.junit.Test;
import java.util.ArrayDeque;
/**
 * @BelongsProject: Leecode
 * @BelongsPackage: com.markilue.leecode.hot100.interviewHot.listnode
 * @Author: markilue
 * @CreateTime: 2023-05-24 11:51
 * @Description: LeetCode 234 — palindrome linked list, three variants:
 *   1) fast/slow pointers + reverse of the back half
 *   2) recursion with a head pointer walking forward ("loop invariant" style)
 *   3) stack of the first half's values (fastest accepted variant)
 * @Version: 1.0
 */
public class LC_234_IsPalindrome {

    @Test
    public void test() {
        ListNode single = new ListNode(1);
        ListNode evenList = ListNodeUtils.build(new int[]{1, 2, 2, 1});
        System.out.println(isPalindrome2(evenList));
    }

    // Variant 2: the recursion unwinds from the tail while this field walks
    // forward from the head, so mirrored positions are compared pairwise.
    ListNode root;

    public boolean isPalindrome(ListNode head) {
        root = head;
        return palindrome(head);
    }

    public boolean palindrome(ListNode node) {
        if (node == null) {
            return true;
        }
        if (!palindrome(node.next)) {
            return false;
        }
        // Could stop once the middle is passed; comparing every pair is
        // still correct, just does twice the necessary work.
        if (root.val != node.val) {
            return false;
        }
        root = root.next;
        return true;
    }

    // Variant 1: locate the middle, reverse the back half, compare halves.
    public boolean isPalindrome1(ListNode head) {
        if (head == null) {
            return true;
        }
        ListNode middle = findMid(head);
        ListNode back = reverse(middle.next);
        for (ListNode front = head; back != null; front = front.next, back = back.next) {
            if (front.val != back.val) {
                return false;
            }
        }
        return true;
    }

    // Fast/slow pointers. Note the loop condition: with fast.next-based
    // checks, slow stops on the FIRST middle node for even lengths (one
    // step earlier than the fast != null variant would).
    private ListNode findMid(ListNode head) {
        ListNode runner = head;
        ListNode walker = head;
        while (runner.next != null && runner.next.next != null) {
            runner = runner.next.next;
            walker = walker.next;
        }
        return walker;
    }

    // Iterative in-place reversal using a trailing "reversed so far" pointer.
    private ListNode reverse(ListNode start) {
        ListNode reversed = null;
        ListNode node = start;
        while (node != null) {
            ListNode next = node.next;
            node.next = reversed;
            reversed = node;
            node = next;
        }
        return reversed;
    }

    // Variant 3: push the first half's values, then pop while walking the
    // second half; any mismatch means the list is not a palindrome.
    public boolean isPalindrome2(ListNode head) {
        if (head == null || head.next == null) {
            return true;
        }
        ArrayDeque<Integer> firstHalf = new ArrayDeque<>();
        ListNode runner = head;
        ListNode walker = head;
        while (runner != null && runner.next != null) {
            runner = runner.next.next;
            firstHalf.push(walker.val);
            walker = walker.next;
        }
        // Odd length: runner stopped on the last node, so skip the middle.
        if (runner != null) {
            walker = walker.next;
        }
        for (; walker != null; walker = walker.next) {
            if (firstHalf.pop() != walker.val) {
                return false;
            }
        }
        return true;
    }
}

View File

@ -135,5 +135,34 @@ public class T52_139_WordBreak {
}
// Backtracking + memoized search entry point.
// memo is a field declared elsewhere in this class:
// memo[i] == -1 unknown, 1 = s.substring(i) breakable, 0 = not breakable.
public boolean wordBreak4(String s, List<String> wordDict) {
    memo = new int[s.length()];
    Arrays.fill(memo, -1);
    return dfs1(s, 0, wordDict);
}
/**
 * Returns true when s.substring(start) can be segmented into dictionary
 * words. Per-index results are cached in the memo field declared elsewhere
 * in this class (-1 unknown, 1 breakable, 0 not breakable).
 */
public boolean dfs1(String s, int start, List<String> wordDict) {
    if (start == s.length()) {
        return true;
    }
    if (memo[start] != -1) {
        return memo[start] == 1;
    }
    for (String word : wordDict) {
        int length = word.length();
        boolean fits = s.length() - start >= length;
        if (fits && s.startsWith(word, start) && dfs1(s, start + length, wordDict)) {
            memo[start] = 1;
            return true;
        }
    }
    memo[start] = 0;
    return false;
}
}

View File

@ -0,0 +1,67 @@
package com.markilue.leecode.hot100.second;
import com.markilue.leecode.tree.TreeNode;
/**
 * @BelongsProject: Leecode
 * @BelongsPackage: com.markilue.leecode.hot100.second
 * @Author: markilue
 * @CreateTime: 2023-05-24 09:58
 * @Description: LeetCode 208 — implement a prefix tree (trie), second pass.
 *               Each instance is itself a trie node with 26 children, one
 *               per lowercase letter.
 * @Version: 1.0
 */
public class T65_208_Trie_1 {

    // True when some inserted word ends exactly at this node.
    boolean isEnd;
    T65_208_Trie_1[] children;

    public T65_208_Trie_1() {
        children = new T65_208_Trie_1[26];
        isEnd = false;
    }

    /** Inserts a word, creating missing child nodes along its path. */
    public void insert(String word) {
        T65_208_Trie_1 cur = this;
        for (int i = 0; i < word.length(); i++) {
            int slot = word.charAt(i) - 'a';
            if (cur.children[slot] == null) {
                cur.children[slot] = new T65_208_Trie_1();
            }
            cur = cur.children[slot];
        }
        cur.isEnd = true; // the final node marks a complete word
    }

    /** True only when this exact word was inserted earlier. */
    public boolean search(String word) {
        T65_208_Trie_1 node = searchPrefix(word);
        return node != null && node.isEnd;
    }

    /** True when at least one inserted word starts with the prefix. */
    public boolean startsWith(String prefix) {
        return searchPrefix(prefix) != null;
    }

    /** Walks the trie along word; returns null where the path breaks off. */
    public T65_208_Trie_1 searchPrefix(String word) {
        T65_208_Trie_1 cur = this;
        for (int i = 0; i < word.length(); i++) {
            T65_208_Trie_1 next = cur.children[word.charAt(i) - 'a'];
            if (next == null) {
                return null;
            }
            cur = next;
        }
        return cur;
    }

    public static void main(String[] args) {
        T65_208_Trie_1 trie = new T65_208_Trie_1();
        trie.insert("apple");
        System.out.println(trie.search("apple")); // true
        trie.search("app");      // false
        trie.startsWith("app");  // true
        trie.insert("app");
        trie.search("app");      // true
    }
}

View File

@ -0,0 +1,92 @@
package com.markilue.leecode.hot100.second;
import org.junit.Test;
import javax.annotation.Resource;
/**
 * @BelongsProject: Leecode
 * @BelongsPackage: com.markilue.leecode.hot100.second
 * @Author: markilue
 * @CreateTime: 2023-05-24 10:27
 * @Description: LeetCode 215 — k-th largest element in an array.
 *               Two approaches: quickselect and counting over [-10000, 10000].
 * @Version: 1.0
 */
public class T66_215_FindKthLargest {

    @Test
    public void test() {
        int[] nums = {3, 2, 1, 5, 6, 4};
        int k = 2;
        System.out.println(findKthLargest(nums, k));
    }

    @Test
    public void test1() {
        int[] nums = {1};
        int k = 1;
        System.out.println(findKthLargest(nums, k));
    }

    // Approach 1: quickselect built on the quicksort partition step.
    // The k-th largest element ends up at sorted index nums.length - k.
    public int findKthLargest(int[] nums, int k) {
        partition(nums, 0, nums.length - 1, nums.length - k);
        return nums[nums.length - k];
    }

    // Recursively partitions [start, end] until a pivot lands on index k.
    public boolean partition(int[] nums, int start, int end, int k) {
        if (start > end) {
            return false;
        }
        int index = sort(nums, start, end);
        if (index == k) {
            return true;
        } else if (index < k) {
            // Target lies in the right part.
            return partition(nums, index + 1, end, k);
        } else {
            // Target lies in the left part.
            return partition(nums, start, index - 1, k);
        }
    }

    // Hoare-style partition around pivot nums[start]; returns the pivot's
    // final index. After the loop, right points at the last element <= pivot.
    public int sort(int[] nums, int start, int end) {
        int left = start;
        int right = end + 1;
        int num = nums[start];
        while (left <= right) {
            // NOTE(review): the scans are bounded by nums.length and 0 rather
            // than end and start, so they can step outside [start, end]. This
            // appears to rely on earlier partitions having fenced the regions
            // beyond the current range — confirm before reusing elsewhere.
            while (++left < nums.length && nums[left] < num) continue;
            while (0 <= --right && nums[right] > num) continue;
            if (left < right) swap(nums, left, right);
        }
        // Place the pivot at its final position.
        swap(nums, start, right);
        return right;
    }

    // Swaps two array slots in place.
    public void swap(int[] nums, int left, int right) {
        int temp = nums[left];
        nums[left] = nums[right];
        nums[right] = temp;
    }

    // Approach 2: counting occurrences of each value in [-10000, 10000].
    public int findKthLargest1(int[] nums, int k) {
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        int[] count = new int[20002];
        for (int i = 0; i < nums.length; i++) {
            int cur = nums[i] + 10000; // shift values into non-negative indices
            if (cur > max) max = cur;
            if (cur < min) min = cur;
            count[cur]++;
        }
        // Walk downward from the largest value until k occurrences are used up.
        for (int i = max; i >= min; i--) {
            k -= count[i];
            if (k <= 0) return i - 10000;
        }
        return -1; // unreachable for valid input (1 <= k <= nums.length)
    }
}

View File

@ -0,0 +1,79 @@
package com.markilue.leecode.hot100.second;
import org.junit.Test;
/**
 * @BelongsProject: Leecode
 * @BelongsPackage: com.markilue.leecode.hot100.second
 * @Author: markilue
 * @CreateTime: 2023-05-24 11:06
 * @Description: LeetCode 221 — area of the largest all-'1' square.
 * @Version: 1.0
 */
public class T67_221_MaximalSquare {

    @Test
    public void test() {
        char[][] matrix = {
                {'1', '0', '1', '0', '0'},
                {'1', '0', '1', '1', '1'},
                {'1', '1', '1', '1', '1'},
                {'1', '0', '0', '1', '0'}
        };
        System.out.println(maximalSquare(matrix));
    }

    @Test
    public void test1() {
        char[][] matrix = {
                {'1', '1'},
                {'1', '1'}
        };
        System.out.println(maximalSquare(matrix));
    }

    @Test
    public void test2() {
        char[][] matrix = {
                {'1', '0', '1', '0'},
                {'1', '0', '1', '1'},
                {'1', '1', '1', '1'},
                {'1', '1', '1', '1'}
        };
        System.out.println(maximalSquare(matrix));
    }

    /**
     * DP over square side lengths: side[r][c] is the side of the largest
     * all-'1' square whose bottom-right corner is (r, c); it is limited by
     * the three neighbouring squares (up, left, up-left). Returns the AREA.
     */
    public int maximalSquare(char[][] matrix) {
        int rows = matrix.length;
        int cols = matrix[0].length;
        int[][] side = new int[rows][cols];
        side[0][0] = matrix[0][0] == '1' ? 1 : 0;
        int best = side[0][0];
        // First column and first row can hold at most a 1x1 square.
        for (int r = 1; r < rows; r++) {
            if (matrix[r][0] == '1') {
                side[r][0] = 1;
                best = 1;
            }
        }
        for (int c = 1; c < cols; c++) {
            if (matrix[0][c] == '1') {
                side[0][c] = 1;
                best = 1;
            }
        }
        for (int r = 1; r < rows; r++) {
            for (int c = 1; c < cols; c++) {
                if (matrix[r][c] != '1') {
                    continue;
                }
                int bound = Math.min(side[r - 1][c - 1], Math.min(side[r - 1][c], side[r][c - 1]));
                side[r][c] = bound + 1;
                best = Math.max(best, side[r][c] * side[r][c]);
            }
        }
        return best;
    }
}

View File

@ -0,0 +1,24 @@
package com.markilue.leecode.hot100.second;
import com.markilue.leecode.tree.TreeNode;
/**
 * @BelongsProject: Leecode
 * @BelongsPackage: com.markilue.leecode.hot100.second
 * @Author: markilue
 * @CreateTime: 2023-05-24 11:36
 * @Description: LeetCode 226 — invert (mirror) a binary tree.
 * @Version: 1.0
 */
public class T68_226_InvertTree {

    /**
     * Inverts the tree bottom-up and returns its root.
     *
     * Bug fix: the previous version did
     *     root.right = invertTree(root.left);
     *     root.left  = invertTree(root.right);
     * The second line read root.right AFTER it had been overwritten, so the
     * original right subtree was lost and the left subtree was inverted a
     * second time and aliased into both children. Invert both subtrees into
     * locals before reassigning either pointer.
     *
     * @param root tree root (may be null)
     * @return the same root, with every left/right pair swapped
     */
    public TreeNode invertTree(TreeNode root) {
        if (root == null) {
            return null;
        }
        TreeNode invertedLeft = invertTree(root.left);
        TreeNode invertedRight = invertTree(root.right);
        root.left = invertedRight;
        root.right = invertedLeft;
        return root;
    }
}

View File

@ -0,0 +1,36 @@
package com.markilue.leecode.hot100.second;
import com.markilue.leecode.listnode.ListNode;
/**
 * @BelongsProject: Leecode
 * @BelongsPackage: com.markilue.leecode.hot100.second
 * @Author: markilue
 * @CreateTime: 2023-05-24 11:41
 * @Description: LeetCode 234 — palindrome linked list, recursive variant.
 * @Version: 1.0
 */
public class T69_234_IsPalindrome {

    // Walks forward from the head while the recursion unwinds from the
    // tail, so mirrored positions get compared pairwise.
    ListNode root;

    public boolean isPalindrome(ListNode head) {
        root = head;
        return palindrome(head);
    }

    public boolean palindrome(ListNode node) {
        if (node == null) {
            return true;
        }
        if (!palindrome(node.next)) {
            return false;
        }
        if (root.val != node.val) {
            return false;
        }
        root = root.next;
        return true;
    }
}

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>warehouse</artifactId>
<groupId>com.cqu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>algorithm</artifactId>
<dependencies>
<dependency>
<scope>test</scope>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<scope>test</scope>
<artifactId>jfreechart</artifactId>
<groupId>org.jfree</groupId>
<version>1.0.14</version>
</dependency>
<!--JTransforms是第一个用纯Java编写的、开源的、多线程的FFT库。
目前,有四种类型的变换可用:
离散傅里叶变换DFT、离散余弦变换DCT、离散正弦变换DST和离散哈特利变换DHT-->
<!--使用教程:http://wendykierp.github.io/JTransforms/apidocs/-->
<dependency>
<groupId>com.github.wendykierp</groupId>
<artifactId>JTransforms</artifactId>
<version>3.1</version>
<classifier>with-dependencies</classifier>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

View File

@ -0,0 +1,39 @@
package com.cqu.algorithm.downsampling;
import com.cqu.algorithm.downsampling.impl.lt.LTABuilder;
import com.cqu.algorithm.downsampling.impl.mm.MMAlgorithm;
import com.cqu.algorithm.downsampling.impl.mm.PIPlotAlgorithm;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:45
 * @Description: Ready-to-use down-sampling algorithms. Each constant wraps a
 *               pre-configured DownSamplingAlgorithm and simply delegates.
 * @Version: 1.0
 */
public enum DSAlgorithms implements DownSamplingAlgorithm{
    /** OSIsoft PI PlotValues */
    PIPLOT(new PIPlotAlgorithm()),
    /** Largest Triangle Three Bucket */
    LTTB(new LTABuilder().threeBucket().fixed().build()),
    /** Largest Triangle One Bucket */
    LTOB(new LTABuilder().oneBucket().fixed().build()),
    /** Largest Triangle Dynamic */
    LTD(new LTABuilder().threeBucket().dynamic().build()),
    /** Maximum and minimum value */
    MAXMIN(new MMAlgorithm());

    // The concrete algorithm this constant stands for.
    private DownSamplingAlgorithm delegate;

    DSAlgorithms(DownSamplingAlgorithm delegate){
        this.delegate=delegate;
    }

    /** Delegates straight to the wrapped algorithm. */
    @Override
    public List<Event> process(List<Event> data, int threshold) {
        return delegate.process(data,threshold);
    }
}

View File

@ -0,0 +1,23 @@
package com.cqu.algorithm.downsampling;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:41
 * @Description: Strategy interface every down-sampling algorithm implements:
 *               reduce a series of events to roughly threshold points.
 * @Version: 1.0
 */
public interface DownSamplingAlgorithm {
    /**
     * Down-samples the series.
     *
     * @param data The original data
     * @param threshold Number of data points to be returned
     * @return the downsampled data
     */
    List<Event> process(List<Event> data, int threshold);
}

View File

@ -0,0 +1,21 @@
package com.cqu.algorithm.downsampling;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:42
 * @Description: A single sample point in the series, made of two things:
 *               1) the sample's timestamp
 *               2) the value measured at that time
 * @Version: 1.0
 */
public interface Event {
    /** Sample timestamp (epoch; unit not fixed here — confirm ms vs s at producers). */
    long getTime();
    /** Value measured at {@link #getTime()}. */
    double getValue();
}

View File

@ -0,0 +1,22 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:56
 * @Description: A bucket groups consecutive events; down-sampling keeps only
 *               the representative event(s) each bucket selects.
 * @Version: 1.0
 */
public interface Bucket {
    /** Appends this bucket's selected representative event(s) to result. */
    public void selectInto(List<Event> result);
    /** Adds an event to this bucket. */
    public void add(Event e);
}

View File

@ -0,0 +1,63 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.DownSamplingAlgorithm;
import com.cqu.algorithm.downsampling.Event;
import java.util.ArrayList;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl
 * @Author: markilue
 * @CreateTime: 2023-05-23 14:08
 * @Description: Template-method skeleton for bucket-based down-sampling:
 *               prepare the raw events, split them into buckets, run the
 *               weighting hook, then let each bucket pick its representative.
 * @Version: 1.0
 */
public abstract class BucketBasedAlgorithm <B extends Bucket, E extends Event> implements DownSamplingAlgorithm {

    protected BucketSplitter<B, E> spliter;
    protected BucketFactory<B> factory;

    /** Converts the raw events into the element type this algorithm works on. */
    protected abstract List<E> prepare(List<Event> data);

    /** Hook run before selection — e.g. computing per-event weights. */
    protected abstract void beforeSelect(List<B> buckets, int threshold);

    @Override
    public List<Event> process(List<Event> events, int threshold) {
        int total = events.size();
        // Nothing to reduce: already at or below the target, or too tiny.
        boolean nothingToDo = threshold >= total || total < 3;
        if (nothingToDo) {
            return events;
        }
        List<E> prepared = prepare(events);
        List<B> buckets = spliter.split(factory, prepared, threshold);
        beforeSelect(buckets, threshold);
        List<Event> selected = new ArrayList<Event>(threshold);
        for (Bucket bucket : buckets) {
            bucket.selectInto(selected);
        }
        return selected;
    }

    public void setSpliter(BucketSplitter<B, E> spliter) {
        this.spliter = spliter;
    }

    public void setBucketFactory(BucketFactory<B> factory) {
        this.factory = factory;
    }
}

View File

@ -0,0 +1,20 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl
*@Author: markilue
*@CreateTime: 2023-05-23 14:02
*@Description: TODO 工厂模式实现类
*@Version: 1.0
*/
public interface BucketFactory<B extends Bucket> {
B newBucket();
B newBucket(int size);
B newBucket(Event e);
}

View File

@ -0,0 +1,19 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl
*@Author: markilue
*@CreateTime: 2023-05-23 13:55
*@Description:
* TODO 将一个一个event分割为buckets的接口
*@Version: 1.0
*/
public interface BucketSplitter<B extends Bucket, E extends Event> {
List<B> split(BucketFactory<B> factory, List<E> data, int threshold);
}

View File

@ -0,0 +1,95 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
import java.util.Comparator;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl
 * @Author: markilue
 * @CreateTime: 2023-05-23 14:31
 * @Description: Null-tolerant comparators over Event (for every order:
 *               both null -> 0, e1 null -> -1, e2 null -> 1, i.e. nulls
 *               sort first).
 *
 * Bug fix: the previous comparators returned only -1 or 1 — never 0 for
 * equal keys — which violates the Comparator contract
 * (sgn(compare(x, y)) must equal -sgn(compare(y, x))). TimSort may reject
 * such comparators with "Comparison method violates its general contract!".
 * Equal keys now compare as 0 via Long.compare / Double.compare (which also
 * gives a well-defined total order for NaN, unlike a raw '<').
 * @Version: 1.0
 */
public enum EventOrder implements Comparator<Event> {
    BY_TIME_ASC {
        @Override
        public int compare(Event e1, Event e2) {
            if (e1 == null && e2 == null) {
                return 0;
            } else if (e1 == null) {
                return -1;
            } else if (e2 == null) {
                return 1;
            }
            return Long.compare(e1.getTime(), e2.getTime());
        }
    },
    BY_VAL_ASC {
        @Override
        public int compare(Event e1, Event e2) {
            if (e1 == null && e2 == null) {
                return 0;
            } else if (e1 == null) {
                return -1;
            } else if (e2 == null) {
                return 1;
            }
            return Double.compare(e1.getValue(), e2.getValue());
        }
    },
    BY_VAL_DESC {
        @Override
        public int compare(Event e1, Event e2) {
            if (e1 == null && e2 == null) {
                return 0;
            } else if (e1 == null) {
                return -1;
            } else if (e2 == null) {
                return 1;
            }
            // Reversed operand order yields the descending direction.
            return Double.compare(e2.getValue(), e1.getValue());
        }
    },
    BY_ABS_VAL_ASC {
        @Override
        public int compare(Event e1, Event e2) {
            if (e1 == null && e2 == null) {
                return 0;
            } else if (e1 == null) {
                return -1;
            } else if (e2 == null) {
                return 1;
            }
            return Double.compare(Math.abs(e1.getValue()), Math.abs(e2.getValue()));
        }
    },
    BY_ABS_VAL_DESC {
        @Override
        public int compare(Event e1, Event e2) {
            if (e1 == null && e2 == null) {
                return 0;
            } else if (e1 == null) {
                return -1;
            } else if (e2 == null) {
                return 1;
            }
            return Double.compare(Math.abs(e2.getValue()), Math.abs(e1.getValue()));
        }
    }
}

View File

@ -0,0 +1,51 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
import java.util.ArrayList;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:52
 * @Description: Fixed-count bucket splitter:
 *               1) the first event goes into the first bucket and the last
 *                  event into the last bucket, each alone<br>
 *               2) the remaining events are spread over threshold - 2 middle
 *                  buckets holding roughly equal numbers of events<br>
 *
 * NOTE(review): assumes threshold >= 3 (otherwise bucketNum is 0 and the
 * capacity computation divides by zero) and data.size() >= 2 — confirm all
 * callers guarantee this (BucketBasedAlgorithm.process skips inputs with
 * fewer than 3 events or threshold >= size).
 * @Version: 1.0
 */
public class FixedNumBucketSplitter<B extends Bucket, E extends Event> implements BucketSplitter<B, E> {

    @Override
    public List<B> split(BucketFactory<B> factory, List<E> data, int threshold) {
        int bucketNum = threshold - 2;           // number of middle buckets
        int netSize = data.size() - 2;           // events excluding the two endpoints
        int bucketSize = (netSize + bucketNum - 1) / bucketNum; // ceil capacity per bucket
        List<B> buckets = new ArrayList<>(threshold);
        for (int i = 0; i < threshold; i++) {
            buckets.add(null);
        }
        // Endpoint buckets hold exactly the first and the last event.
        buckets.set(0, factory.newBucket(data.get(0)));
        buckets.set(threshold - 1, factory.newBucket(data.get(data.size() - 1)));
        for (int i = 0; i < bucketNum; i++) {
            buckets.set(i + 1, factory.newBucket(bucketSize));
        }
        // Walk the middle events, moving to the next bucket every ~step items
        // so the fractional remainder is distributed evenly.
        double step = netSize * 1.0 / bucketNum;
        double curr = step;
        int bucketIndex = 1;
        for (int i = 1; i <= netSize; i++) {
            buckets.get(bucketIndex).add(data.get(i));
            if (i > curr) {
                bucketIndex++;
                curr += step;
            }
        }
        return buckets;
    }
}

View File

@ -0,0 +1,49 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
import java.util.ArrayList;
import java.util.List;
/**
* Split data into buckets with equal time span
*/
public class FixedTimeBucketSplitter<B extends Bucket, E extends Event> implements BucketSplitter<B, E> {
public List<B> split2(BucketFactory<B> factory, List<E> data, int threshold) {
List<B> buckets = new ArrayList<>(threshold);
long start = data.get(0).getTime();
long end = data.get(data.size() - 1).getTime();
long span = end - start;
double pice = span / threshold;
double time = start;
int index = -1;
for (int i = 0; i < data.size(); i++) {
Event e = data.get(i);
if (e.getTime() >= time) {
time += pice;
index++;
buckets.add(factory.newBucket());
}
buckets.get(index).add(e);
}
return buckets;
}
public List<B> split(BucketFactory<B> factory, List<E> data, int threshold) {
List<B> buckets = new ArrayList<>(threshold);
for (int i = 0; i < threshold; i++) {
buckets.add(factory.newBucket());
}
long start = data.get(0).getTime();
long end = data.get(data.size() - 1).getTime();
long span = end - start;
for (Event e : data) {
int bindex = (int) ((e.getTime() - start) * threshold / span);
bindex = bindex >= threshold ? threshold - 1 : bindex;
buckets.get(bindex).add(e);
}
return buckets;
}
}

View File

@ -0,0 +1,54 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl
 * @Author: markilue
 * @CreateTime: 2023-05-23 14:04
 * @Description: Basic Event implementation with equals and hashCode overridden.
 *
 * NOTE(review): identity is based on the timestamp ONLY — two events with
 * the same time but different values are "equal". equals and hashCode are
 * consistent with each other, but confirm time-only identity is what
 * callers intend (WeightedEvent, by contrast, compares time AND value).
 * @Version: 1.0
 */
public class PlainEvent implements Event {

    private long time;    // sample timestamp
    private double value; // sample value (not part of equals/hashCode)

    public PlainEvent(long time, double value) {
        this.time = time;
        this.value = value;
    }

    @Override
    public long getTime() {
        return time;
    }

    @Override
    public double getValue() {
        return value;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        // Same long-folding as Long.hashCode(time).
        result = prime * result + (int) (time ^ (time >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PlainEvent other = (PlainEvent) obj;
        if (time != other.time)
            return false;
        return true;
    }
}

View File

@ -0,0 +1,85 @@
package com.cqu.algorithm.downsampling.impl;
import com.cqu.algorithm.downsampling.Event;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:53
 * @Description: Decorator attaching a selection weight to an Event.
 *               Identity is based on the wrapped event's time AND value
 *               (unlike PlainEvent, which compares time only); the weight
 *               itself never participates in equals/hashCode.
 * @Version: 1.0
 */
public class WeightedEvent implements Event {

    private Event event;   // the wrapped sample
    private double weight; // selection weight, assigned by an LTWeightCalculator

    public WeightedEvent(long time, double value) {
        this.event = new PlainEvent(time, value);
    }

    public WeightedEvent(Event e) {
        this.event = e;
    }

    public Event getEvent() {
        return event;
    }

    public long getTime() {
        return event.getTime();
    }

    public double getValue() {
        return event.getValue();
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        if (event == null) {
            return "[null event]";
        }
        return "[t=" + event.getTime() + ", v=" + event.getValue() + "]";
    }

    @Override
    public int hashCode() {
        // Null-event instances fall back to identity hashing; equals() treats
        // them as unequal to everything but themselves (reflexive short-circuit).
        if (event == null) {
            return super.hashCode();
        }
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (event.getTime() ^ (event.getTime() >>> 32));
        long temp;
        temp = Double.doubleToLongBits(event.getValue());
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        WeightedEvent other = (WeightedEvent) obj;
        if (other.event == null || event == null) {
            return false;
        }
        if (event.getTime() != other.event.getTime())
            return false;
        // doubleToLongBits makes NaN compare equal to NaN, matching hashCode.
        if (Double.doubleToLongBits(event.getValue()) != Double.doubleToLongBits(other.event.getValue()))
            return false;
        return true;
    }
}

View File

@ -0,0 +1,50 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.impl.FixedNumBucketSplitter;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:48
 * @Description: Builder (builder pattern) for Largest-Triangle algorithms:
 *               pick a weight strategy (oneBucket/threeBucket) and a splitter
 *               (fixed/dynamic), then call build().
 *
 * NOTE(review): the splitter/calculator singletons below are shared by every
 * algorithm built here; S_DYNAMIC in particular has mutable settings
 * (iteration rate/limit), so changing them affects all LTD instances.
 * @Version: 1.0
 */
public class LTABuilder {

    public static final FixedNumBucketSplitter<LTWeightedBucket, WeightedEvent> S_FIXED = new FixedNumBucketSplitter<>();
    public static final LTDynamicBucketSplitter S_DYNAMIC = new LTDynamicBucketSplitter();
    public static final LTOneBucketWeightCalculator ONE_BUCKET = new LTOneBucketWeightCalculator();
    public static final LTThreeBucketWeightCalculator THREE_BUCKET = new LTThreeBucketWeightCalculator();

    // The algorithm being assembled.
    private LTAlgorithm lta;

    public LTABuilder() {
        lta = new LTAlgorithm();
        lta.setBucketFactory(new LTWeightedBucketFactory());
    }

    /** Use equally-sized buckets. */
    public LTABuilder fixed() {
        lta.setSpliter(S_FIXED);
        return this;
    }

    /** Use SSE-driven dynamically resized buckets (LTD). */
    public LTABuilder dynamic() {
        lta.setSpliter(S_DYNAMIC);
        return this;
    }

    /** Weight from the triangle of each event with its direct neighbours (LTOB). */
    public LTABuilder oneBucket() {
        lta.setWcalc(ONE_BUCKET);
        return this;
    }

    /** Weight from the previous selection and the next bucket's average (LTTB). */
    public LTABuilder threeBucket() {
        lta.setWcalc(THREE_BUCKET);
        return this;
    }

    public LTAlgorithm build() {
        return lta;
    }
}

View File

@ -0,0 +1,66 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.BucketBasedAlgorithm;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.ArrayList;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
 * @Author: markilue
 * @CreateTime: 2023-05-23 14:07
 * @Description: Largest Triangle Bucket Algorithm family:
 * <ul>
 * <li>LTOB: Largest Triangle One Bucket</li>
 * <li>LTTB: Largest Triangle Three Bucket</li>
 * <li>LTD: Largest Triangle Dynamic (three bucket)</li>
 * </ul>
 * The concrete variant is determined by the splitter and weight calculator
 * injected through LTABuilder.
 * @Version: 1.0
 */
public class LTAlgorithm extends BucketBasedAlgorithm<LTWeightedBucket, WeightedEvent> {

    // Reusable triangle-area helper shared across weight computations.
    protected Triangle triangle = new Triangle();
    // Weight strategy; set via setWcalc (must be non-null before process()).
    protected LTWeightCalculator wcalc;

    // Package-private: instances are assembled through LTABuilder.
    LTAlgorithm() {
    }

    /** Wraps every raw event so a selection weight can be attached. */
    @Override
    protected List<WeightedEvent> prepare(List<Event> data) {
        List<WeightedEvent> result = new ArrayList<>(data.size());
        for (Event event : data) {
            result.add(new WeightedEvent(event));
        }
        return result;
    }

    /** Delegates weight assignment to the configured strategy. */
    @Override
    protected void beforeSelect(List<LTWeightedBucket> buckets, int threshold) {
        wcalc.calcWeight(triangle, buckets);
    }

    public void setWcalc(LTWeightCalculator wcalc) {
        this.wcalc = wcalc;
    }

    /** Derives a name such as "LTOB", "LTTB" or "LTTD" from the configuration. */
    @Override
    public String toString() {
        String name = "LT";
        if (this.wcalc instanceof LTOneBucketWeightCalculator) {
            name += "O";
        } else if (this.wcalc instanceof LTThreeBucketWeightCalculator) {
            name += "T";
        }
        if (this.spliter instanceof LTDynamicBucketSplitter) {
            name += "D";
        } else {
            name += "B";
        }
        return name;
    }
}

View File

@ -0,0 +1,107 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.impl.BucketFactory;
import com.cqu.algorithm.downsampling.impl.BucketSplitter;
import com.cqu.algorithm.downsampling.impl.FixedNumBucketSplitter;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:50
 * @Description: The LTD splitter.
 * <p>
 * A bucket-splitter dynamically resize bucket according to their SSE(Sum of Square Errors).
 * </p>
 * <p>
 * In each iteration, the bucket with the highest SSE is split into two new buckets, two buckets with the lowest SSE
 * are merged into a new one.
 * </p>
 * <p>
 * LTD recommended number of iterations is DataSize / threshold * 10 but it depends. For a plot whit one highly
 * fluctuating area and several small peaks, big number of iterations causes small peaks to be lost. So I change the
 * formula to DataSize / threshold / 10 and limit the number to 500.
 * @Version: 1.0
 */
public class LTDynamicBucketSplitter implements BucketSplitter<LTWeightedBucket, WeightedEvent> {

    // Initial equal-size split is delegated to the fixed splitter.
    private FixedNumBucketSplitter<LTWeightedBucket, WeightedEvent> fs = new FixedNumBucketSplitter<>();
    // iterations ~= dataSize / threshold * iterationRate, clamped to [1, maxIteration]
    private double iterationRate = 0.1;
    private int maxIteration = 500;

    public double getIterationRate() {
        return iterationRate;
    }

    public void setIterationRate(double iterationRate) {
        this.iterationRate = iterationRate;
    }

    public void setMaxIteration(int maxIt) {
        this.maxIteration = maxIt;
    }

    @Override
    public List<LTWeightedBucket> split(BucketFactory<LTWeightedBucket> factory, List<WeightedEvent> data, int threshold) {
        // first split equally
        List<LTWeightedBucket> buckets = fs.split(factory, data, threshold);
        // resize buckets: repeatedly merge the lowest-SSE adjacent pair and
        // split the highest-SSE bucket, keeping the bucket count constant.
        LinkedBucketNode head = LinkedBucketNode.fromList(buckets);
        for (int i = getItCount(data.size(), threshold); i >= 0; i--) {
            LinkedBucketNode max = findMaxSSE(head);
            // NOTE(review): the merge runs before the split; if findMaxSSE or
            // findMinSSEPair ever returns null (see notes below) this throws
            // an NPE — confirm inputs always leave a splittable bucket.
            findMinSSEPair(head, max).merge();
            max.split();
        }
        return LinkedBucketNode.toList(head);
    }

    // Iteration budget: (total / threshold) * rate, clamped to [1, maxIteration].
    // Note total / threshold is integer division before the rate is applied.
    private int getItCount(int total, int threshold) {
        int itCount = (int) (total / threshold * iterationRate);
        if (itCount > maxIteration) {
            itCount = maxIteration;
        } else if (itCount < 1) {
            itCount = 1;
        }
        return itCount;
    }

    // Finds the start of the adjacent interior pair with the lowest combined
    // SSE, skipping pairs whose second member is the excluded (max) bucket.
    private final static LinkedBucketNode findMinSSEPair(LinkedBucketNode head, LinkedBucketNode exclude) {
        double minSSE = Double.MAX_VALUE;
        LinkedBucketNode low = null;
        LinkedBucketNode end = head.getNext().getNext().getNext();
        while ((end = end.getNext()) != null) {
            LinkedBucketNode beta = end.getLast();
            LinkedBucketNode alpha = beta.getLast();
            if (beta == exclude) {
                continue;
            }
            double sum = alpha.getValue().sse() + beta.getValue().sse();
            if (sum < minSSE) {
                minSSE = sum;
            low = alpha;
            }
        }
        return low;
    }

    // Finds the interior bucket (more than one event) with the highest SSE.
    private final static LinkedBucketNode findMaxSSE(LinkedBucketNode head) {
        // NOTE(review): Double.MIN_VALUE is the smallest POSITIVE double, not
        // negative infinity. This works only because SSE >= 0; if every
        // candidate has SSE 0 or size <= 1, max stays null — confirm callers
        // can tolerate that (split() would NPE).
        double maxSSE = Double.MIN_VALUE;
        LinkedBucketNode max = null;
        LinkedBucketNode end = head.getEnd();
        LinkedBucketNode n2 = head.getNext().getNext();
        while (n2 != end) {
            LinkedBucketNode n1 = n2.getLast();
            LinkedBucketNode n3 = n2.getNext();
            LTWeightedBucket b = n2.getValue();
            if (b.calcSSE(n1.getValue(), n3.getValue()) > maxSSE && b.size() > 1) {
                maxSSE = b.sse();
                max = n2;
            }
            n2 = n2.getNext();
        }
        return max;
    }
}

View File

@ -0,0 +1,26 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:50
 * @Description: The LTOB weighting:
 *               Weight = Area of triangle (point A: the previous event,
 *               point B: this event; point C: the next event)
 * @Version: 1.0
 */
public class LTOneBucketWeightCalculator implements LTWeightCalculator {

    /** Assigns each event the area of its triangle with its direct neighbours. */
    @Override
    public void calcWeight(Triangle triangle, List<LTWeightedBucket> buckets) {
        for (int i = 0; i < buckets.size(); i++) {
            LTWeightedBucket bucket = buckets.get(i);
            for (int j = 0; j < bucket.size(); j++) {
                triangle.calc(bucket.get(j));
            }
        }
    }
}

View File

@ -0,0 +1,31 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
 * @Author: markilue
 * @CreateTime: 2023-05-23 13:50
 * @Description: The LTTB weighting:
 *               Weight = Area of triangle (point A: the previous selected
 *               event, point B: this event; point C: average event in the
 *               next bucket)
 * @Version: 1.0
 */
public class LTThreeBucketWeightCalculator implements LTWeightCalculator {

    /** Weights every interior bucket's events; the endpoint buckets are skipped. */
    @Override
    public void calcWeight(Triangle triangle, List<LTWeightedBucket> buckets) {
        for (int i = 1; i + 1 < buckets.size(); i++) {
            LTWeightedBucket bucket = buckets.get(i);
            // Anchor: the representative already chosen in the previous bucket.
            WeightedEvent anchor = buckets.get(i - 1).select()[0];
            // Target: the average (virtual) event of the following bucket.
            WeightedEvent nextAvg = buckets.get(i + 1).average();
            for (WeightedEvent candidate : bucket) {
                triangle.calc(anchor, candidate, nextAvg);
            }
        }
    }
}

View File

@ -0,0 +1,16 @@
package com.cqu.algorithm.downsampling.impl.lt;
import java.util.List;
/**
 * @BelongsProject: phm_parent
 * @BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
 * @Author: markilue
 * @CreateTime: 2023-05-23 14:13
 * @Description: Strategy interface for LT weight calculators: assign a
 *               selection weight to every event in the given buckets.
 * @Version: 1.0
 */
public interface LTWeightCalculator {
    /**
     * @param triangle reusable triangle-area helper
     * @param buckets  buckets whose events receive weights
     */
    void calcWeight(Triangle triangle, List<LTWeightedBucket> buckets);
}

View File

@ -0,0 +1,201 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.Bucket;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
*@Author: markilue
*@CreateTime: 2023-05-23 13:52
*@Description: TODO 具体的LTTB算法的分割(抽样)实现
*@Version: 1.0
*/
public class LTWeightedBucket implements Iterable<WeightedEvent>, Bucket {
    // Number of events currently stored (also the next write position).
    private int index = 0;
    // Fixed-capacity backing array; set by a constructor or initSize().
    private WeightedEvent[] events;
    // Cached representative of this bucket (highest weight), chosen lazily.
    private WeightedEvent selected;
    // a virtual event represents the average value in next bucket
    private WeightedEvent average;
    // -1 means SSE not calculated yet
    private double sse = -1;

    // Capacity must be supplied later via initSize before adding events.
    public LTWeightedBucket() {
    }

    // Single-event bucket (the fixed splitter uses this for the endpoints).
    public LTWeightedBucket(WeightedEvent event) {
        index = 1;
        events = new WeightedEvent[] { event };
    }

    // Empty bucket with the given fixed capacity.
    public LTWeightedBucket(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("Bucket size must be positive");
        }
        events = new WeightedEvent[size];
    }
    // Deep copy: each event is re-wrapped, so weights and the cached
    // selection/average/SSE are all reset in the copy.
    public LTWeightedBucket copy() {
        LTWeightedBucket b = new LTWeightedBucket(events.length);
        b.index = index;
        for (int i = 0; i < index; i++) {
            b.events[i] = new WeightedEvent(events[i].getEvent());
        }
        return b;
    }

    // Sets the capacity if the no-arg constructor was used; no-op otherwise.
    public void initSize(int size) {
        if (events == null) {
            events = new WeightedEvent[size];
        }
    }

    // Unwraps this bucket's selected representative(s) into the output list.
    public void selectInto(List<Event> result) {
        for (WeightedEvent e : select()) {
            result.add(e.getEvent());
        }
    }

    // NOTE(review): silently drops the event once the bucket is full —
    // confirm callers always size buckets generously enough (the fixed
    // splitter uses a ceil capacity, which should suffice).
    public void add(Event e) {
        if (index < events.length) {
            events[index++] = (WeightedEvent) e;
        }
    }

    // Bounds-checked read; null when i is at or past the fill count.
    public WeightedEvent get(int i) {
        return i < index ? events[i] : null;
    }

    /** Number of events actually stored (not the capacity). */
    public int size() {
        return index;
    }
public WeightedEvent average() {
if (null == average) {
if (index == 1) {
average = events[0];
} else {
double valueSum = 0;
long timeSum = 0;
for (int i = 0; i < index; i++) {
Event e = events[i];
valueSum += e.getValue();
timeSum += e.getTime();
}
average = new WeightedEvent(timeSum / index, valueSum / index);
}
}
return average;
}
public WeightedEvent[] select() {
if (index == 0) {
return new WeightedEvent[0];
}
if (null == selected) {
if (index == 1) {
selected = events[0];
} else {
double max = Double.MIN_VALUE;
int maxIndex = 0;
for (int i = 0; i < index; i++) {
double w = events[i].getWeight();
if (w > max) {
maxIndex = i;
max = w;
}
}
selected = events[maxIndex];
}
}
return new WeightedEvent[] { selected };
}
public double sse() {
return sse;
}
/**
* Calculate sum of squared errors, with one event in adjacent buckets overlapping
**/
public double calcSSE(LTWeightedBucket last, LTWeightedBucket next) {
if (sse == -1) {
double lastVal = last.get(last.size() - 1).getValue();
double nextVal = next.get(0).getValue();
double avg = lastVal + nextVal;
for (int i = 0; i < index; i++) {
Event e = events[i];
avg += e.getValue();
}
avg = avg / (index + 2);
double lastSe = sequarErrors(lastVal, avg);
double nextSe = sequarErrors(nextVal, avg);
sse = lastSe + nextSe;
for (int i = 0; i < index; i++) {
Event e = events[i];
sse += sequarErrors(e.getValue(), avg);
}
}
return sse;
}
@Override
public Iterator<WeightedEvent> iterator() {
return new Iterator<WeightedEvent>() {
int cursor = 0;
public void remove() {
throw new UnsupportedOperationException();
}
public WeightedEvent next() {
return events[cursor++];
}
public boolean hasNext() {
return cursor < index;
}
};
}
@Override
public String toString() {
return Arrays.toString(events);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(events);
result = prime * result + index;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
LTWeightedBucket other = (LTWeightedBucket) obj;
if (!Arrays.equals(events, other.events))
return false;
if (index != other.index)
return false;
return true;
}
private double sequarErrors(double d, double avg) {
double e = d - avg;
return e * e;
}
}

View File

@ -0,0 +1,30 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.BucketFactory;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
*@Author: markilue
*@CreateTime: 2023-05-23 14:06
*@Description: TODO LT的bucket工厂
*@Version: 1.0
*/
public class LTWeightedBucketFactory implements BucketFactory<LTWeightedBucket> {

    /** New empty bucket with no backing array yet (sized later via initSize). */
    @Override
    public LTWeightedBucket newBucket() {
        return new LTWeightedBucket();
    }

    /** New bucket with capacity {@code size}; size must be positive. */
    @Override
    public LTWeightedBucket newBucket(int size) {
        return new LTWeightedBucket(size);
    }

    /** Single-element bucket; {@code e} must actually be a WeightedEvent. */
    @Override
    public LTWeightedBucket newBucket(Event e) {
        return new LTWeightedBucket((WeightedEvent) e);
    }
}

View File

@ -0,0 +1,182 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.ArrayList;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
*@Author: markilue
*@CreateTime: 2023-05-23 14:18
*@Description: TODO Tow-way linked list to perform bucket split and merge.
*@Version: 1.0
*/
public class LinkedBucketNode {

    private LinkedBucketNode last;   // previous node; null on the head sentinel
    private LinkedBucketNode next;   // next node; null at the tail
    private LinkedBucketNode end;    // head sentinel only: last node appended by fromList/fromArray
    private LTWeightedBucket value;  // payload bucket; null on the head sentinel
    private int size;                // head sentinel only: number of data nodes in the list

    /**
     * split this node into 2 new nodes,each contains a new bucket with half events.
     *
     * @return if bucket contains more than 2 events, return the last node, else return this.
     */
    public LinkedBucketNode split() {
        int size = value.size();
        if (size < 2) {
            // nothing to split — keep this node in place
            return this;
        }
        // first half gets size/2 events, second half the remainder
        LTWeightedBucket b0 = new LTWeightedBucket(size / 2);
        LTWeightedBucket b1 = new LTWeightedBucket(size - size / 2);
        for (int i = 0; i < size; i++) {
            (i < size / 2 ? b0 : b1).add(value.get(i));
        }
        LinkedBucketNode n0 = new LinkedBucketNode(b0);
        LinkedBucketNode n1 = new LinkedBucketNode(b1);
        // this -> n0, then n1 inserted right after n0
        replace(this, n0);
        insert(n0, n1);
        return n1;
    }

    /**
     * merge this node and the next node into one.
     *
     * @return the merged node;
     */
    public LinkedBucketNode merge() {
        if (next == null) {
            // tail node — nothing to merge with
            return this;
        }
        LTWeightedBucket m = new LTWeightedBucket(value.size() + next.value.size());
        for (WeightedEvent e : value) {
            m.add(e);
        }
        for (WeightedEvent e : next.value) {
            m.add(e);
        }
        LinkedBucketNode n = new LinkedBucketNode(m);
        // splice n in place of (this, this.next)
        LinkedBucketNode tail = next.next;
        concat(last, n, tail);
        return n;
    }

    /** Builds a sentinel-headed list from {@code arr}; head.end points at the last node. */
    public static LinkedBucketNode fromList(List<LTWeightedBucket> arr) {
        LinkedBucketNode head = new LinkedBucketNode(arr.size());
        LinkedBucketNode last = head;
        for (int i = 0; i < arr.size(); i++) {
            LinkedBucketNode node = new LinkedBucketNode(arr.get(i));
            head.end = node;
            node.last = last;
            last.next = node;
            last = node;
        }
        return head;
    }

    /** Collects data-node payloads (skipping the head sentinel) into a list. */
    public static List<LTWeightedBucket> toList(LinkedBucketNode head) {
        List<LTWeightedBucket> arr = new ArrayList<LTWeightedBucket>(head.size);
        LinkedBucketNode node = head.next;
        while (node != null) {
            arr.add(node.value);
            node = node.next;
        }
        return arr;
    }

    /** Array counterpart of {@link #fromList}. */
    public static LinkedBucketNode fromArray(LTWeightedBucket[] arr) {
        LinkedBucketNode head = new LinkedBucketNode(arr.length);
        LinkedBucketNode last = head;
        for (int i = 0; i < arr.length; i++) {
            LinkedBucketNode node = new LinkedBucketNode(arr[i]);
            head.end = node;
            node.last = last;
            last.next = node;
            last = node;
        }
        return head;
    }

    /**
     * Array counterpart of {@link #toList}. Assumes head.size still matches the
     * current node count — TODO confirm splits/merges keep it in sync before use.
     */
    public static LTWeightedBucket[] toArray(LinkedBucketNode head) {
        LTWeightedBucket[] arr = new LTWeightedBucket[head.size];
        LinkedBucketNode node = head.next;
        int index = 0;
        while (node != null) {
            arr[index++] = node.value;
            node = node.next;
        }
        return arr;
    }

    /** Inserts {@code append} immediately after {@code node}. */
    public static void insert(LinkedBucketNode node, LinkedBucketNode append) {
        LinkedBucketNode next = node.next;
        node.next = append;
        append.last = node;
        append.next = next;
        if (next != null) {
            next.last = append;
        }
    }

    /**
     * Replaces {@code node} with {@code rep}, detaching node's links.
     * Requires node.last != null (i.e. node is not the head sentinel).
     */
    public static void replace(LinkedBucketNode node, LinkedBucketNode rep) {
        LinkedBucketNode next = node.next;
        LinkedBucketNode last = node.last;
        node.last = null;
        node.next = null;
        last.next = rep;
        rep.last = last;
        rep.next = next;
        if (next != null) {
            next.last = rep;
        }
    }

    /** Links head -> node -> tail (tail may be null for end-of-list). */
    public static void concat(LinkedBucketNode head, LinkedBucketNode node, LinkedBucketNode tail) {
        head.next = node;
        node.last = head;
        node.next = tail;
        if (tail != null) {
            tail.last = node;
        }
    }

    /** Head-sentinel constructor: carries the expected list size, no payload. */
    public LinkedBucketNode(int size) {
        this.size = size;
    }

    /** Data-node constructor: wraps one bucket. */
    public LinkedBucketNode(LTWeightedBucket b) {
        value = b;
    }

    public LinkedBucketNode getEnd() {
        return end;
    }

    public LinkedBucketNode getLast() {
        return last;
    }

    public void setLast(LinkedBucketNode last) {
        this.last = last;
    }

    public LinkedBucketNode getNext() {
        return next;
    }

    public void setNext(LinkedBucketNode next) {
        this.next = next;
    }

    public LTWeightedBucket getValue() {
        return value;
    }

    public void setValue(LTWeightedBucket value) {
        this.value = value;
    }
}

View File

@ -0,0 +1,50 @@
package com.cqu.algorithm.downsampling.impl.lt;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl.lt
*@Author: markilue
*@CreateTime: 2023-05-23 14:11
*@Description:
* TODO 分割区域:
* 将一堆事件划分为一个一个矩形区域
*@Version: 1.0
*/
public class Triangle {

    // sliding three-point window: (last, curr, next)
    private WeightedEvent last;
    private WeightedEvent curr;
    private WeightedEvent next;

    /**
     * Assigns the area of triangle (last, curr, next) as curr's weight.
     * Area formula: S = (1/2)*|y1*(x2-x3) + y2*(x3-x1) + y3*(x1-x2)|
     * (sign of the cyclic sum is irrelevant under the absolute value).
     * No-op until all three window slots are populated.
     */
    private void updateWeight() {
        if (last == null || curr == null || next == null) {
            return;
        }
        double dxCurrLast = curr.getTime() - last.getTime();
        double dxLastNext = last.getTime() - next.getTime();
        double dxNextCurr = next.getTime() - curr.getTime();
        double yNext = next.getValue();
        double yCurr = curr.getValue();
        double yLast = last.getValue();
        double area = 0.5 * Math.abs(yNext * dxCurrLast + yCurr * dxLastNext + yLast * dxNextCurr);
        curr.setWeight(area);
    }

    /** Slides the window forward by one event and reweights the new middle point. */
    public void calc(WeightedEvent e) {
        last = curr;
        curr = next;
        next = e;
        updateWeight();
    }

    /** Resets the whole window to the given three events and reweights the middle one. */
    public void calc(WeightedEvent last, WeightedEvent curr, WeightedEvent next) {
        this.last = last;
        this.curr = curr;
        this.next = next;
        updateWeight();
    }
}

View File

@ -0,0 +1,50 @@
package com.cqu.algorithm.downsampling.impl.mix;
import com.cqu.algorithm.downsampling.DownSamplingAlgorithm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.EventOrder;
import java.util.*;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl.mix
*@Author: markilue
*@CreateTime: 2023-05-23 14:27
*@Description:
* TODO
* Merge other algorithms' results into a new result, then deduplicate and sort it.
*@Version: 1.0
*/
public class MixedAlgorithm implements DownSamplingAlgorithm {

    // algorithms to run, in insertion order, each mapped to the fraction of
    // the threshold it is allowed to produce
    private LinkedHashMap<DownSamplingAlgorithm, Double> map = new LinkedHashMap<DownSamplingAlgorithm, Double>();

    /** Registers {@code da} with its share {@code rate} of the overall threshold. */
    public void add(DownSamplingAlgorithm da, double rate) {
        map.put(da, rate);
    }

    /**
     * Runs every registered algorithm on {@code data} with its scaled threshold,
     * merges the results, de-duplicates them, and returns them sorted by time.
     * Returns {@code data} unchanged when no algorithm is registered.
     */
    @Override
    public List<Event> process(List<Event> data, int threshold) {
        if (map.isEmpty()) {
            return data;
        }
        // LinkedHashSet de-duplicates events picked by several algorithms
        LinkedHashSet<Event> set = new LinkedHashSet<>();
        // iterate entries directly: avoids a second map.get() lookup (and Double
        // unboxing of a possibly-absent value) per registered algorithm
        for (Map.Entry<DownSamplingAlgorithm, Double> en : map.entrySet()) {
            List<Event> subList = en.getKey().process(data, (int) (threshold * en.getValue()));
            set.addAll(subList);
        }
        List<Event> result = new ArrayList<>(set.size());
        result.addAll(set);
        Collections.sort(result, EventOrder.BY_TIME_ASC);
        return result;
    }

    @Override
    public String toString() {
        String name = "MIXED";
        if (!map.isEmpty()) {
            name += map.toString();
        }
        return name;
    }
}

View File

@ -0,0 +1,36 @@
package com.cqu.algorithm.downsampling.impl.mm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.BucketBasedAlgorithm;
import com.cqu.algorithm.downsampling.impl.FixedTimeBucketSplitter;
import java.util.List;
/**
 * Bucket-based downsampling that keeps the events with the maximum and
 * minimum value inside each fixed-width time bucket.
 */
public class MMAlgorithm extends BucketBasedAlgorithm<MMBucket, Event> {

    public MMAlgorithm() {
        // wire the max/min bucket with a fixed-time splitter
        setBucketFactory(new MMBucketFactory());
        setSpliter(new FixedTimeBucketSplitter<MMBucket, Event>());
    }

    // no preprocessing: raw data is consumed as-is
    @Override
    protected List<Event> prepare(List<Event> data) {
        return data;
    }

    // no global pass needed before per-bucket selection
    @Override
    protected void beforeSelect(List<MMBucket> buckets, int threshold) {
    }

    @Override
    public String toString() {
        return "MaxMin";
    }
}

View File

@ -0,0 +1,71 @@
package com.cqu.algorithm.downsampling.impl.mm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.Bucket;
import java.util.ArrayList;
import java.util.List;
/**
* Bucket that selects events with maximum or minimum value
*/
public class MMBucket implements Bucket {

    // events collected into this bucket, in arrival order
    protected List<Event> events = new ArrayList<>();

    public MMBucket() {
    }

    public MMBucket(Event e) {
        events.add(e);
    }

    // capacity hint ignored: the backing list grows on demand
    public MMBucket(int size) {
    }

    /**
     * Adds the max-value and min-value events to {@code result}, ordered by their
     * timestamps. Buckets with 0 or 1 events are copied through unchanged.
     */
    @Override
    public void selectInto(List<Event> result) {
        if (events.size() <= 1) {
            result.addAll(events);
            return;
        }
        Event maxEvt = null;
        Event minEvt = null;
        // NEGATIVE/POSITIVE_INFINITY, not Double.MIN_VALUE/MAX_VALUE: MIN_VALUE is the
        // smallest POSITIVE double, so a bucket whose values are all negative would
        // never satisfy (val > max) and silently select no maximum at all.
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        for (Event e : events) {
            double val = e.getValue();
            if (val > max) {
                maxEvt = e;
                max = val;
            }
            if (val < min) {
                minEvt = e;
                min = val;
            }
        }
        // null branches remain reachable only for all-NaN values
        if (maxEvt != null && minEvt != null) {
            // emit in time order so the output series stays monotonic
            boolean maxFirst = maxEvt.getTime() < minEvt.getTime();
            if (maxFirst) {
                result.add(maxEvt);
                result.add(minEvt);
            } else {
                result.add(minEvt);
                result.add(maxEvt);
            }
        } else if (maxEvt == null && minEvt != null) {
            result.add(minEvt);
        } else if (maxEvt != null && minEvt == null) {
            result.add(maxEvt);
        }
    }

    @Override
    public void add(Event e) {
        events.add(e);
    }
}

View File

@ -0,0 +1,24 @@
package com.cqu.algorithm.downsampling.impl.mm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.BucketFactory;
/** Factory producing {@link MMBucket} instances for the bucket-splitting framework. */
public class MMBucketFactory implements BucketFactory<MMBucket> {

    @Override
    public MMBucket newBucket() {
        return new MMBucket();
    }

    // size is a capacity hint only; MMBucket ignores it
    @Override
    public MMBucket newBucket(int size) {
        return new MMBucket(size);
    }

    @Override
    public MMBucket newBucket(Event e) {
        return new MMBucket(e);
    }
}

View File

@ -0,0 +1,35 @@
package com.cqu.algorithm.downsampling.impl.mm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.BucketBasedAlgorithm;
import com.cqu.algorithm.downsampling.impl.FixedTimeBucketSplitter;
import java.util.List;
/**
 * OSISoft PI PlotValues algorithm. (without interpolation on boundary)
 * Keeps first, last, max and min events per fixed-width time bucket.
 */
public class PIPlotAlgorithm extends BucketBasedAlgorithm<PIPlotBucket, Event> {

    public PIPlotAlgorithm() {
        // wire the PI-plot bucket with a fixed-time splitter
        setBucketFactory(new PIPlotBucketFactory());
        setSpliter(new FixedTimeBucketSplitter<PIPlotBucket, Event>());
    }

    // no preprocessing: raw data is consumed as-is
    @Override
    protected List<Event> prepare(List<Event> data) {
        return data;
    }

    // no global pass needed before per-bucket selection
    @Override
    protected void beforeSelect(List<PIPlotBucket> buckets, int threshold) {
    }

    @Override
    public String toString() {
        return "PIPlot";
    }
}

View File

@ -0,0 +1,40 @@
package com.cqu.algorithm.downsampling.impl.mm;
import com.cqu.algorithm.downsampling.Event;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* Bucket that selects the first, the last event and events with maximum or minimum value
*/
public class PIPlotBucket extends MMBucket {

    public PIPlotBucket() {
    }

    public PIPlotBucket(int size) {
        super(size);
    }

    public PIPlotBucket(Event e) {
        super(e);
    }

    /**
     * Emits the bucket's first event, the max/min picks from {@link MMBucket},
     * and the last event — de-duplicated while preserving that order.
     * An empty bucket contributes nothing.
     */
    @Override
    public void selectInto(List<Event> result) {
        List<Event> extremes = new ArrayList<>();
        super.selectInto(extremes);
        // LinkedHashSet drops duplicates (e.g. when first == max) but keeps order
        Set<Event> ordered = new LinkedHashSet<>();
        if (!extremes.isEmpty()) {
            ordered.add(events.get(0));
            ordered.addAll(extremes);
            ordered.add(events.get(events.size() - 1));
        }
        result.addAll(ordered);
    }
}

View File

@ -0,0 +1,24 @@
package com.cqu.algorithm.downsampling.impl.mm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.BucketFactory;
/** Factory producing {@link PIPlotBucket} instances for the bucket-splitting framework. */
public class PIPlotBucketFactory implements BucketFactory<PIPlotBucket> {

    @Override
    public PIPlotBucket newBucket() {
        return new PIPlotBucket();
    }

    // size is a capacity hint only; the underlying MMBucket ignores it
    @Override
    public PIPlotBucket newBucket(int size) {
        return new PIPlotBucket(size);
    }

    @Override
    public PIPlotBucket newBucket(Event e) {
        return new PIPlotBucket(e);
    }
}

View File

@ -0,0 +1,71 @@
package com.cqu.algorithm.downsampling.impl.tg;
import com.cqu.algorithm.downsampling.DownSamplingAlgorithm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.EventOrder;
import com.cqu.algorithm.downsampling.impl.WeightedEvent;
import java.util.*;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.downsampling.impl.tg
*@Author: markilue
*@CreateTime: 2023-05-23 14:33
*@Description:
* TODO
* Find out big gaps between events and select events at both ends of those gaps.
*@Version: 1.0
*/
public class TimeGapAlgorithm implements DownSamplingAlgorithm {

    // fraction of the threshold this algorithm may fill with gap-boundary events
    private double rate = 1;

    /**
     * Finds abnormally large time gaps between consecutive events and keeps the
     * events on both ends of those gaps, widest gaps first, until the budget
     * (threshold * rate) is reached. Input small enough is returned unchanged.
     */
    @Override
    public List<Event> process(List<Event> data, int threshold) {
        if (data.isEmpty() || threshold >= data.size()) {
            return data;
        }
        List<Event> result = new ArrayList<>();
        List<WeightedEvent> weighted = new ArrayList<>();
        // mean time delta between adjacent events over the whole series
        double avg = (data.get(data.size() - 1).getTime() - data.get(0).getTime()) * 1.0 / (data.size() - 1);
        for (int i = 0; i < data.size(); i++) {
            WeightedEvent we = new WeightedEvent(data.get(i));
            if (i < data.size() - 1) {
                // weight = how much the gap to the NEXT event exceeds the mean;
                // the final event keeps the default weight
                long delta = data.get(i + 1).getTime() - data.get(i).getTime();
                we.setWeight(delta - avg);
            }
            weighted.add(we);
        }
        Set<Event> set = new HashSet<>();
        int max = (int) (threshold * rate);
        // multi-pass scan: first take only gaps whose relative excess is > 1024x
        // the mean, then > 256x, 64x, ... (each pass restricted to (multiple, limit]
        // so gaps already consumed in an earlier pass are not revisited)
        int multiple = 1024;
        int limit = Integer.MAX_VALUE;
        A: while (multiple > 2) {
            for (int i = 0; i < weighted.size(); i++) {
                WeightedEvent e = weighted.get(i);
                double m = e.getWeight() / avg;
                if (m > multiple && m <= limit) {
                    // keep both endpoints of the gap
                    set.add(e.getEvent());
                    if (i + 1 < weighted.size()) {
                        set.add(weighted.get(i + 1).getEvent());
                    }
                }
                if (set.size() >= max) {
                    break A; // budget reached — stop all passes
                }
            }
            limit = multiple;
            multiple >>= 2; // next pass accepts gaps 4x smaller
        }
        result.addAll(set);
        Collections.sort(result, EventOrder.BY_TIME_ASC);
        return result;
    }

    @Override
    public String toString() {
        return "TIMEGAP";
    }
}

View File

@ -0,0 +1,25 @@
package com.cqu.algorithm.fft;
import org.jtransforms.fft.BenchmarkDoubleFFT;
import org.jtransforms.fft.BenchmarkFloatFFT;
import org.jtransforms.fft.DoubleFFT_1D;
import java.io.IOException;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.algorithm.fft
*@Author: markilue
*@CreateTime: 2023-05-23 20:19
*@Description: TODO FFT类
*@Version: 1.0
*/
public class FFT {

    /**
     * Smoke-test for the JTransforms FFT binding.
     * complexForward expects interleaved re/im pairs, i.e. 2*n doubles for an
     * n-point transform — the original passed new double[10] to a 10-point FFT,
     * which reads/writes past the array.
     */
    public static void main(String[] args) throws IOException {
        new DoubleFFT_1D(10).complexForward(new double[20]);
    }
}

View File

@ -0,0 +1,61 @@
package com.cqu.algorithm.downsampling.test;
import com.cqu.algorithm.downsampling.DSAlgorithms;
import com.cqu.algorithm.downsampling.DownSamplingAlgorithm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.mix.MixedAlgorithm;
import com.cqu.algorithm.downsampling.impl.tg.TimeGapAlgorithm;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
public class DownSampleTest {

    public static void main(String[] args) {
        // ramp artifact can be reproduced with: name=D3.csv, max=1000000, threshold=560
        int max = 60000;       // cap on raw points read from the CSV resource
        int threshold = 500;   // downsampling target size
        String name = "D6.csv";
        List<Event> rawData = TestData.read(name, max);
        System.err.println("Total Points Read In " + name + " = " + rawData.size());
        // mixed strategy: full-rate LTTB plus 10% time-gap boundary events
        MixedAlgorithm mixed = new MixedAlgorithm();
        mixed.add(DSAlgorithms.LTTB, 1);
        mixed.add(new TimeGapAlgorithm(), 0.1);
        // algorithms to compare side by side, each with its own threshold
        Map<DownSamplingAlgorithm, Integer> param = new LinkedHashMap<>();
        param.put(DSAlgorithms.LTTB, threshold);
        param.put(mixed, (int) (threshold * 1));
        param.put(DSAlgorithms.PIPLOT, (int) (threshold * 0.3));
        // param.put(DSAlgorithms.LTD, threshold);
        // param.put(DSAlgorithms.MAXMIN, (int) (threshold * 0.5));
        new DownSampleTest().execute(rawData, param);
    }

    /**
     * Runs each algorithm on rawData with its threshold, prints point counts and
     * timing to stderr, then renders all results in a comparison chart window.
     */
    public void execute(List<Event> rawData, Map<DownSamplingAlgorithm, Integer> map) {
        Map<String, List<Event>> data = new LinkedHashMap<String, List<Event>>();
        for (Entry<DownSamplingAlgorithm, Integer> en : map.entrySet()) {
            DownSamplingAlgorithm a = en.getKey();
            int threshold = en.getValue();
            long t = System.currentTimeMillis();
            List<Event> downsampled = a.process(rawData, threshold);
            data.put(en.getKey().toString(), downsampled);
            System.err.println(rawData.size() + "->" + downsampled.size() + " using " + a + " in "
                    + (System.currentTimeMillis() - t) + "ms");
        }
        new DownsampleResultChart(rawData, data).render();
    }
}

View File

@ -0,0 +1,124 @@
package com.cqu.algorithm.downsampling.test;
import com.cqu.algorithm.downsampling.Event;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.chart.plot.XYPlot;
import org.jfree.chart.renderer.xy.XYLineAndShapeRenderer;
import org.jfree.data.xy.XYSeries;
import org.jfree.data.xy.XYSeriesCollection;
import org.jfree.ui.ApplicationFrame;
import java.awt.*;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/** Swing window comparing raw vs. downsampled series with JFreeChart line charts. */
public class DownsampleResultChart extends ApplicationFrame {

    private static final long serialVersionUID = -6410314314853035407L;

    private final Collection<Event> rawData;        // original series
    private final Map<String, List<Event>> dataMap; // algorithm name -> downsampled series

    public DownsampleResultChart(List<Event> rawData, Map<String, List<Event>> dataMap) {
        super("Downsample Result");
        this.rawData = rawData;
        this.dataMap = dataMap;
    }

    /**
     * Builds one XY line chart panel for the given series. A legend is shown only
     * for multi-series charts; the Y range is pinned to the FIRST series' min/max.
     */
    private ChartPanel renderChart(String title, int width, int height, XYSeries... series) {
        final XYSeriesCollection data = new XYSeriesCollection();
        boolean legend = false;
        boolean shapes = false;
        if (series.length > 1) {
            legend = true;
            shapes = false;
        }
        double maxY = series[0].getMaxY();
        double minY = series[0].getMinY();
        for (XYSeries serie : series) {
            data.addSeries(serie);
        }
        JFreeChart chart = ChartFactory.createXYLineChart(title, "Time", "Value", data, PlotOrientation.VERTICAL,
                legend, true, false);
        chart.setAntiAlias(true);
        chart.setTextAntiAlias(true);
        XYPlot plot = (XYPlot) chart.getPlot();
        // plot.setBackgroundAlpha(0);
        XYLineAndShapeRenderer renderer = new XYLineAndShapeRenderer();
        // series 0 and 1: lines always, shapes only in single-series mode (shapes==false here)
        renderer.setSeriesLinesVisible(0, true);
        renderer.setSeriesShapesVisible(0, shapes);
        renderer.setSeriesLinesVisible(1, true);
        renderer.setSeriesShapesVisible(1, shapes);
        renderer.setSeriesPaint(1, Color.gray);
        renderer.setSeriesPaint(0, Color.black);
        plot.getRangeAxis().setRangeWithMargins(minY, maxY);
        plot.setRenderer(renderer);
        final ChartPanel chartPanel = new ChartPanel(chart, false);
        chartPanel.setMouseZoomable(true, true);
        chartPanel.setPreferredSize(new Dimension(width, height));
        return chartPanel;
    }

    // converts events into a JFreeChart series (time on X, value on Y)
    private XYSeries createSeries(Collection<Event> records, String name) {
        final XYSeries series = new XYSeries(name);
        for (Event record : records) {
            series.add((double) record.getTime(), record.getValue());
        }
        return series;
    }

    /**
     * Lays out a 3 x N grid (N = number of algorithms): row 1 repeats the raw
     * series once per algorithm, row 2 shows each algorithm's result, row 3
     * overlays each result against the raw series.
     */
    public void render() {
        Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize();
        int sw = screenSize.width;
        int sh = screenSize.height;
        int count = dataMap.size();
        // panel size and window offset scale with screen size and chart count
        int width = count <= 1 ? (int) (sw * 0.6) : (int) (sw * 0.9 / count);
        int height = (int) (sh * 0.3);
        int dx = count <= 1 ? (int) (sw * 0.2) : (int) (sw * 0.05);
        int dy = (int) (sh * 0.02);
        setLayout(new GridLayout(3, count));
        XYSeries rawSerie = createSeries(rawData, "Raw Data");
        Map<String, XYSeries> dSeries = new LinkedHashMap<String, XYSeries>();
        // row 1: one raw-data chart per algorithm (also builds the result series)
        for (Entry<String, List<Event>> en : dataMap.entrySet()) {
            String name = "Raw(" + rawSerie.getItems().size() + ")";
            getContentPane().add(renderChart(name, width, height, rawSerie));
            dSeries.put(en.getKey(), createSeries(en.getValue(), "Downsampled Data"));
        }
        // row 2: each algorithm's downsampled series
        for (Entry<String, XYSeries> en : dSeries.entrySet()) {
            String name = "By " + en.getKey() + "(" + en.getValue().getItems().size() + ")";
            getContentPane().add(renderChart(name, width, height, en.getValue()));
        }
        // row 3: result overlaid on the raw series
        for (XYSeries s : dSeries.values()) {
            getContentPane().add(renderChart("Comparison", width, height, s, rawSerie));
        }
        setBounds(dx, dy, 0, 0);
        pack();
        setVisible(true);
    }
}

View File

@ -0,0 +1,77 @@
package com.cqu.algorithm.downsampling.test;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.PlainEvent;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
public class TestData {

    /**
     * Reads events from the classpath CSV resource {@code name}, keeping only rows
     * whose timestamp falls in [start, end]. Rows are "timestamp,value" with
     * timestamps formatted yyyy-MM-dd HH:mm:ss.SSS; rows are assumed to be sorted
     * by time (the scan stops at the first row past {@code end}).
     * Best-effort: parse/IO failures are printed and the rows read so far returned.
     */
    public static List<Event> read(String name, String start, String end) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        List<Event> data = new ArrayList<Event>();
        InputStream in = null;
        try {
            long startTime = sdf.parse(start).getTime();
            long endTime = sdf.parse(end).getTime();
            in = TestData.class.getResourceAsStream("/" + name);
            // explicit charset: the no-charset readLines overload is deprecated
            // and silently depends on the platform default encoding
            for (String line : IOUtils.readLines(in, "UTF-8")) {
                if (line == null || line.isEmpty()) {
                    continue;
                }
                String[] parts = line.split(",");
                long time = sdf.parse(parts[0]).getTime();
                if (time < startTime) {
                    continue;
                } else if (time > endTime) {
                    break; // rows are time-ordered: nothing later can match
                }
                PlainEvent event = new PlainEvent(time, Double.parseDouble(parts[1]));
                data.add(event);
            }
        } catch (Exception e) {
            // best-effort loader for test data: log and return what we have
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(in);
        }
        return data;
    }

    /**
     * Reads up to {@code max} events from the classpath CSV resource {@code name}.
     * Same row format as {@link #read(String, String, String)}; best-effort on errors.
     */
    public static List<Event> read(String name, int max) {
        List<Event> data = new ArrayList<Event>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        InputStream in = null;
        try {
            in = TestData.class.getResourceAsStream("/" + name);
            // explicit charset, matching the range-based overload above
            for (String line : IOUtils.readLines(in, "UTF-8")) {
                if (line == null || line.isEmpty()) {
                    continue;
                }
                String[] parts = line.split(",");
                PlainEvent event = new PlainEvent(sdf.parse(parts[0]).getTime(), Double.parseDouble(parts[1]));
                data.add(event);
                if (data.size() >= max) {
                    break; // cap reached
                }
            }
        } catch (Exception e) {
            // best-effort loader for test data: log and return what we have
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(in);
        }
        return data;
    }
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -13,6 +13,7 @@
<packaging>pom</packaging>
<modules>
<module>rt-warehouse</module>
<module>algorithm</module>
</modules>
<properties>

View File

@ -25,14 +25,14 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
@ -45,21 +45,21 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
@ -72,28 +72,28 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<!--Flink默认使用的是slf4j记录日志相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<!--lomback插件依赖-->
@ -101,7 +101,7 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.14</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<!--添加flink-mysql的cdc依赖-->
@ -178,14 +178,21 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- <scope>provided</scope>-->
</dependency>
<!--降采样算法-->
<dependency>
<groupId>com.cqu</groupId>
<artifactId>algorithm</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

View File

@ -28,7 +28,7 @@ public abstract class BaseStreamApp {
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//任务结束检查点是否保存
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
env.setStateBackend(new FsStateBackend(PHMConfig.RT_CHECKPOINT_LOCATION));
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
System.setProperty("HADOOP_USER_NAME", PHMConfig.HADOOP_USER_NAME);
//模板
execute(env);
env.execute();

View File

@ -1,9 +1,32 @@
package com.cqu.warehouse.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DataSamplingFunction;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*@BelongsProject: phm_parent
@ -12,8 +35,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
*@CreateTime: 2023-05-22 19:55
*@Description:
* TODO 风电cms数据DWM层:
* 1)同一时刻数据拼接 -状态编程? 每一个时刻10万个点,全都放在内存当中不太合理?
* 1)同一时刻数据拼接 -状态编程? 每一个时刻10万个点,全都放在内存当中不太合理? ->降维后的数据再进行拼接
* 2)lqtt算法提取数据的边界形状
* 3)考虑是否需要做多层的情况
*@Version: 1.0
*/
public class WindCMSApp extends BaseStreamApp {
@ -32,8 +56,171 @@ public class WindCMSApp extends BaseStreamApp {
MyKafkaUtils.getKafkaSource(topic, groupId)
);
kafkaDS.print(">>>");
//TODO 2.转为jsonObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
// jsonObjectDS.print("***");
//TODO 3.异步操作-进行数据降维 - 不知道数据降维可能花费的时间,操作异步操作实现
//3.1 降采样10倍的
SingleOutputStreamOperator<JSONObject> downSamplingTenDS = AsyncDataStream.unorderedWait(
jsonObjectDS,
new DataSamplingFunction<JSONObject>() {
@Override
public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
obj.put("x", downSampleData);
}
@Override
public int getFreq(JSONObject obj) {
return Integer.parseInt(obj.getString("freq"));
}
@Override
public int getThreshold(JSONObject obj) {
return obj.getJSONArray("x").size() / 10;
}
@Override
public List<Double> getDownSamplingData(JSONObject obj) {
JSONArray x = obj.getJSONArray("x");
return x.toJavaList(Double.class);
}
},
60, TimeUnit.SECONDS
);
// downSamplingDataDS.print(">>>");
//3.2 降采样100倍的
SingleOutputStreamOperator<JSONObject> downSamplingHundredDS = AsyncDataStream.unorderedWait(
downSamplingTenDS,
new DataSamplingFunction<JSONObject>() {
@Override
public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
obj.put("x", downSampleData);
}
@Override
public int getFreq(JSONObject obj) {
return Integer.parseInt(obj.getString("freq"));
}
@Override
public int getThreshold(JSONObject obj) {
return obj.getJSONArray("x").size() / 10;
}
@Override
public List<Double> getDownSamplingData(JSONObject obj) {
JSONArray x = obj.getJSONArray("x");
return x.toJavaList(Double.class);
}
},
60, TimeUnit.SECONDS
);
//3.3 降采样1000倍的
// SingleOutputStreamOperator<JSONObject> downSamplingThousandDS = AsyncDataStream.unorderedWait(
// downSamplingHundredDS,
// new DataSamplingFunction<JSONObject>() {
// @Override
// public void setDownSamplingData(JSONObject obj, List<Double> downSampleData) {
// obj.put("x", downSampleData);
// }
//
// @Override
// public int getFreq(JSONObject obj) {
// return Integer.parseInt(obj.getString("freq"));
// }
//
// @Override
// public int getThreshold(JSONObject obj) {
// return obj.getJSONArray("x").size() / 10;
// }
//
// @Override
// public List<Double> getDownSamplingData(JSONObject obj) {
// JSONArray x = obj.getJSONArray("x");
// return x.toJavaList(Double.class);
// }
// },
// 60, TimeUnit.SECONDS
// );
//TODO 4. Merge CMS records belonging to the same time slice (10x-downsampled stream).
//4.1 Attach an event-time timestamp field "ts", parsed from the "realtime" field.
SingleOutputStreamOperator<JSONObject> tsDS = downSamplingTenDS.map(
new RichMapFunction<JSONObject, JSONObject>() {
// One formatter per parallel subtask, created in open(). SimpleDateFormat is
// not thread-safe, but each subtask instance is driven by a single thread.
SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyyMMddHHmmss");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
String realtime = jsonObject.getString("realtime");
// Malformed/short "realtime" -> fall back to current processing time.
// NOTE(review): a null "realtime" would NPE here — confirm the field is always present.
if (realtime.length() < 8) jsonObject.put("ts", new Date().getTime());
else jsonObject.put("ts", sdf.parse(realtime).getTime());
return jsonObject;
}
}
);
//4.2 Register the watermark strategy: timestamps are treated as monotonically
// increasing (no out-of-orderness allowance); event time comes from "ts" set in 4.1.
SingleOutputStreamOperator<JSONObject> dataWithWatermarkDS = tsDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("ts");
}
})
);
//4.3 Key by (wind farm, turbine number) so each turbine's records merge independently.
KeyedStream<JSONObject, Tuple2<String, String>> keyedDS = dataWithWatermarkDS.keyBy(
new KeySelector<JSONObject, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(JSONObject jsonObject) throws Exception {
return Tuple2.of(jsonObject.getString("windfarm"), jsonObject.getString("wt_no"));
}
}
);
//4.4 1-second tumbling event-time window: concatenate the "x" arrays of all
// records in the window into the first record (mutated in place) and emit it.
SingleOutputStreamOperator<JSONObject> mergeDS = keyedDS
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(
new ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow>() {
// Fires once, when the window closes.
@Override
public void process(Tuple2<String, String> stringStringTuple2, ProcessWindowFunction<JSONObject, JSONObject, Tuple2<String, String>, TimeWindow>.Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
JSONObject obj = null;
for (JSONObject jsonObject : iterable) {
if (obj == null) {
// First record becomes the merge target.
obj = jsonObject;
} else {
// Append this record's samples to the target's "x" array.
JSONArray x = obj.getJSONArray("x");
x.addAll(jsonObject.getJSONArray("x"));
}
}
// obj is non-null here: process() only fires for non-empty windows.
collector.collect(obj);
}
}
);
//TODO Sanity-check: the merged window result should hold roughly 10000 points — verify this is reasonable.
mergeDS.print(">>>");
//TODO 5. Write the merged, 10x-downsampled stream back to Kafka.
mergeDS
.map(jsonObject -> jsonObject.toJSONString())
.addSink(
MyKafkaUtils.getKafkaSink("dwm_wind_cms_10_downsampling")
);
}
}

View File

@ -0,0 +1,29 @@
package com.cqu.warehouse.realtime.app.func;
import com.cqu.warehouse.realtime.utils.DownSamplingUtils;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 *@BelongsProject: phm_parent
 *@BelongsPackage: com.cqu.warehouse.realtime.app.func
 *@Author: markilue
 *@CreateTime: 2023-05-23 16:21
 *@Description: Asynchronous Flink function that down-samples one record per
 * invocation. Subclasses supply the record accessors declared in
 * {@link DownSampleFunction} (how to read raw samples / freq / threshold and
 * how to write the result back).
 *@Version: 1.0
 */
public abstract class DataSamplingFunction<T> implements AsyncFunction<T, T>, DownSampleFunction<T> {

    /**
     * Down-samples {@code obj}'s raw data and completes {@code resultFuture}.
     *
     * The CPU-bound work is offloaded via {@link CompletableFuture#supplyAsync};
     * previously it ran synchronously inside asyncInvoke, which blocks the async
     * operator's calling thread and defeats the purpose of AsyncDataStream.
     * Failures are forwarded with completeExceptionally instead of letting the
     * element hit the async timeout.
     * NOTE(review): this uses ForkJoinPool.commonPool(); supply a dedicated
     * executor if the down-sampling is heavy or the pool is shared.
     */
    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        CompletableFuture.supplyAsync(() -> {
            List<Double> rawData = getDownSamplingData(obj);
            int freq = getFreq(obj);
            int threshold = getThreshold(obj);
            List<Double> downSampleData = DownSamplingUtils.downSamplingData(rawData, freq, threshold);
            setDownSamplingData(obj, downSampleData);
            return obj;
        }).whenComplete((result, error) -> {
            if (error != null) {
                // Propagate the failure to Flink instead of timing out silently.
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        });
    }
}

View File

@ -0,0 +1,22 @@
package com.cqu.warehouse.realtime.app.func;
import java.util.List;
/**
 *@BelongsProject: phm_parent
 *@BelongsPackage: com.cqu.warehouse.realtime.app.func
 *@Author: markilue
 *@CreateTime: 2023-05-23 16:31
 *@Description: Record accessors that a down-sampling function must implement:
 * how to read the raw samples, frequency and target size from a record of type
 * {@code T}, and how to write the down-sampled result back into it.
 *@Version: 1.0
 */
public interface DownSampleFunction<T> {

    /** Writes the down-sampled points back into the record. */
    void setDownSamplingData(T obj, List<Double> downSampleData);

    /** Sampling frequency of the record's raw data. */
    int getFreq(T obj);

    /** Target number of points after down-sampling. */
    int getThreshold(T obj);

    /** Raw samples to be down-sampled. */
    List<Double> getDownSamplingData(T obj);
}

View File

@ -14,4 +14,5 @@ public class PHMConfig {
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix JDBC connection
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//ClickHouse JDBC URL (previous comment was a copy-paste of the Phoenix one)
public static final String RT_CHECKPOINT_LOCATION = "hdfs://Ding202:8020/phm_warehouse/rt/ck";//checkpoint storage location on HDFS
public static final String HADOOP_USER_NAME = "dingjiawen";//Hadoop/HDFS user name (previous comment "checkpoint location" was a copy-paste error)
}

View File

@ -0,0 +1,82 @@
package com.cqu.warehouse.realtime.utils;
import com.cqu.algorithm.downsampling.DSAlgorithms;
import com.cqu.algorithm.downsampling.DownSamplingAlgorithm;
import com.cqu.algorithm.downsampling.Event;
import com.cqu.algorithm.downsampling.impl.PlainEvent;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 *@BelongsProject: phm_parent
 *@BelongsPackage: com.cqu.warehouse.realtime.utils
 *@Author: markilue
 *@CreateTime: 2023-05-23 15:03
 *@Description: Down-sampling helper: wraps the LTTB (and other) algorithms so a
 * plain {@code List<Double>} of samples can be reduced to roughly
 * {@code threshold} points.
 *@Version: 1.0
 */
public class DownSamplingUtils {

    /**
     * Wraps raw sample values into timestamped {@link Event}s as required by the
     * down-sampling algorithms. Null samples are silently dropped.
     *
     * Note on the tick: the original {@code 1 / freq * 1000} is integer arithmetic
     * and truncates to 0 for any freq > 1 (e.g. freq=25600), which is why a fixed
     * 100 ms synthetic gap is used instead. LTTB's point selection depends only on
     * the relative spacing of uniformly spaced points, so a synthetic uniform tick
     * is presumably sufficient here — confirm before plugging in a genuinely
     * time-sensitive algorithm.
     *
     * @param data raw samples; may be null
     * @param freq nominal sampling frequency (currently unused by the synthetic
     *             timeline; kept for API stability)
     * @return events with synthetic timestamps, or null when {@code data} is null
     */
    private static List<Event> constructEvent(List<Double> data, int freq) {
        if (data == null) return null;
        List<Event> rawData = new ArrayList<>();
        long time = System.currentTimeMillis();
        long step = 100; // fixed synthetic inter-sample gap (ms); see note above
        for (Double eachData : data) {
            if (eachData != null) {
                rawData.add(new PlainEvent(time, eachData));
                time += step;
            }
        }
        return rawData;
    }

    /**
     * Down-samples {@code data} to roughly {@code threshold} points using the
     * default LTTB algorithm.
     *
     * @param data      raw samples (may be null)
     * @param freq      nominal sampling frequency of the input
     * @param threshold target number of output points
     * @return down-sampled values; empty list when the input is null or too small
     */
    public static List<Double> downSamplingData(List<Double> data, int freq, int threshold) {
        return downSamplingData(data, freq, threshold, DSAlgorithms.LTTB);
    }

    /**
     * Down-samples with an explicit algorithm. Now applies the same null/tiny-input
     * guard as the LTTB overload; previously this overload could NPE on null data.
     *
     * @param data      raw samples (may be null)
     * @param freq      nominal sampling frequency of the input
     * @param threshold target number of output points
     * @param algorithm the down-sampling algorithm to run
     * @return down-sampled values; empty list when the input is null or too small
     */
    public static List<Double> downSamplingData(List<Double> data, int freq, int threshold, DownSamplingAlgorithm algorithm) {
        List<Event> eventList = constructEvent(data, freq);
        if (eventList == null || eventList.size() <= 10) return new ArrayList<>();
        List<Event> downSampled = execute(eventList, algorithm, threshold);
        return rebuildData(downSampled);
    }

    // Runs the chosen algorithm over the event list.
    private static List<Event> execute(List<Event> rawData, DownSamplingAlgorithm algorithm, int threshold) {
        return algorithm.process(rawData, threshold);
    }

    // Strips the synthetic timestamps again, keeping only the sample values.
    private static List<Double> rebuildData(List<Event> data) {
        List<Double> rebuildData = new ArrayList<>(data.size());
        for (Event eachEvent : data) {
            rebuildData.add(eachEvent.getValue());
        }
        return rebuildData;
    }
}

View File

@ -0,0 +1,36 @@
package com.cqu.warehouse.realtime.utils;
import org.jtransforms.dct.DoubleDCT_1D;
import org.jtransforms.fft.DoubleFFT_1D;
import java.util.List;
/**
 *@BelongsProject: phm_parent
 *@BelongsPackage: com.cqu.warehouse.realtime.utils
 *@Author: markilue
 *@CreateTime: 2023-05-23 20:36
 *@Description:
 * Transform utilities (per the original plan):
 * 1) FFT (implemented below)
 * 2) DCT
 * 3) DST
 * 4) DHT
 *@Version: 1.0
 */
public class TransformUtils {

    /**
     * In-place complex FFT over {@code data} (JTransforms).
     * NOTE(review): complexForward expects an interleaved complex array
     * (re0, im0, re1, im1, ...) of length 2*n; constructing DoubleFFT_1D with
     * data.length then treats the array as data.length complex bins. If callers
     * pass plain real samples, realForward is probably the intended call — confirm.
     */
    public static void fft(double[] data) {
        new DoubleFFT_1D(data.length).complexForward(data);
    }

    /**
     * Unboxes the list into a primitive array and runs the in-place FFT on it.
     * NOTE(review): a null element in {@code data} would NPE during unboxing.
     *
     * @return the transformed array (same array that was transformed in place)
     */
    public static double[] fft(List<Double> data) {
        double[] raw = new double[data.size()];
        for (int i = 0; i < raw.length; i++) {
            raw[i] = data.get(i);
        }
        // fft(data.toArray(new Double[0]));
        fft(raw);
        return raw;
    }
}
}