flink更新

This commit is contained in:
dingjiawen 2022-09-08 09:05:59 +08:00
parent 5e970a95f0
commit c9f66d2929
2 changed files with 123 additions and 4 deletions

View File

@ -1,9 +1,9 @@
package day01.scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
//import org.apache.flink.api.common.functions.FlatMapFunction
//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.util.Collector
//import org.apache.flink.util.Collector
object Example_scala1 {
@ -40,7 +40,7 @@ object Example_scala1 {
// }
// })
// value.print()
env.execute()
// env.execute()
// stream.flatMap {
// words => {

View File

@ -0,0 +1,119 @@
package day08.selftry;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.sql.Timestamp;
import static org.apache.flink.table.api.Expressions.$;
/**
* @BelongsProject: Flink
* @BelongsPackage: day08.selftry
* @Author: dingjiawen
* @CreateTime: 2022-09-07 16:46
* @Description: TODO example10的自己尝试 pv的Top10
* @Version: 1.0
*/
public class Example10 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<UserBehavior> stream = env.readTextFile("D:\\example\\self_example\\Big_data_example\\Flink\\src\\main\\resources\\UserBehavior.csv")
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] array = s.split(",");
return new UserBehavior(array[0], array[1], array[2], array[3], Long.parseLong(array[4]));
}
})
.filter(userBehavior -> userBehavior.behavior.equals("pv"))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
@Override
public long extractTimestamp(UserBehavior userBehavior, long l) {
return userBehavior.timeStamp;
}
})
);
//注册表的环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
Table table = tableEnvironment.fromDataStream(
stream,
$("userId"),
$("itemId"),
$("categoryId"),
$("behavior"),
$("timeStamp").rowtime().as("ts")
);
tableEnvironment.createTemporaryView("userBehavior",table);
String countsql="select itemId,COUNT(itemId) as cnt,HOP_END(ts , INTERVAL '5' MINUTE ,INTERVAL '1' HOUR) as end_time " +
"from userBehavior group by itemId,HOP(ts,INTERVAL '5' MINUTE,INTERVAL '1' HOUR)";
//开窗求rank
String ranksql="select *,ROW_NUMBER() OVER(PARTITION BY end_time ORDER BY cnt DESC) as rk " +
"FROM ("+countsql+")";
//取前三名
String resultSql = "select * from ("+ranksql+") WHERE rk<=3";
Table result = tableEnvironment.sqlQuery(resultSql);
tableEnvironment.toChangelogStream(result).print();
env.execute();
}
/**
* 用户行为POJO类
*/
public static class UserBehavior {
public String userId;
public String itemId;
public String categoryId;
public String behavior;
public Long timeStamp;
public UserBehavior() {
}
public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timeStamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId='" + userId + '\'' +
", itemId='" + itemId + '\'' +
", categoryId='" + categoryId + '\'' +
", behavior='" + behavior + '\'' +
", timeStamp=" + new Timestamp(timeStamp) +
'}';
}
}
}