From 7e0a828387968a1359156fd11e44440ea0bd8ede Mon Sep 17 00:00:00 2001 From: dingjiawen <745518019@qq.com> Date: Wed, 7 Sep 2022 09:16:19 +0800 Subject: [PATCH] =?UTF-8?q?flink=E5=AF=B9=E6=8E=A5redis=E5=92=8C=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89sink=E8=BF=9E=E6=8E=A5mysql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Big_data_example/Flink/pom.xml | 9 ++ .../Flink/src/main/java/day07/Example3.java | 64 +++++++++++ .../Flink/src/main/java/day07/Example4.java | 93 ++++++++++++++++ .../leecode/listnode/DetectCycle.java | 90 +++++++++++++++ .../leecode/listnode/ReverseList.java | 104 ++++++++++++++++++ .../leecode/listnode/removeNthFromEnd.java | 58 +++++++++- 6 files changed, 412 insertions(+), 6 deletions(-) create mode 100644 Big_data_example/Flink/src/main/java/day07/Example3.java create mode 100644 Big_data_example/Flink/src/main/java/day07/Example4.java create mode 100644 Leecode/src/main/java/com/markilue/leecode/listnode/DetectCycle.java create mode 100644 Leecode/src/main/java/com/markilue/leecode/listnode/ReverseList.java diff --git a/Big_data_example/Flink/pom.xml b/Big_data_example/Flink/pom.xml index 42391f1..6cd64ca 100644 --- a/Big_data_example/Flink/pom.xml +++ b/Big_data_example/Flink/pom.xml @@ -91,6 +91,9 @@ ${flink.verison} + + + org.slf4j slf4j-log4j12 @@ -124,6 +127,12 @@ ${flink.verison} + + org.apache.bahir + flink-connector-redis_2.11 + 1.0 + + diff --git a/Big_data_example/Flink/src/main/java/day07/Example3.java b/Big_data_example/Flink/src/main/java/day07/Example3.java new file mode 100644 index 0000000..ba0e459 --- /dev/null +++ b/Big_data_example/Flink/src/main/java/day07/Example3.java @@ -0,0 +1,64 @@ +package day07; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.redis.RedisSink; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +/** + * @BelongsProject: Flink + * @BelongsPackage: day07 + * @Author: dingjiawen + * @CreateTime: 2022-09-06 18:42 + * @Description: TODO flink连接redis写入redis + * @Version: 1.0 + */ +public class Example3 { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStreamSource> stream = env.fromElements( + Tuple2.of("key", 1), + Tuple2.of("key", 2) + ); + + FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build(); + + stream.addSink(new RedisSink>(conf,new MyRedisMapper())); + + + env.execute(); + + + } + + + public static class MyRedisMapper implements RedisMapper> { + + @Override + public RedisCommandDescription getCommandDescription() { + //一个参数是操作符,第二个值是表名 + return new RedisCommandDescription(RedisCommand.HSET,"tuple"); + } + + @Override + public String getKeyFromData(Tuple2 in) { + return in.f0; + } + + @Override + public String getValueFromData(Tuple2 in) { + return in.f1.toString(); + } + } + + + +} diff --git a/Big_data_example/Flink/src/main/java/day07/Example4.java b/Big_data_example/Flink/src/main/java/day07/Example4.java new file mode 100644 index 0000000..2830077 --- /dev/null +++ b/Big_data_example/Flink/src/main/java/day07/Example4.java @@ -0,0 +1,93 @@ +package day07; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; + +/** + * @BelongsProject: Flink + * @BelongsPackage: day07 + * @Author: dingjiawen + * @CreateTime: 2022-09-06 19:04 + * @Description: TODO 实现一个mysql的幂等写入 + * @Version: 1.0 + */ +//create table kv(k varchar(10),v int) +public class Example4 { + + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(1); + + DataStreamSource> stream = env.fromElements( + Tuple2.of("key", 1), + Tuple2.of("key", 2) + ); + + stream.addSink(new RichSinkFunction>() { + + private Connection conn; + private PreparedStatement insertStmt; + private PreparedStatement updateStmt; + + //生命周期开始创建mysql的连接 + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + conn = DriverManager.getConnection( + "jdbc:mysql://localhost:3306/temp", + "root2", + "root2" + ); + insertStmt = conn.prepareStatement("INSERT INTO kv (k, v) value(?,?)" ); + updateStmt = conn.prepareStatement("UPDATE kv SET v = ? WHERE k=?"); + } + + //每来一次数据就会执行一次 + @Override + public void invoke(Tuple2 value, Context context) throws Exception { + super.invoke(value, context); + + //保证幂等性的操作:存在key就修改,不存在才插入 + updateStmt.setInt(1, value.f1); + updateStmt.setString(2, value.f0); + updateStmt.execute(); + if (updateStmt.getUpdateCount() == 0) { + insertStmt.setString(1, value.f0); + insertStmt.setInt(2, value.f1); + insertStmt.execute(); + } + } + + //生命周期结束,关闭mysql的连接 + @Override + public void close() throws Exception { + super.close(); + + insertStmt.close(); + updateStmt.close(); + conn.close(); + } + }); + + + + env.execute(); + + + } + + + +} diff --git a/Leecode/src/main/java/com/markilue/leecode/listnode/DetectCycle.java b/Leecode/src/main/java/com/markilue/leecode/listnode/DetectCycle.java new file mode 100644 index 0000000..640cba7 --- /dev/null +++ b/Leecode/src/main/java/com/markilue/leecode/listnode/DetectCycle.java @@ -0,0 +1,90 @@ +package com.markilue.leecode.listnode; + +import org.junit.Test; + +import java.util.HashSet; + +/** + * @BelongsProject: Leecode + * @BelongsPackage: com.markilue.leecode.listnode + * @Author: dingjiawen + * @CreateTime: 2022-09-06 11:49 + * @Description: + * TODO 力扣142环形链表II: + * 给定一个链表的头节点 head,返回链表开始入环的第一个节点。如果链表无环,则返回null。 + * 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 + * 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置(索引从 0 开始)。 + * 如果 pos 是 -1,则在该链表中没有环。注意:pos 不作为参数进行传递,仅仅是为了标识链表的实际情况。 + * 不允许修改 链表。 + + * @Version: 1.0 + */ +public class DetectCycle { + + + @Test + public void test(){ + + } + + /** + * 代码随想录思路:快慢指针判断相遇 + * 内存击败60%,速度击败100% + * 时间复杂度O(N),空间复杂度O(1) + * @param head + * @return + */ + public ListNode detectCycle(ListNode head) { + + if(head==null||head.next==null){ + return null; + } + + ListNode fast=head; + ListNode slow=head; + + while (fast!=null&&fast.next!=null){ + + fast=fast.next.next; + slow=slow.next; + + //如果快指针和慢指针相等,那么有环,且两者正好相遇了 + if(fast==slow){ + //定义一个从头结点出发的点 + ListNode x=head; + //定义一个从相遇节点出发的点 + ListNode z=fast; + while (x!=z){ + x=x.next; + z=z.next; + } + return x; //返回环的入口 + } + } + return null; + } + + /** + * 哈希表法:维护一个哈希表,当哈希表中出现重复元素时,则是环的入口 + * 时间复杂度O(N),空间复杂度O(N) + * @param head + * @return + */ + public ListNode detectCycle1(ListNode head) { + + if (head == null || head.next == null) { + return null; + } + HashSet nodeHashSet = new HashSet(); + ListNode temp=head; + while (temp!=null){ + if(nodeHashSet.contains(temp)){ + return temp; + }else { + nodeHashSet.add(temp); + } + temp=temp.next; + } + return null; + } +} diff --git a/Leecode/src/main/java/com/markilue/leecode/listnode/ReverseList.java b/Leecode/src/main/java/com/markilue/leecode/listnode/ReverseList.java new file mode 100644 index 0000000..bb5ace0 --- /dev/null +++ b/Leecode/src/main/java/com/markilue/leecode/listnode/ReverseList.java @@ -0,0 +1,104 @@ +package com.markilue.leecode.listnode; + +import org.junit.Test; + +/** + * @BelongsProject: Leecode + * @BelongsPackage: com.markilue.leecode.listnode + * @Author: dingjiawen + * @CreateTime: 2022-09-06 09:21 + * @Description: TODO leecode第206题,翻转链表:给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 + * @Version: 1.0 + */ +public class ReverseList { + + + @Test + public void test() { + ListNode listNode = new ListNode(1); + ListNode listNode2 = new ListNode(2); + ListNode listNode3 = new ListNode(3); + ListNode listNode4 = new ListNode(4); + ListNode listNode5 = new ListNode(5); +// ListNode listNode6 = new ListNode(5); +// ListNode listNode7 = new ListNode(6); + listNode.next = listNode2; + listNode2.next = listNode3; + listNode3.next = listNode4; + listNode4.next = listNode5; +// listNode5.next = listNode6; +// listNode6.next = listNode7; + ListNode result = reverseList1(listNode); + print(result); + } + + public void print(ListNode listNode) { + if (listNode == null) { + System.out.println("链表为空"); + return; + } + while (listNode != null) { + System.out.printf(listNode.val + "->"); + listNode = listNode.next; + } + } + + + /** + * 自己的实现,和代码随想录中的双指针法类似 + * 内存超过85.07%,速度超过100% + * @param head + * @return + */ + public ListNode reverseList(ListNode head) { + + if (head == null || head.next == null) { + return head; + } + + ListNode fakeHead = new ListNode(0); + ListNode temp = head; + ListNode temp1 ; + + while (temp != null) { + temp1=temp.next; + temp.next=fakeHead.next; + fakeHead.next=temp; + temp=temp1; + } + + + return fakeHead.next; + + } + + + + /** + * 递归法: + * 内存超过78%,速度超过100% + */ + public ListNode reverseList1(ListNode head) { + +// ListNode fakeHead = new ListNode(0); + + return reverseList2(null,head); + + } + + public ListNode reverseList2(ListNode fakeHead,ListNode head) { + + if (head==null){ + return fakeHead; + } + ListNode temp=head.next; + //这个head就正是翻转之后的存放的,作为下一次的fakeHead + head.next=fakeHead; + + return reverseList2(head,temp); + } + + + + +} diff --git a/Leecode/src/main/java/com/markilue/leecode/listnode/removeNthFromEnd.java b/Leecode/src/main/java/com/markilue/leecode/listnode/removeNthFromEnd.java index 1bb03ea..72f4376 100644 --- a/Leecode/src/main/java/com/markilue/leecode/listnode/removeNthFromEnd.java +++ b/Leecode/src/main/java/com/markilue/leecode/listnode/removeNthFromEnd.java @@ -8,22 +8,34 @@ public class removeNthFromEnd { public static void main(String[] args) { ListNode l2 = new ListNode(1); -// l2.next = new ListNode(2); -// l2.next.next = new ListNode(3); -// l2.next.next.next =new ListNode(4); -// l2.next.next.next.next =new ListNode(5); + l2.next = new ListNode(2); + l2.next.next = new ListNode(3); + l2.next.next.next =new ListNode(4); + l2.next.next.next.next =new ListNode(5); // l2.next.next.next.next.next = new ListNode(9); // l2.next.next.next.next.next.next =new ListNode(9); // l2.next.next.next.next.next.next.next = new ListNode(9); // l2.next.next.next.next.next.next.next.next =new ListNode(9); // l2.next.next.next.next.next.next.next.next.next =new ListNode(9); - removeNthFromEnd3(l2, 1); + ListNode listNode = removeNthFromEnd(l2, 2); + print(listNode); - getLength(l2); +// getLength(l2); } + public static void print(ListNode listNode) { + if (listNode == null) { + System.out.println("链表为空"); + return; + } + while (listNode != null) { + System.out.printf(listNode.val + "->"); + listNode = listNode.next; + } + } + /** * 获取长度length之后,遍历到第length-n的位置删除 * @@ -122,6 +134,40 @@ public class removeNthFromEnd { } + /** + * 快慢指针再次尝试 + * @param head + * @param n + * @return + */ + public static ListNode removeNthFromEnd(ListNode head, int n) { + + if(head==null){ + return head; + } + + ListNode fakeHead=new ListNode(0); + fakeHead.next=head; + ListNode fast=fakeHead; + ListNode slow=fakeHead; + + //慢指针一直移动,快指针只在满指针移了n次以后再移动,则慢指针到达末尾时,快指针就到了倒数第N的前一个位置,直接删除即可 + while (slow.next!=null){ + slow=slow.next; + n--; + if(n<0){ + fast=fast.next; + } + } + fast.next=fast.next.next; + + + + return fakeHead.next; + + } + +