flink对接redis和自定义sink连接mysql

This commit is contained in:
dingjiawen 2022-09-07 09:16:19 +08:00
parent f196a99393
commit 7e0a828387
6 changed files with 412 additions and 6 deletions

View File

@ -91,6 +91,9 @@
<version>${flink.verison}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@ -124,6 +127,12 @@
<version>${flink.verison}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>

View File

@ -0,0 +1,64 @@
package day07;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* @BelongsProject: Flink
* @BelongsPackage: day07
* @Author: dingjiawen
* @CreateTime: 2022-09-06 18:42
* @Description: TODO flink连接redis写入redis
* @Version: 1.0
*/
public class Example3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
Tuple2.of("key", 1),
Tuple2.of("key", 2)
);
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();
stream.addSink(new RedisSink<Tuple2<String,Integer>>(conf,new MyRedisMapper()));
env.execute();
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String,Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
//一个参数是操作符第二个值是表名
return new RedisCommandDescription(RedisCommand.HSET,"tuple");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> in) {
return in.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> in) {
return in.f1.toString();
}
}
}

View File

@ -0,0 +1,93 @@
package day07;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* @BelongsProject: Flink
* @BelongsPackage: day07
* @Author: dingjiawen
* @CreateTime: 2022-09-06 19:04
* @Description: TODO 实现一个mysql的幂等写入
* @Version: 1.0
*/
//create table kv(k varchar(10),v int)
public class Example4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
Tuple2.of("key", 1),
Tuple2.of("key", 2)
);
stream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
private Connection conn;
private PreparedStatement insertStmt;
private PreparedStatement updateStmt;
//生命周期开始创建mysql的连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/temp",
"root2",
"root2"
);
insertStmt = conn.prepareStatement("INSERT INTO kv (k, v) value(?,?)" );
updateStmt = conn.prepareStatement("UPDATE kv SET v = ? WHERE k=?");
}
//每来一次数据就会执行一次
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
super.invoke(value, context);
//保证幂等性的操作:存在key就修改,不存在才插入
updateStmt.setInt(1, value.f1);
updateStmt.setString(2, value.f0);
updateStmt.execute();
if (updateStmt.getUpdateCount() == 0) {
insertStmt.setString(1, value.f0);
insertStmt.setInt(2, value.f1);
insertStmt.execute();
}
}
//生命周期结束,关闭mysql的连接
@Override
public void close() throws Exception {
super.close();
insertStmt.close();
updateStmt.close();
conn.close();
}
});
env.execute();
}
}

View File

@ -0,0 +1,90 @@
package com.markilue.leecode.listnode;
import org.junit.Test;
import java.util.HashSet;
/**
* @BelongsProject: Leecode
* @BelongsPackage: com.markilue.leecode.listnode
* @Author: dingjiawen
* @CreateTime: 2022-09-06 11:49
* @Description:
* TODO 力扣142环形链表II:
* 给定一个链表的头节点 head返回链表开始入环的第一个节点如果链表无环则返回null
* 如果链表中有某个节点可以通过连续跟踪 next 指针再次到达则链表中存在环
* 为了表示给定链表中的环评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置索引从 0 开始
* 如果 pos -1则在该链表中没有环注意pos 不作为参数进行传递仅仅是为了标识链表的实际情况
* 不允许修改 链表
* @Version: 1.0
*/
public class DetectCycle {
@Test
public void test(){
}
/**
* 代码随想录思路:快慢指针判断相遇
* 内存击败60%速度击败100%
* 时间复杂度O(N)空间复杂度O(1)
* @param head
* @return
*/
public ListNode detectCycle(ListNode head) {
if(head==null||head.next==null){
return null;
}
ListNode fast=head;
ListNode slow=head;
while (fast!=null&&fast.next!=null){
fast=fast.next.next;
slow=slow.next;
//如果快指针和慢指针相等那么有环且两者正好相遇了
if(fast==slow){
//定义一个从头结点出发的点
ListNode x=head;
//定义一个从相遇节点出发的点
ListNode z=fast;
while (x!=z){
x=x.next;
z=z.next;
}
return x; //返回环的入口
}
}
return null;
}
/**
* 哈希表法:维护一个哈希表,当哈希表中出现重复元素时则是环的入口
* 时间复杂度O(N)空间复杂度O(N)
* @param head
* @return
*/
public ListNode detectCycle1(ListNode head) {
if (head == null || head.next == null) {
return null;
}
HashSet<ListNode> nodeHashSet = new HashSet<ListNode>();
ListNode temp=head;
while (temp!=null){
if(nodeHashSet.contains(temp)){
return temp;
}else {
nodeHashSet.add(temp);
}
temp=temp.next;
}
return null;
}
}

