From 357262888d6a7f57fd8e6ff2fb33b21c32f2267a Mon Sep 17 00:00:00 2001 From: markilue <745518019@qq.com> Date: Mon, 8 May 2023 21:19:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E4=BB=93=E4=BB=BB?= =?UTF-8?q?=E5=8A=A12=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rt-gmall-parent/gmall-realtime/pom.xml | 20 ++ .../gmall/realtime/app/dwd/BaseDBApp.java | 22 ++- .../gmall/realtime/app/dwd/BaseLogApp.java | 6 +- .../atguigu/gmall/realtime/app/dwd/log.json | 4 + .../realtime/app/dwm/UniqueVisitorApp.java | 185 ++++++++++++++++++ .../realtime/app/dwm/UserJumpDetailApp.java | 163 +++++++++++++++ .../gmall/realtime/app/func/DimSink.java | 73 +++++++ .../app/func/TableProcessFunction.java | 93 ++++++++- .../gmall/realtime/common/GmallConfig.java | 15 ++ .../gmall/realtime/utils/MyKafkaUtils.java | 38 +++- .../gmall/realtime/utils/MyPhoenixUtils.java | 5 +- .../src/main/resources/hbase-site.xml | 40 ++++ .../src/main/resources/log4j.properties | 2 +- 13 files changed, 646 insertions(+), 20 deletions(-) create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/common/GmallConfig.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml index a9c7007..151e239 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml @@ -114,6 +114,26 @@ compile + + + commons-beanutils + commons-beanutils + 1.9.3 + + + + org.apache.phoenix + phoenix-spark + 5.0.0-HBase-2.0 + + + org.glassfish + javax.el + + + + diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java index 88ef01c..4200553 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; +import com.atguigu.gmall.realtime.app.func.DimSink; import com.atguigu.gmall.realtime.app.func.MyDeserializationSchemaFunction; import com.atguigu.gmall.realtime.app.func.TableProcessFunction; import com.atguigu.gmall.realtime.beans.TableProcess; @@ -19,10 +20,15 @@ import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import 
org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; /** *@BelongsProject: rt-gmall-parent @@ -108,7 +114,7 @@ public class BaseDBApp { OutputTag dimTag = new OutputTag("dimTag") { }; SingleOutputStreamOperator realDS = connectDS.process( - new TableProcessFunction(dimTag,mapStateDescriptor) + new TableProcessFunction(dimTag, mapStateDescriptor) ); //获取维度侧输出流 @@ -117,9 +123,21 @@ public class BaseDBApp { realDS.print("####"); - //TODO 8.将维度侧输出流的数据写到Hbase中 + //TODO 8.将维度侧输出流的数据写到Hbase(phoenix)中 + + dimDS.addSink(new DimSink()); //TODO 9.将主流数据写回到Kafka的dwd层中 + //因为不同的表需要发送到不同的主题当中去,所以就不用采用传统的方式 + //但是为了保证精确一致性,尽可能的使用Flink自己提供的方式,因此采用了以下的方式 + + realDS.addSink(MyKafkaUtils.getKafkaSinkBySchema(new KafkaSerializationSchema() { + @Override + public ProducerRecord serialize(JSONObject jsonObject, @Nullable Long aLong) { + String topic = jsonObject.getString("sink_table"); + return new ProducerRecord(topic, jsonObject.getJSONObject("data").toJSONString().getBytes()); + } + })); env.execute(); diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java index e3e7217..94369ab 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java @@ -213,9 +213,9 @@ public class BaseLogApp { //6.3 获取不同流数据 输出测试 DataStream startDs = pageDs.getSideOutput(startTag); DataStream displayDS = pageDs.getSideOutput(displayTag); -// pageDs.print(">>>>"); -// startDs.print("####"); -// displayDS.print("&&&&"); + pageDs.print(">>>>"); + startDs.print("####"); + displayDS.print("&&&&"); //TODO 7.将不同流的数据写到kafka的dwd不同的主题中 diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json index a1d8723..509b9d4 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json @@ -234,3 +234,7 @@ + + + + diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java new file mode 100644 index 0000000..fd4f053 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java @@ -0,0 +1,185 @@ +package com.atguigu.gmall.realtime.app.dwm; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.gmall.realtime.utils.MyKafkaUtils; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; 
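/* The BaseDBApp hunk above takes its broadcast config stream from a MySQL CDC source (hence the
   MySQLSource / StartupOptions / MyDeserializationSchemaFunction imports); that source is set up outside
   this patch. A minimal sketch of what it presumably looks like (host, credentials and the config table
   name are assumptions; env and mapStateDescriptor are the existing BaseDBApp variables):

       SourceFunction<String> mysqlSource = MySQLSource.<String>builder()
               .hostname("Ding202")                              // assumed MySQL host
               .port(3306)
               .databaseList("rt_gmall_realtime")                // database name from the flinkCDC example in TableProcessFunction
               .tableList("rt_gmall_realtime.table_process")     // assumed table behind the TableProcess bean
               .username("root")                                 // placeholder
               .password("123456")                               // placeholder
               .startupOptions(StartupOptions.initial())
               .deserializer(new MyDeserializationSchemaFunction())
               .build();
       BroadcastStream<String> configBS = env.addSource(mysqlSource).broadcast(mapStateDescriptor);
*/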
+import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; + +import java.text.SimpleDateFormat; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm + *@Author: markilue + *@CreateTime: 2023-05-08 16:56 + *@Description: + * TODO 独立访客计算 + * 需要启动的进程: + * zk,kafka,logger,hadoop,模拟日志jar,UniqueVisitorApp,BaseLogApp + * 执行流程: + * 1.运行模拟生成日志的jar + * 2.将人们ONI生成日志数据发送给nginx进行负载均衡 + * 3.nginx将请求转发给三台日志采集服务 + * 4.三台日志采集服务器接收到日志数据 将日志数据发送到kafka的ods_base_log主题中 + * 5.BaseLogApp应用程序从ods_base_log中读取数据,进行分流 + * >启动日志 --dwd_start_log + * >曝光日志 --dwd_display_log + * >页面日志 --dwd_page_log + * 6.uniqueVisitorApp从dwd_page_log主题读取数据 + * >对pv进行过滤 + * >按照mid进行分组 + * >使用filter算子对数据进行过滤 + * >在过滤的时候,使用状态变量记录上次访问日期 + * > + * + *@Version: 1.0 + */ +public class UniqueVisitorApp { + + + public static void main(String[] args) throws Exception { + + //TODO 1.基础环境状态 + + //1.1 流处理环境 + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + //1.2 设置并行度 + env.setParallelism(1); + + //TODO 2.检查点设置 + //2.1 开启检查点 + env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE); + + //2.2 设置检查点超时时间 + env.getCheckpointConfig().setCheckpointTimeout(60000); + //2.3 设置重启策略 + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L)); + //2.4 设置job取消后,检查点是否保存 + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + //2.5 设置状态后端 内存/文件系统/RocksDB + env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall")); + //2.6 指定操作HDFS用户 + System.setProperty("HADOOP_USER_NAME", "dingjiawen"); + + //TODO 3.从kafka中读取数据 + + //3.1 声明消费主题以及消费者组 + String topic = "dwd_page_log"; + String groupId = "unique_visitor_app_group"; + + //3.2 获取kafka消费者对象 + FlinkKafkaConsumer kafkaSource = MyKafkaUtils.getKafkaSource(topic, groupId); + + //3.3 读取数据封装流 + DataStreamSource kafkaDS = env.addSource(kafkaSource); + +// kafkaDS.print(">>>"); + + //TODO 4.对读取的数据进行类型转换 String ->JSONObject + SingleOutputStreamOperator jsonDS = kafkaDS.map(JSON::parseObject); + +// jsonDS.print(">>>>"); + + //TODO 5.按照设备id进行分层 + /* + { + "common": { + "ar": "420000", + "uid": "49", + "os": "iOS 13.2.3", + "ch": "Appstore", + "is_new": "1", + "md": "iPhone Xs", + "mid": "mid_13", + "vc": "v2.1.134", + "ba": "iPhone" + }, + "page": { + "page_id": "payment", + "item": "1,8", + "during_time": 1329, + "item_type": "sku_ids", + "last_page_id": "trade" + }, + "ts": 1683278828000 + } + */ + KeyedStream keyedDS = jsonDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid")); + + //TODO 6.过滤实现 + SingleOutputStreamOperator filterDs = keyedDS.filter( + new RichFilterFunction() { + + //声明状态变量,用于存放上次访问日期 + private ValueState lastVisitDateState; + 
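// Note: this is keyed state (the stream is keyed by mid above), so each device keeps its own last-visit date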
//转化日期格式工具类 + private SimpleDateFormat sdf; + + @Override + public void open(Configuration parameters) throws Exception { + sdf = new SimpleDateFormat("yyyyMMdd"); + //注意:UV其实可以延伸为计算日活,如果是日活,则状态值主要用于筛选当天是否访问过,所以状态过了今天基本上就没有存在的意义了 + //所以这类设置状态的失效时间为1天 + ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>("lastVisitDate", String.class); + + StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//在创建和写入的时候改变(就是默认值) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//如果过期,如果还没有被垃圾回收,是否返回给状态调用者(这也是默认值) + .build();//构造者设计模式 + valueStateDescriptor.enableTimeToLive(ttlConfig); + lastVisitDateState = getRuntimeContext().getState(valueStateDescriptor); + + } + + @Override + public boolean filter(JSONObject jsonObject) throws Exception { + + //如果是从其他页面跳转的,那么直接过滤掉 + String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id"); + if (lastPageId != null && lastPageId.length() > 0) { + return false; + } + //获取状态中上次访问日志 + String lastPageDate = lastVisitDateState.value(); + String curVisitDate = sdf.format(jsonObject.getLong("ts")); + if (lastPageDate != null && lastPageDate.length() > 0 && lastPageDate.equals(curVisitDate)) { + //已经访问过 + return false; + } else { + //还没有访问过 + lastVisitDateState.update(curVisitDate); + return true; + } + } + } + ); + + filterDs.print(">>>>"); + + //TODO 7.将过滤后的UV数据 写回到kafka的dwm层 + + filterDs.map(jsonObject -> jsonObject.toJSONString()).addSink( + MyKafkaUtils.getKafkaSink("dwm_unique_visitor") + );//需要转为String之后addSink才行 + + + env.execute(); + + + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java new file mode 100644 index 0000000..4544f47 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java @@ -0,0 +1,163 @@ +package com.atguigu.gmall.realtime.app.dwm; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.gmall.realtime.utils.MyKafkaUtils; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cep.CEP; +import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.functions.PatternProcessFunction; +import org.apache.flink.cep.functions.TimedOutPartialMatchHandler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.List; +import java.util.Map; + +/** 
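 * Processes required: zk, kafka, logger, hadoop, the log-simulator jar, BaseLogApp, UserJumpDetailApp
 * Flow (inferred from the code below): read dwd_page_log -> key by mid -> CEP pattern
 *   (a page with no last_page_id, followed by any other page within 10 seconds);
 *   full matches are normal multi-page visits, while timed-out partial matches are the jump-outs
 *   and are routed to a side output for the dwm layer.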
+ *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm + *@Author: markilue + *@CreateTime: 2023-05-08 20:17 + *@Description: TODO 用户跳出明细统计 + *@Version: 1.0 + */ +public class UserJumpDetailApp { + + public static void main(String[] args) throws Exception { + //TODO 1.创建流环境 + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + + //TODO 2.检查点设置 + env.enableCheckpointing(5000L); + env.getCheckpointConfig().setCheckpointTimeout(60000); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L)); + env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall")); + System.setProperty("HADOOP_USER_NAME", "dingjiawen"); + + + //TODO 3.从kafka中读取数据 + String topic = "dwd_page_log"; + String groupId = "user_jump_detail_app_group"; + DataStreamSource kafkaDS = env.addSource(MyKafkaUtils.getKafkaSource(topic, groupId)); + + //TODO 4.map String ->JSONObject + SingleOutputStreamOperator jsonDS = kafkaDS.map(JSON::parseObject); +// jsonDS.print(">>>"); + + + //TODO 5.指定watermark以及提取事件时间字段 + SingleOutputStreamOperator jsonObjWithWatermarkDS = jsonDS.assignTimestampsAndWatermarks( + WatermarkStrategy.forMonotonousTimestamps() + .withTimestampAssigner( + new SerializableTimestampAssigner() { + @Override + public long extractTimestamp(JSONObject jsonObject, long l) { + return jsonObject.getLong("ts"); + } + } + ) + ); + + //TODO 6.按照mid分组 + KeyedStream keyedDS = jsonObjWithWatermarkDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid")); + + + /* + { + "common": { + "ar": "370000", + "uid": "44", + "os": "iOS 13.3.1", + "ch": "Appstore", + "is_new": "1", + "md": "iPhone X", + "mid": "mid_13", + "vc": "v2.1.132", + "ba": "iPhone" + }, + "page": { + "page_id": "cart", + "during_time": 14540, + "last_page_id": "good_detail" + }, + "ts": 1683289583000 + } + */ + //TODO 7.定义pattern + Pattern pattern = Pattern.begin("first").where( + new SimpleCondition() { + @Override + public boolean filter(JSONObject jsonObject) { + //条件1:上次跳转为空 + String lastPage = jsonObject.getJSONObject("page").getString("last_page_id"); + return lastPage == null || lastPage.length() == 0; + } + } + ).next("second").where( + new SimpleCondition() { + //访问了网站其他的页面 + @Override + public boolean filter(JSONObject jsonObject) { + //事实上:只要是第二个时间来了,进入这个方法,就证明他第二次访问了,可以直接返回为true + String pageId = jsonObject.getJSONObject("page").getString("page_id"); + return pageId != null && pageId.length() > 0; + } + } + ).within(Time.seconds(10));//10秒钟;超时的数据或者没有下一条的数据,会放入侧输出流中 + + //TODO 8.将pattern应用到流上 + PatternStream patternDS = CEP.pattern(keyedDS, pattern); + + //TODO 9.从流中提取数据 + + OutputTag outputTag = new OutputTag("lateData") {}; + patternDS.process( + new MyPatternProcessFunction(outputTag) + ); + + + env.execute(); + + } +} + + +class MyPatternProcessFunction extends PatternProcessFunction implements TimedOutPartialMatchHandler { + + private OutputTag outputTag; + + public MyPatternProcessFunction(OutputTag outputTag) { + this.outputTag = outputTag; + } + + //处理匹配上的数据 + @Override + public void processMatch(Map> match, Context ctx, Collector out) throws Exception { + //map中存放<满足要求的name,list(满足要求的第一个事件)> + + } + + @Override + public void processTimedOutMatch(Map> match, Context ctx) throws Exception { + JSONObject startEvent = match.get("start").get(0); + ctx.output(outputTag, 
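/* Note: the pattern defined above begins with "first", so this lookup presumably needs
   match.get("first") rather than match.get("start"). Step 9 in main() also applies the pattern but never
   consumes the result; a minimal completion sketch, assuming the output type is String and that the
   jump-out records go to a dwm topic named dwm_user_jump_detail:

       SingleOutputStreamOperator<String> processedDS =
               patternDS.process(new MyPatternProcessFunction(outputTag));
       // timed-out partial matches (single-page visits) are the jump-outs
       DataStream<String> jumpDS = processedDS.getSideOutput(outputTag);
       jumpDS.addSink(MyKafkaUtils.getKafkaSink("dwm_user_jump_detail"));
*/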
startEvent.toJSONString()); + + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java new file mode 100644 index 0000000..5397208 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java @@ -0,0 +1,73 @@ +package com.atguigu.gmall.realtime.app.func; + +import com.alibaba.fastjson.JSONObject; +import com.atguigu.gmall.realtime.common.GmallConfig; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.sql.Connection; + +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Map; +import java.util.Set; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.app.func + *@Author: markilue + *@CreateTime: 2023-05-08 14:45 + *@Description: TODO 将维度侧输出流的数据写到Hbase(phoenix)中 + *@Version: 1.0 + */ +public class DimSink extends RichSinkFunction { + + //声明连接对象 + private Connection connection; + + @Override + public void open(Configuration parameters) throws Exception { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER); + } + + + @Override + public void invoke(JSONObject jsonObject, Context context) { + //{"database":"rt_gmall","xid":17654,"data":{"tm_name":"dadaaa","id":13},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1683527538} + //获取维度表表名 + String tableName = jsonObject.getString("sink_table"); + JSONObject data = jsonObject.getJSONObject("data"); + + //拼接插入语句 upsert into 表空间.表名 values(xx,xx,xx) + String upsertSQL = generateSQL(tableName, data); + System.out.println("向phoenix维度表中插入数据:" + upsertSQL); + //创建数据库对象 + try (PreparedStatement ps = connection.prepareStatement(upsertSQL)) { + + ps.executeUpdate(); + //TODO 注意:提交事务(MQSQL默认自动提交事务,Phoenix默认是手动提交事务) + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + throw new RuntimeException("向phoenix维度表中插入数据失败"); + } + + } + + private String generateSQL(String tableName, JSONObject data) { + + String key = StringUtils.join(data.keySet(), ","); + String value = StringUtils.join(data.values(), "','"); + + String sql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." 
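/* GmallConfig.java is created by this patch, but its hunk is not shown in this excerpt. Based on how it is
   used here and in TableProcessFunction, it presumably holds constants along these lines (values inferred
   from MyPhoenixUtils and the upsert example below, so treat them as assumptions):

       public class GmallConfig {
           // Phoenix namespace/schema used for dimension tables
           public static final String HBASE_SCHEMA = "GMALL_REALTIME";
           // Phoenix thick-driver JDBC URL (ZooKeeper quorum)
           public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";
       }
*/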
+ tableName + + "(" + key + ")" + + " values('" + value + "')"; + //upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('asdas','12') + + + return sql; + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java index f961671..fefc41d 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java @@ -3,13 +3,21 @@ package com.atguigu.gmall.realtime.app.func; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.gmall.realtime.beans.TableProcess; +import com.atguigu.gmall.realtime.common.GmallConfig; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; + /** *@BelongsProject: rt-gmall-parent *@BelongsPackage: com.atguigu.gmall.realtime.app.func @@ -23,11 +31,22 @@ public class TableProcessFunction extends BroadcastProcessFunction dimTag; private MapStateDescriptor mapStateDescriptor; + //声明连接对象 + private Connection connection; + public TableProcessFunction(OutputTag dimTag, MapStateDescriptor mapStateDescriptor) { this.dimTag = dimTag; this.mapStateDescriptor = mapStateDescriptor; } + //创建时只执行一次,非常适合注册驱动和创建连接的操作 + @Override + public void open(Configuration parameters) throws Exception { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER); + + } + //处理业务流中数据 maxwell从业务数据库中采集到的数据 //理论上来说,维度数据一定要比事实数据先到,因为如果没有用户没有订单等信息,就不会有事实 //这件事情,可以人为的控制一下,比如先开flinkCDC后开Maxwell @@ -56,6 +75,13 @@ public class TableProcessFunction extends BroadcastProcessFunction columnSet = new HashSet<>(Arrays.asList(columns)); + + Set> entries = data.entrySet(); + entries.removeIf(entry -> !columnSet.contains(entry.getKey())); + } + //处理广播流中的数据 flinkCDC从Mysql中读取配置信息 //s: {"database":"rt_gmall_realtime","data":{"name":"ssss","id":1},"type":"insert","table":"t_user"} @Override @@ -97,7 +134,7 @@ public class TableProcessFunction extends BroadcastProcessFunction getKafkaSource(String topic,String groupId) { + public static FlinkKafkaConsumer getKafkaSource(String topic, String groupId) { Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -25,13 +30,40 @@ public class MyKafkaUtils { } //获取kafka的生产者 +// public static FlinkKafkaProducer getKafkaSink(String topic) { +// Properties properties = new Properties(); +// properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); +//// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// +// + +// return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);//从构造方法来看只能保证至少一次 +// } + 
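// Note on the EXACTLY_ONCE producers below: they rely on Kafka transactions, so TRANSACTION_TIMEOUT_CONFIG
// is capped at 15 minutes to stay within the broker default transaction.max.timeout.ms (900000 ms), and
// downstream consumers only see the committed records once they read with isolation.level=read_committed.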
public static FlinkKafkaProducer getKafkaSink(String topic) { Properties properties = new Properties(); - properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15分钟事务超时 // properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); - return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties); +// return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);//从构造方法来看只能保证至少一次 + return new FlinkKafkaProducer(topic, new KafkaSerializationSchema() { + @Override + public ProducerRecord serialize(String s, @Nullable Long aLong) { + return new ProducerRecord(topic, s.toString().getBytes()); + } + }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);//从构造方法来看只能保证至少一次 + } + + public static FlinkKafkaProducer getKafkaSinkBySchema(KafkaSerializationSchema kafkaSerializationSchema) { + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15分钟事务超时 +// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + + return new FlinkKafkaProducer(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java index 97226db..7778453 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java @@ -15,9 +15,10 @@ import java.sql.SQLException; */ public class MyPhoenixUtils { - private static final String URL ="jdbc:phoenix:thin:url=http://Ding202:8765;serialization=PROTOBUF"; + private static final String URL ="jdbc:phoenix:Ding202,Ding203,Ding204:2181"; - public void executeSQL(String sql) throws SQLException { + public void executeSQL(String sql) throws Exception { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); Connection connection = DriverManager.getConnection(URL); PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.execute(); diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml new file mode 100644 index 0000000..1b4ce1a --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml @@ -0,0 +1,40 @@ + + + + + + hbase.rootdir + hdfs://Ding202:8020/hbase + + + + hbase.cluster.distributed + true + + + + hbase.zookeeper.quorum + Ding202,Ding203,Ding204 + + + + hbase.unsafe.stream.capability.enforce + false + + + + hbase.wal.provider + filesystem + + + + phoenix.schema.isNamespaceMappingEnabled + true + + + + phoenix.schema.mapSystemTablesToNamespace + true + + + diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties index c706282..963ca91 100644 --- 
a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties
@@ -1,4 +1,4 @@
-log4j.rootLogger=warn,stdout
+log4j.rootLogger=error,stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
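The namespace-mapping flags added to hbase-site.xml only work if the same two properties are also set on the HBase/Phoenix server side, and the GMALL_REALTIME schema referenced by the upsert example has to exist before DimSink can write dimension rows. A minimal bootstrap sketch using the MyPhoenixUtils helper from this patch (the dim_base_trademark columns are inferred from the example upsert; treat the DDL as illustrative only):

    // one-off setup, e.g. run once before starting the jobs
    MyPhoenixUtils phoenixUtils = new MyPhoenixUtils();
    phoenixUtils.executeSQL("create schema if not exists GMALL_REALTIME");
    phoenixUtils.executeSQL(
            "create table if not exists GMALL_REALTIME.dim_base_trademark(" +
            "id varchar primary key, tm_name varchar)");

Note that executeSQL declares throws Exception and opens a fresh connection on every call, so in a real deployment this belongs in a one-time setup path rather than on the hot path of a job.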