diff --git a/Big_data_example/Flink/src/main/java/day03/Example5.java b/Big_data_example/Flink/src/main/java/day03/Example5.java
index 23f84ba..1c55a5c 100644
--- a/Big_data_example/Flink/src/main/java/day03/Example5.java
+++ b/Big_data_example/Flink/src/main/java/day03/Example5.java
@@ -48,7 +48,7 @@ public class Example5 {
         //declare a state variable to act as the accumulator
         //the state variable's visibility (scope) is the current key
-        //the state variable is a singleton and is only instantiated once
+        //the state variable is a singleton and is only instantiated once,
         private ValueState<Tuple2<Integer, Integer>> valueState;
         //stores the timer's timestamp
         private ValueState<Long> timerTs;
diff --git a/Big_data_example/Flink/src/main/java/day03/selftry/Example5_retry.java b/Big_data_example/Flink/src/main/java/day03/selftry/Example5_retry.java
new file mode 100644
index 0000000..6ebd254
--- /dev/null
+++ b/Big_data_example/Flink/src/main/java/day03/selftry/Example5_retry.java
@@ -0,0 +1,125 @@
+package day03.selftry;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Random;
+
+/**
+ * @BelongsProject: Flink
+ * @BelongsPackage: day03.selftry
+ * @Author: dingjiawen
+ * @CreateTime: 2022-09-05 16:06
+ * @Description: TODO example5_retry, try to compute the average over ten seconds
+ * @Version: 1.0
+ */
+public class Example5_retry {
+
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        env
+                .addSource(
+                        new SourceFunction<Integer>() {
+
+                            private Random random = new Random();
+                            private boolean running = true;
+
+                            @Override
+                            public void run(SourceContext<Integer> sourceContext) throws Exception {
+
+                                while (running) {
+                                    int i = random.nextInt(10);
+                                    sourceContext.collect(i);
+                                    Thread.sleep(1000);
+                                }
+
+                            }
+
+                            @Override
+                            public void cancel() {
+                                running = false;
+                            }
+                        }
+                )
+                .keyBy(r -> true)
+                .process(new KeyedProcessFunction<Boolean, Integer, Double>() {
+
+                    //declare two keyed state variables
+                    //stores <sum of values, count>
+                    private ValueState<Tuple2<Integer, Integer>> valueState;
+                    //stores the timer timestamp, i.e. how often the timed task runs
+                    private ValueState<Long> timeTs;
+
+
+                    //register and instantiate the two state variables when the task starts
+                    @Override
+                    public void open(Configuration parameters) throws Exception {
+                        super.open(parameters);
+                        valueState = getRuntimeContext().getState(
+                                //the first argument is the name, the second is the type
+                                new ValueStateDescriptor<Tuple2<Integer, Integer>>("sum-count", Types.TUPLE(Types.INT, Types.INT))
+                        );
+                        timeTs = getRuntimeContext().getState(
+                                new ValueStateDescriptor<Long>("timer", Types.LONG)
+                        );
+                    }
+
+                    //process each element and register the timer
+                    @Override
+                    public void processElement(Integer integer, Context context, Collector<Double> collector) throws Exception {
+                        //if the state value is null, this is the first call
+                        if (valueState.value() == null) {
+                            valueState.update(Tuple2.of(integer, 1));
+                        } else {
+                            //every later call
+                            Tuple2<Integer, Integer> temp = valueState.value();
+                            valueState.update(Tuple2.of(temp.f0 + integer, temp.f1 + 1));
+                        }
+
+                        //register the timer on the first element
+                        if (timeTs.value() == null) {
+                            //first get the current processing time from the context
+                            long currentTime = context.timerService().currentProcessingTime();
+                            //register the corresponding processing-time timer on the context
+                            context.timerService().registerProcessingTimeTimer(currentTime + 10 * 1000L);
+                            //update the timer state so the next element does not enter this if again
+                            timeTs.update(1111L);
+                        }
+
+                    }
+
+                    //timer callback, invoked when the timer fires
+                    @Override
+                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out) throws Exception {
+//                        super.onTimer(timestamp, ctx, out);
+                        if (valueState.value() != null) {
+                            //the timer has fired: compute the average and emit it through the collector
+                            out.collect((double) valueState.value().f0 / valueState.value().f1);
+                            //clear the timer state
+                            timeTs.clear();
+                        }
+
+
+                    }
+                })
+                .print();
+
+
+        env.execute();
+
+
+    }
+
+
+}
diff --git a/Big_data_example/Flink/src/main/java/day04/Example2.java b/Big_data_example/Flink/src/main/java/day04/Example2.java
index 28e5e91..d59c462 100644
--- a/Big_data_example/Flink/src/main/java/day04/Example2.java
+++ b/Big_data_example/Flink/src/main/java/day04/Example2.java
@@ -48,7 +48,7 @@ public class Example2 {
                 //the iterable argument contains only one element: the aggregation result sent over by the incremental aggregate function
                 long windowStart =context.window().getStart();
                 long windowEnd = context.window().getEnd();
-                long count = iterable.iterator().next(); //take out which element of the incremental aggregate function
+                long count = iterable.iterator().next(); //take out that single element of the incremental aggregate function
                 collector.collect("用户:"+key+"在窗口"
                         +""+new Timestamp(windowStart)+"~"+new Timestamp(windowEnd)
                         +""+"中的pv次数是:"+count);
diff --git a/Big_data_example/Flink/src/main/java/day04/Example7.java b/Big_data_example/Flink/src/main/java/day04/Example7.java
index 3c91791..3c70385 100644
--- a/Big_data_example/Flink/src/main/java/day04/Example7.java
+++ b/Big_data_example/Flink/src/main/java/day04/Example7.java
@@ -33,7 +33,7 @@ public class Example7 {
         env.setParallelism(1);
 
         env
-                .readTextFile("E:\\Big_data_example\\Flink\\src\\main\\resources\\UserBehavior.csv")
+                .readTextFile("D:\\example\\self_example\\Big_data_example\\Flink\\src\\main\\resources\\UserBehavior.csv")
                 .map(new MapFunction<String, UserBehavior>() {
                     @Override
                     public UserBehavior map(String value) throws Exception {
diff --git a/Big_data_example/Flink/src/main/java/day06/Example4.java b/Big_data_example/Flink/src/main/java/day06/Example4.java
index 80aba21..fcb5946 100644
--- a/Big_data_example/Flink/src/main/java/day06/Example4.java
+++ b/Big_data_example/Flink/src/main/java/day06/Example4.java
@@ -54,7 +54,7 @@ public class Example4 {
         orderStream.keyBy(r ->r.userId)
                 .intervalJoin(pvStream.keyBy(r -> r.userId))
                 //which span of the second stream each element of the first stream joins with
-                //within the last 10 min and the next 10 min
+                //within the last 10 min and the next 5 min
                 .between(Time.minutes(-10),Time.minutes(5))
                 .process(new ProcessJoinFunction() {
                     @Override
@@ -67,7 +67,7 @@ public class Example4 {
         pvStream.keyBy(r ->r.userId)
                 .intervalJoin(orderStream.keyBy(r -> r.userId))
                 //which span of the second stream each element of the first stream joins with
-                //within the last 10 min and the next 10 min
+                //within the last 5 min and the next 10 min
                 .between(Time.minutes(-5),Time.minutes(10))
                 .process(new ProcessJoinFunction() {
                     @Override
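A minimal sketch, not part of the patch above, of the same ValueState-plus-processing-time-timer pattern used in Example5_retry, but with onTimer also clearing the sum/count state so each emitted value covers only the preceding ten seconds rather than a running average since job start. The class name TenSecondAverage and the state names sumCount/timerTs are illustrative assumptions; the Flink APIs used are the ones the patch already imports.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical variant of Example5_retry's anonymous KeyedProcessFunction.
public class TenSecondAverage extends KeyedProcessFunction<Boolean, Integer, Double> {

    // <sum of values, count> for the current 10-second interval
    private ValueState<Tuple2<Integer, Integer>> sumCount;
    // timestamp of the pending timer, or null if none is registered
    private ValueState<Long> timerTs;

    @Override
    public void open(Configuration parameters) throws Exception {
        sumCount = getRuntimeContext().getState(
                new ValueStateDescriptor<Tuple2<Integer, Integer>>("sum-count", Types.TUPLE(Types.INT, Types.INT)));
        timerTs = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("timer-ts", Types.LONG));
    }

    @Override
    public void processElement(Integer in, Context ctx, Collector<Double> out) throws Exception {
        // accumulate sum and count for the current interval
        Tuple2<Integer, Integer> acc = sumCount.value();
        sumCount.update(acc == null ? Tuple2.of(in, 1) : Tuple2.of(acc.f0 + in, acc.f1 + 1));

        // register one timer per interval; store its real timestamp instead of a placeholder
        if (timerTs.value() == null) {
            long ts = ctx.timerService().currentProcessingTime() + 10 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(ts);
            timerTs.update(ts);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out) throws Exception {
        Tuple2<Integer, Integer> acc = sumCount.value();
        if (acc != null) {
            out.collect((double) acc.f0 / acc.f1);
        }
        // clearing both states resets the accumulator, so the next emitted value
        // again covers only the elements of the following ~10 seconds
        sumCount.clear();
        timerTs.clear();
    }
}

It would be wired in the same way as the anonymous function in the patch, e.g. stream.keyBy(r -> true).process(new TenSecondAverage()).print();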