flink更新和复习

This commit is contained in:
dingjiawen 2022-09-06 09:14:19 +08:00
parent f875a1841e
commit f196a99393
5 changed files with 130 additions and 5 deletions

View File

@ -48,7 +48,7 @@ public class Example5 {
//声明一个状态变量作为累加器
//状态变量的可见范围(作用域)的当前key
//状态变量是单例只能被实例化一次
//状态变量是单例只能被实例化一次
private ValueState<Tuple2<Integer,Integer>> valueState;
//保存定时器的时间戳
private ValueState<Long> timerTs;

View File

@ -0,0 +1,125 @@
package day03.selftry;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Random;
/**
* @BelongsProject: Flink
* @BelongsPackage: day03.selftry
* @Author: dingjiawen
* @CreateTime: 2022-09-05 16:06
* @Description: TODO example5_retry,尝试计算十秒内的平均值
* @Version: 1.0
*/
public class Example5_retry {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(
new SourceFunction<Integer>() {
private Random random = new Random();
private boolean running = true;
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (running) {
int i = random.nextInt(10);
sourceContext.collect(i);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
)
.keyBy(r -> true)
.process(new KeyedProcessFunction<Boolean, Integer, Double>() {
//设置两个状态后端
//用于存储<值之和数量>
private ValueState<Tuple2<Integer, Integer>> valueState;
//存储一个定时器,多久做一次定时任务
private ValueState<Long> timeTs;
//在任务开始时注册并实例化两个状态后端
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
valueState = getRuntimeContext().getState(
//第一个值是名字第二个值是类型
new ValueStateDescriptor<Tuple2<Integer, Integer>>("sum-count", Types.TUPLE(Types.INT, Types.INT))
);
timeTs = getRuntimeContext().getState(
new ValueStateDescriptor<Long>("timer", Types.LONG)
);
}
//对每一个值进行处理,并且设置定时器
@Override
public void processElement(Integer integer, Context context, Collector<Double> collector) throws Exception {
//如果该状态后端的值是null就证明是第一次调用
if (valueState.value() == null) {
valueState.update(Tuple2.of(integer, 1));
} else {
//之后的调用了
Tuple2<Integer, Integer> temp = valueState.value();
valueState.update(Tuple2.of(temp.f0 + integer, temp.f1 + 1));
}
//第一次进来时设置定时器
if (timeTs.value() == null) {
//先根据context获取到当前的处理时间
long currentTime = context.timerService().currentProcessingTime();
//在上下文中注册对应的定时器
context.timerService().registerProcessingTimeTimer(currentTime + 10 * 1000L);
//更新定时器,避免下次再进入这个if
timeTs.update(1111L);
}
}
//定时器函数当定时器触发时会进行调用
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out) throws Exception {
// super.onTimer(timestamp, ctx, out);
if (valueState.value() != null) {
//定时器触发开始计算平均值,并将其放入收集器collect中
out.collect((double) valueState.value().f0 / valueState.value().f1);
//将定时器清空
timeTs.clear();
}
}
})
.print();
env.execute();
}
}

View File

@ -48,7 +48,7 @@ public class Example2 {
//迭代器参数中只包含了一个元素就是增量聚合函数发送过来的聚合结果
long windowStart =context.window().getStart();
long windowEnd = context.window().getEnd();
long count = iterable.iterator().next(); //取出增量聚合函数的一个元素
long count = iterable.iterator().next(); //取出增量聚合函数的一个元素
collector.collect("用户:"+key+"在窗口"
+""+new Timestamp(windowStart)+"~"+new Timestamp(windowEnd)
+""+"中的pv次数是"+count);

View File

@ -33,7 +33,7 @@ public class Example7 {
env.setParallelism(1);
env
.readTextFile("E:\\Big_data_example\\Flink\\src\\main\\resources\\UserBehavior.csv")
.readTextFile("D:\\example\\self_example\\Big_data_example\\Flink\\src\\main\\resources\\UserBehavior.csv")
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String value) throws Exception {

View File

@ -54,7 +54,7 @@ public class Example4 {
orderStream.keyBy(r ->r.userId)
.intervalJoin(pvStream.keyBy(r -> r.userId))
//第一条流和第二条流的哪一段join
//最近10min和未来10min以内的
//最近10min和未来5min以内的
.between(Time.minutes(-10),Time.minutes(5))
.process(new ProcessJoinFunction<Event, Event, String>() {
@Override
@ -67,7 +67,7 @@ public class Example4 {
pvStream.keyBy(r ->r.userId)
.intervalJoin(orderStream.keyBy(r -> r.userId))
//第一条流和第二条流的哪一段join
//最近10min和未来10min以内的
//最近5min和未来10min以内的
.between(Time.minutes(-5),Time.minutes(10))
.process(new ProcessJoinFunction<Event, Event, String>() {
@Override