From 6e66e33c27f6b9202c700845a832ad7967a71b39 Mon Sep 17 00:00:00 2001 From: markilue <745518019@qq.com> Date: Fri, 24 Feb 2023 21:15:29 +0800 Subject: [PATCH] =?UTF-8?q?flink=E8=A7=82=E7=9C=8B=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/day01/scala/Example_scala1.scala | 6 +- .../Flink/src/main/java/day02/Example2.java | 58 +++++++++---------- .../main/java/day03/selftry/Example9_try.java | 1 - .../Flink/src/main/java/day08/Example5.java | 1 - 4 files changed, 32 insertions(+), 34 deletions(-) diff --git a/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala b/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala index 055b11a..5b8e4e7 100644 --- a/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala +++ b/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala @@ -1,9 +1,9 @@ package day01.scala -//import org.apache.flink.api.common.functions.FlatMapFunction -//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -//import org.apache.flink.util.Collector +import org.apache.flink.util.Collector object Example_scala1 { diff --git a/Big_data_example/Flink/src/main/java/day02/Example2.java b/Big_data_example/Flink/src/main/java/day02/Example2.java index d4a7c99..0b18a8d 100644 --- a/Big_data_example/Flink/src/main/java/day02/Example2.java +++ b/Big_data_example/Flink/src/main/java/day02/Example2.java @@ -58,35 +58,35 @@ public class Example2 { .print(); //匿名内部类的方式 -// env -// .addSource(new SourceFunction() { -// -// private boolean running = true; -// private Random random = new Random(); -// -// -// @Override -// public void run(SourceContext ctx) throws Exception { -// -// while (running) { -// ctx.collect(random.nextInt(1000)); -// Thread.sleep(1000L); -// } -// -// } -// -// @Override -// public void cancel() { -// running = false; -// -// } -// }) -// .map(new MapFunction>() { -// @Override -// public Tuple2 map(Integer value) throws Exception { -// return Tuple2.of(value, value); -// } -// }).print(); + env + .addSource(new SourceFunction() { + + private boolean running = true; + private Random random = new Random(); + + + @Override + public void run(SourceContext ctx) throws Exception { + + while (running) { + ctx.collect(random.nextInt(1000)); + Thread.sleep(1000L); + } + + } + + @Override + public void cancel() { + running = false; + + } + }) + .map(new MapFunction>() { + @Override + public Tuple2 map(Integer value) throws Exception { + return Tuple2.of(value, value); + } + }).print(); //外部类的方式 diff --git a/Big_data_example/Flink/src/main/java/day03/selftry/Example9_try.java b/Big_data_example/Flink/src/main/java/day03/selftry/Example9_try.java index a055b31..1071014 100644 --- a/Big_data_example/Flink/src/main/java/day03/selftry/Example9_try.java +++ b/Big_data_example/Flink/src/main/java/day03/selftry/Example9_try.java @@ -11,7 +11,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; -import sun.awt.SunHints; import java.sql.Time; import java.sql.Timestamp; diff --git a/Big_data_example/Flink/src/main/java/day08/Example5.java b/Big_data_example/Flink/src/main/java/day08/Example5.java index 65383b1..9dc00da 100644 --- a/Big_data_example/Flink/src/main/java/day08/Example5.java +++ b/Big_data_example/Flink/src/main/java/day08/Example5.java @@ -12,7 +12,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; -import sun.awt.geom.AreaOp; import java.util.HashMap;