View File

@ -0,0 +1,104 @@
package com.markilue.leecode.listnode;
import org.junit.Test;
/**
* @BelongsProject: Leecode
* @BelongsPackage: com.markilue.leecode.listnode
* @Author: dingjiawen
* @CreateTime: 2022-09-06 09:21
* @Description: TODO leecode第206题翻转链表给你单链表的头节点 head ,请你反转链表并返回反转后的链表
* @Version: 1.0
*/
public class ReverseList {
@Test
public void test() {
ListNode listNode = new ListNode(1);
ListNode listNode2 = new ListNode(2);
ListNode listNode3 = new ListNode(3);
ListNode listNode4 = new ListNode(4);
ListNode listNode5 = new ListNode(5);
// ListNode listNode6 = new ListNode(5);
// ListNode listNode7 = new ListNode(6);
listNode.next = listNode2;
listNode2.next = listNode3;
listNode3.next = listNode4;
listNode4.next = listNode5;
// listNode5.next = listNode6;
// listNode6.next = listNode7;
ListNode result = reverseList1(listNode);
print(result);
}
public void print(ListNode listNode) {
if (listNode == null) {
System.out.println("链表为空");
return;
}
while (listNode != null) {
System.out.printf(listNode.val + "->");
listNode = listNode.next;
}
}
/**
* 自己的实现和代码随想录中的双指针法类似
* 内存超过85.07%,速度超过100%
* @param head
* @return
*/
public ListNode reverseList(ListNode head) {
if (head == null || head.next == null) {
return head;
}
ListNode fakeHead = new ListNode(0);
ListNode temp = head;
ListNode temp1 ;
while (temp != null) {
temp1=temp.next;
temp.next=fakeHead.next;
fakeHead.next=temp;
temp=temp1;
}
return fakeHead.next;
}
/**
* 递归法:
* 内存超过78%速度超过100%
*/
public ListNode reverseList1(ListNode head) {
// ListNode fakeHead = new ListNode(0);
return reverseList2(null,head);
}
public ListNode reverseList2(ListNode fakeHead,ListNode head) {
if (head==null){
return fakeHead;
}
ListNode temp=head.next;
//这个head就正是翻转之后的存放的作为下一次的fakeHead
head.next=fakeHead;
return reverseList2(head,temp);
}
}

View File

@ -8,22 +8,34 @@ public class removeNthFromEnd {
public static void main(String[] args) {
ListNode l2 = new ListNode(1);
// l2.next = new ListNode(2);
// l2.next.next = new ListNode(3);
// l2.next.next.next =new ListNode(4);
// l2.next.next.next.next =new ListNode(5);
l2.next = new ListNode(2);
l2.next.next = new ListNode(3);
l2.next.next.next =new ListNode(4);
l2.next.next.next.next =new ListNode(5);
// l2.next.next.next.next.next = new ListNode(9);
// l2.next.next.next.next.next.next =new ListNode(9);
// l2.next.next.next.next.next.next.next = new ListNode(9);
// l2.next.next.next.next.next.next.next.next =new ListNode(9);
// l2.next.next.next.next.next.next.next.next.next =new ListNode(9);
removeNthFromEnd3(l2, 1);
ListNode listNode = removeNthFromEnd(l2, 2);
print(listNode);
getLength(l2);
// getLength(l2);
}
public static void print(ListNode listNode) {
if (listNode == null) {
System.out.println("链表为空");
return;
}
while (listNode != null) {
System.out.printf(listNode.val + "->");
listNode = listNode.next;
}
}
/**
* 获取长度length之后遍历到第length-n的位置删除
*
@ -122,6 +134,40 @@ public class removeNthFromEnd {
}
/**
* 快慢指针再次尝试
* @param head
* @param n
* @return
*/
public static ListNode removeNthFromEnd(ListNode head, int n) {
if(head==null){
return head;
}
ListNode fakeHead=new ListNode(0);
fakeHead.next=head;
ListNode fast=fakeHead;
ListNode slow=fakeHead;
//慢指针一直移动快指针只在满指针移了n次以后再移动则慢指针到达末尾时快指针就到了倒数第N的前一个位置直接删除即可
while (slow.next!=null){
slow=slow.next;
n--;
if(n<0){
fast=fast.next;
}
}
fast.next=fast.next.next;
return fakeHead.next;
}