From c9f66d2929f495a75c4b28e765a8425d2b1091dd Mon Sep 17 00:00:00 2001
From: dingjiawen <745518019@qq.com>
Date: Thu, 8 Sep 2022 09:05:59 +0800
Subject: [PATCH] =?UTF-8?q?flink=E6=9B=B4=E6=96=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Note: this patch text was re-flowed from a whitespace-collapsed copy. Indentation of
the Scala context lines is reconstructed, so `git apply --ignore-whitespace` may be
needed; the Example10.java content no longer matches the recorded index hash.

 .../java/day01/scala/Example_scala1.scala     |   8 +-
 .../main/java/day08/selftry/Example10.java    | 134 ++++++++++++++++++
 2 files changed, 138 insertions(+), 4 deletions(-)
 create mode 100644 Big_data_example/Flink/src/main/java/day08/selftry/Example10.java

diff --git a/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala b/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala
index d18b520..055b11a 100644
--- a/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala
+++ b/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala
@@ -1,9 +1,9 @@
 package day01.scala
 
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+//import org.apache.flink.api.common.functions.FlatMapFunction
+//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.util.Collector
+//import org.apache.flink.util.Collector
 
 object Example_scala1 {
 
@@ -40,7 +40,7 @@ object Example_scala1 {
 //      }
 //    })
 //    value.print()
-    env.execute()
+//    env.execute()
 
 //    stream.flatMap {
 //      words => {
diff --git a/Big_data_example/Flink/src/main/java/day08/selftry/Example10.java b/Big_data_example/Flink/src/main/java/day08/selftry/Example10.java
new file mode 100644
index 0000000..a9a3675
--- /dev/null
+++ b/Big_data_example/Flink/src/main/java/day08/selftry/Example10.java
@@ -0,0 +1,134 @@
+package day08.selftry;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.sql.Timestamp;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Reads user-behavior records from a CSV file, keeps the "pv" events, counts them per item
+ * in hopping event-time windows with Flink SQL and prints the three highest-ranked items
+ * of every window as a changelog stream.
+ *
+ * @BelongsProject: Flink
+ * @BelongsPackage: day08.selftry
+ * @Author: dingjiawen
+ * @CreateTime: 2022-09-07 16:46
+ * @Description: TODO own attempt at example10: Top10 by pv (the query below keeps only the top 3)
+ * @Version: 1.0
+ */
+public class Example10 {
+
+    /** Input file used when no path is passed on the command line. */
+    private static final String DEFAULT_INPUT_PATH =
+            "D:\\example\\self_example\\Big_data_example\\Flink\\src\\main\\resources\\UserBehavior.csv";
+
+    /**
+     * Builds and runs the job.
+     *
+     * @param args optional; {@code args[0]} overrides {@link #DEFAULT_INPUT_PATH}
+     * @throws Exception if the job cannot be executed
+     */
+    public static void main(String[] args) throws Exception {
+        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        SingleOutputStreamOperator<UserBehavior> stream = env.readTextFile(inputPath)
+                .map(new MapFunction<String, UserBehavior>() {
+                    @Override
+                    public UserBehavior map(String s) throws Exception {
+                        // Column order used below: userId,itemId,categoryId,behavior,timeStamp
+                        String[] array = s.split(",");
+                        return new UserBehavior(array[0], array[1], array[2], array[3], Long.parseLong(array[4]));
+                    }
+                })
+                // Constant-first equals keeps a null behavior from throwing.
+                .filter(userBehavior -> "pv".equals(userBehavior.behavior))
+                .assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
+                                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
+                                    @Override
+                                    public long extractTimestamp(UserBehavior userBehavior, long l) {
+                                        // NOTE(review): Flink expects epoch milliseconds here; confirm the
+                                        // CSV column is not in seconds.
+                                        return userBehavior.timeStamp;
+                                    }
+                                })
+                );
+
+        // Set up the table environment.
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
+
+        Table table = tableEnvironment.fromDataStream(
+                stream,
+                $("userId"),
+                $("itemId"),
+                $("categoryId"),
+                $("behavior"),
+                $("timeStamp").rowtime().as("ts")
+        );
+
+        tableEnvironment.createTemporaryView("userBehavior", table);
+
+        // Count per item in 1-hour windows that slide every 5 minutes.
+        String countsql = "select itemId,COUNT(itemId) as cnt,HOP_END(ts , INTERVAL '5' MINUTE ,INTERVAL '1' HOUR) as end_time "
+                + "from userBehavior group by itemId,HOP(ts,INTERVAL '5' MINUTE,INTERVAL '1' HOUR)";
+
+        // Rank the items of each window by count, highest first.
+        String ranksql = "select *,ROW_NUMBER() OVER(PARTITION BY end_time ORDER BY cnt DESC) as rk "
+                + "FROM (" + countsql + ")";
+
+        // Keep the top three of each window.
+        String resultSql = "select * from (" + ranksql + ") WHERE rk<=3";
+
+        Table result = tableEnvironment.sqlQuery(resultSql);
+
+        tableEnvironment.toChangelogStream(result).print();
+        env.execute();
+    }
+
+    /**
+     * User-behavior POJO (public fields plus a no-argument constructor).
+     */
+    public static class UserBehavior {
+        public String userId;
+        public String itemId;
+        public String categoryId;
+        public String behavior;
+        public Long timeStamp;
+
+        public UserBehavior() {
+        }
+
+        public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timeStamp) {
+            this.userId = userId;
+            this.itemId = itemId;
+            this.categoryId = categoryId;
+            this.behavior = behavior;
+            this.timeStamp = timeStamp;
+        }
+
+        @Override
+        public String toString() {
+            // timeStamp stays null after the no-argument constructor; avoid unboxing it then.
+            return "UserBehavior{" +
+                    "userId='" + userId + '\'' +
+                    ", itemId='" + itemId + '\'' +
+                    ", categoryId='" + categoryId + '\'' +
+                    ", behavior='" + behavior + '\'' +
+                    ", timeStamp=" + (timeStamp == null ? null : new Timestamp(timeStamp)) +
+                    '}';
+        }
+    }
+}