diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
index a9c7007..151e239 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
@@ -114,6 +114,26 @@
<scope>compile</scope>
+
+ <dependency>
+     <groupId>commons-beanutils</groupId>
+     <artifactId>commons-beanutils</artifactId>
+     <version>1.9.3</version>
+ </dependency>
+
+ <dependency>
+     <groupId>org.apache.phoenix</groupId>
+     <artifactId>phoenix-spark</artifactId>
+     <version>5.0.0-HBase-2.0</version>
+     <exclusions>
+         <exclusion>
+             <groupId>org.glassfish</groupId>
+             <artifactId>javax.el</artifactId>
+         </exclusion>
+     </exclusions>
+ </dependency>
+
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java
index 88ef01c..4200553 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
+import com.atguigu.gmall.realtime.app.func.DimSink;
import com.atguigu.gmall.realtime.app.func.MyDeserializationSchemaFunction;
import com.atguigu.gmall.realtime.app.func.TableProcessFunction;
import com.atguigu.gmall.realtime.beans.TableProcess;
@@ -19,10 +20,15 @@ import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
/**
*@BelongsProject: rt-gmall-parent
@@ -108,7 +114,7 @@ public class BaseDBApp {
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {
};
SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
- new TableProcessFunction(dimTag,mapStateDescriptor)
+ new TableProcessFunction(dimTag, mapStateDescriptor)
);
//get the dimension side-output stream
@@ -117,9 +123,21 @@ public class BaseDBApp {
realDS.print("####");
- //TODO 8. Write the dimension side-output stream to HBase
+ //TODO 8. Write the dimension side-output stream to HBase (Phoenix)
+
+ dimDS.addSink(new DimSink());
//TODO 9. Write the main stream back to the Kafka dwd layer
+ //Different tables have to be sent to different topics, so the usual fixed-topic producer cannot be used.
+ //To keep exactly-once guarantees we still want Flink's own Kafka producer, hence the approach below.
+
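+ //each record carries its destination topic in the "sink_table" field, so the serialization schema below routes it per record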
+ realDS.addSink(MyKafkaUtils.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
+ String topic = jsonObject.getString("sink_table");
+ return new ProducerRecord<>(topic, jsonObject.getJSONObject("data").toJSONString().getBytes());
+ }
+ }));
env.execute();
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java
index e3e7217..94369ab 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java
@@ -213,9 +213,9 @@ public class BaseLogApp {
//6.3 get the individual streams and print them for testing
DataStream<String> startDs = pageDs.getSideOutput(startTag);
DataStream<String> displayDS = pageDs.getSideOutput(displayTag);
-// pageDs.print(">>>>");
-// startDs.print("####");
-// displayDS.print("&&&&");
+ pageDs.print(">>>>");
+ startDs.print("####");
+ displayDS.print("&&&&");
//TODO 7. Write the different streams to the corresponding dwd topics in Kafka
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
index a1d8723..509b9d4 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
@@ -234,3 +234,7 @@
+
+
+
+
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java
new file mode 100644
index 0000000..fd4f053
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UniqueVisitorApp.java
@@ -0,0 +1,185 @@
+package com.atguigu.gmall.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.text.SimpleDateFormat;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-08 16:56
+ *@Description:
+ * TODO Unique visitor (UV) calculation
+ * Processes that must be running:
+ * zk, kafka, logger, hadoop, the log-simulation jar, UniqueVisitorApp, BaseLogApp
+ * Execution flow:
+ * 1. Run the jar that simulates log generation
+ * 2. The generated log data is sent to nginx for load balancing
+ * 3. nginx forwards the requests to the three log-collection services
+ * 4. The three log-collection servers receive the log data and send it to the Kafka topic ods_base_log
+ * 5. BaseLogApp reads from ods_base_log and splits the stream:
+ * >start log --dwd_start_log
+ * >display (exposure) log --dwd_display_log
+ * >page log --dwd_page_log
+ * 6. UniqueVisitorApp reads from the dwd_page_log topic:
+ * >filters the page views
+ * >keys the stream by mid
+ * >filters the data with a filter operator
+ * >inside the filter, a state variable records the last visit date
+ *
+ *@Version: 1.0
+ */
+public class UniqueVisitorApp {
+
+
+ public static void main(String[] args) throws Exception {
+
+ //TODO 1. Basic environment setup
+
+ //1.1 Stream execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ //1.2 Set the parallelism
+ env.setParallelism(1);
+
+ //TODO 2. Checkpoint settings
+ //2.1 Enable checkpointing
+ env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+
+ //2.2 Set the checkpoint timeout
+ env.getCheckpointConfig().setCheckpointTimeout(60000);
+ //2.3 Set the restart strategy
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+ //2.4 Keep checkpoints after the job is cancelled
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ //2.5 Set the state backend: memory / filesystem / RocksDB
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ //2.6 Specify the user for HDFS access
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+ //TODO 3. Read data from Kafka
+
+ //3.1 Declare the topic to consume and the consumer group
+ String topic = "dwd_page_log";
+ String groupId = "unique_visitor_app_group";
+
+ //3.2 Get the Kafka consumer
+ FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtils.getKafkaSource(topic, groupId);
+
+ //3.3 Read the data into a stream
+ DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
+
+// kafkaDS.print(">>>");
+
+ //TODO 4. Convert the type of the incoming data: String -> JSONObject
+ SingleOutputStreamOperator<JSONObject> jsonDS = kafkaDS.map(JSON::parseObject);
+
+// jsonDS.print(">>>>");
+
+ //TODO 5. Key (group) the stream by device id
+ /*
+ {
+ "common": {
+ "ar": "420000",
+ "uid": "49",
+ "os": "iOS 13.2.3",
+ "ch": "Appstore",
+ "is_new": "1",
+ "md": "iPhone Xs",
+ "mid": "mid_13",
+ "vc": "v2.1.134",
+ "ba": "iPhone"
+ },
+ "page": {
+ "page_id": "payment",
+ "item": "1,8",
+ "during_time": 1329,
+ "item_type": "sku_ids",
+ "last_page_id": "trade"
+ },
+ "ts": 1683278828000
+ }
+ */
+ KeyedStream<JSONObject, String> keyedDS = jsonDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
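+ //keying by mid gives every device its own lastVisitDate state in the filter that follows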
+
+ //TODO 6. Filtering
+ SingleOutputStreamOperator<JSONObject> filterDs = keyedDS.filter(
+ new RichFilterFunction<JSONObject>() {
+
+ //state variable that stores the last visit date
+ private ValueState<String> lastVisitDateState;
+ //helper for formatting dates
+ private SimpleDateFormat sdf;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ sdf = new SimpleDateFormat("yyyyMMdd");
+ //Note: UV naturally extends to daily active users; the state is only used to check whether the device already visited today, so it has no value once the day is over.
+ //Therefore the state is given a time-to-live of 1 day.
+ ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastVisitDate", String.class);
+
+ StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
+ .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//the TTL is reset on create and write (the default)
+ .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//never return expired state, even if it has not been cleaned up yet (also the default)
+ .build();//builder pattern
+ valueStateDescriptor.enableTimeToLive(ttlConfig);
+ lastVisitDateState = getRuntimeContext().getState(valueStateDescriptor);
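+ //the TTL only bounds how long the state is kept; the actual once-per-day check is the date comparison in filter() below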
+
+ }
+
+ @Override
+ public boolean filter(JSONObject jsonObject) throws Exception {
+
+ //if the page was reached from another page, filter it out immediately
+ String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
+ if (lastPageId != null && lastPageId.length() > 0) {
+ return false;
+ }
+ //get the last visit date from state
+ String lastPageDate = lastVisitDateState.value();
+ String curVisitDate = sdf.format(jsonObject.getLong("ts"));
+ if (lastPageDate != null && lastPageDate.length() > 0 && lastPageDate.equals(curVisitDate)) {
+ //already visited today
+ return false;
+ } else {
+ //not visited yet today
+ lastVisitDateState.update(curVisitDate);
+ return true;
+ }
+ }
+ }
+ );
+
+ filterDs.print(">>>>");
+
+ //TODO 7. Write the filtered UV data back to the Kafka dwm layer
+
+ filterDs.map(jsonObject -> jsonObject.toJSONString()).addSink(
+ MyKafkaUtils.getKafkaSink("dwm_unique_visitor")
+ );//the records must be converted to String before addSink
+
+
+ env.execute();
+
+
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java
new file mode 100644
index 0000000..4544f47
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java
@@ -0,0 +1,163 @@
+package com.atguigu.gmall.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cep.CEP;
+import org.apache.flink.cep.PatternStream;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-08 20:17
+ *@Description: TODO User jump-out (bounce) detail statistics
+ *@Version: 1.0
+ */
+public class UserJumpDetailApp {
+
+ public static void main(String[] args) throws Exception {
+ //TODO 1. Create the stream environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+
+ //TODO 2. Checkpoint settings
+ env.enableCheckpointing(5000L);
+ env.getCheckpointConfig().setCheckpointTimeout(60000);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+
+ //TODO 3. Read data from Kafka
+ String topic = "dwd_page_log";
+ String groupId = "user_jump_detail_app_group";
+ DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtils.getKafkaSource(topic, groupId));
+
+ //TODO 4. map: String -> JSONObject
+ SingleOutputStreamOperator<JSONObject> jsonDS = kafkaDS.map(JSON::parseObject);
+// jsonDS.print(">>>");
+
+
+ //TODO 5. Assign watermarks and extract the event-time field
+ SingleOutputStreamOperator<JSONObject> jsonObjWithWatermarkDS = jsonDS.assignTimestampsAndWatermarks(
+ WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
+ .withTimestampAssigner(
+ new SerializableTimestampAssigner<JSONObject>() {
+ @Override
+ public long extractTimestamp(JSONObject jsonObject, long l) {
+ return jsonObject.getLong("ts");
+ }
+ }
+ )
+ );
+
+ //TODO 6. Key the stream by mid
+ KeyedStream<JSONObject, String> keyedDS = jsonObjWithWatermarkDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
+
+
+ /*
+ {
+ "common": {
+ "ar": "370000",
+ "uid": "44",
+ "os": "iOS 13.3.1",
+ "ch": "Appstore",
+ "is_new": "1",
+ "md": "iPhone X",
+ "mid": "mid_13",
+ "vc": "v2.1.132",
+ "ba": "iPhone"
+ },
+ "page": {
+ "page_id": "cart",
+ "during_time": 14540,
+ "last_page_id": "good_detail"
+ },
+ "ts": 1683289583000
+ }
+ */
+ //TODO 7. Define the pattern
+ Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
+ new SimpleCondition<JSONObject>() {
+ @Override
+ public boolean filter(JSONObject jsonObject) {
+ //condition 1: last_page_id is empty (session entry page)
+ String lastPage = jsonObject.getJSONObject("page").getString("last_page_id");
+ return lastPage == null || lastPage.length() == 0;
+ }
+ }
+ ).next("second").where(
+ new SimpleCondition<JSONObject>() {
+ //the user visited another page on the site
+ @Override
+ public boolean filter(JSONObject jsonObject) {
+ //in fact, as soon as a second event arrives and enters this method it already proves a second visit, so returning true directly would also work
+ String pageId = jsonObject.getJSONObject("page").getString("page_id");
+ return pageId != null && pageId.length() > 0;
+ }
+ }
+ ).within(Time.seconds(10));//10-second window; data that times out (no follow-up event) is emitted to the side output
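+ //a "jump out" (bounce) is a first event with no second page within 10s, so it surfaces as a timed-out partial match rather than a full match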
+
+ //TODO 8. Apply the pattern to the stream
+ PatternStream<JSONObject> patternDS = CEP.pattern(keyedDS, pattern);
+
+ //TODO 9. Extract data from the pattern stream
+
+ OutputTag<String> outputTag = new OutputTag<String>("lateData") {};
+ patternDS.process(
+ new MyPatternProcessFunction(outputTag)
+ );
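+ //the bounce records land in the side output; they would be pulled back with getSideOutput(outputTag) before being written to the dwm layer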
+
+
+ env.execute();
+
+ }
+}
+
+
+class MyPatternProcessFunction extends PatternProcessFunction<JSONObject, String> implements TimedOutPartialMatchHandler<JSONObject> {
+
+ private OutputTag<String> outputTag;
+
+ public MyPatternProcessFunction(OutputTag<String> outputTag) {
+ this.outputTag = outputTag;
+ }
+
+ //handle fully matched data
+ @Override
+ public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<String> out) throws Exception {
+ //the map holds <pattern name, list of events that matched that pattern>
+
+ }
+
+ @Override
+ public void processTimedOutMatch(Map<String, List<JSONObject>> match, Context ctx) throws Exception {
+ JSONObject startEvent = match.get("first").get(0);
+ ctx.output(outputTag, startEvent.toJSONString());
+
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java
new file mode 100644
index 0000000..5397208
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java
@@ -0,0 +1,73 @@
+package com.atguigu.gmall.realtime.app.func;
+
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.common.GmallConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.func
+ *@Author: markilue
+ *@CreateTime: 2023-05-08 14:45
+ *@Description: TODO Write the dimension side-output stream data to HBase (Phoenix)
+ *@Version: 1.0
+ */
+public class DimSink extends RichSinkFunction<JSONObject> {
+
+ //connection object
+ private Connection connection;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+ connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
+ }
+
+
+ @Override
+ public void invoke(JSONObject jsonObject, Context context) {
+ //{"database":"rt_gmall","xid":17654,"data":{"tm_name":"dadaaa","id":13},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1683527538}
+ //get the target dimension table name
+ String tableName = jsonObject.getString("sink_table");
+ JSONObject data = jsonObject.getJSONObject("data");
+
+ //build the upsert statement: upsert into schema.table (cols) values(xx,xx,xx)
+ String upsertSQL = generateSQL(tableName, data);
+ System.out.println("向phoenix维度表中插入数据:" + upsertSQL);
+ //create the statement
+ try (PreparedStatement ps = connection.prepareStatement(upsertSQL)) {
+
+ ps.executeUpdate();
+ //TODO Note: commit the transaction (MySQL auto-commits by default, Phoenix requires a manual commit by default)
+ connection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ throw new RuntimeException("向phoenix维度表中插入数据失败");
+ }
+
+ }
+
+ private String generateSQL(String tableName, JSONObject data) {
+
+ String key = StringUtils.join(data.keySet(), ",");
+ String value = StringUtils.join(data.values(), "','");
+
+ String sql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + tableName
+ + "(" + key + ")" +
+ " values('" + value + "')";
+ //upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('asdas','12')
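+ //note: this simple quoting assumes none of the column values contain a single quote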
+
+
+ return sql;
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java
index f961671..fefc41d 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/TableProcessFunction.java
@@ -3,13 +3,21 @@ package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.beans.TableProcess;
+import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.*;
+
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.func
@@ -23,11 +31,22 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private OutputTag<JSONObject> dimTag;
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
+ //connection object (Phoenix JDBC)
+ private Connection connection;
+
public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.dimTag = dimTag;
this.mapStateDescriptor = mapStateDescriptor;
}
+ //executed only once when the function is created; a good place to register the driver and open the connection
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+ connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
+
+ }
+
//process the main (business) stream: data captured from the business database by Maxwell
//In theory dimension data must arrive before fact data, because without users, orders and so on there are no facts.
//This can be controlled manually, e.g. start Flink CDC before starting Maxwell.
@@ -56,6 +75,13 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
+ Set<String> columnSet = new HashSet<>(Arrays.asList(columns));
+
+ Set<Map.Entry<String, Object>> entries = data.entrySet();
+ entries.removeIf(entry -> !columnSet.contains(entry.getKey()));
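+ //entrySet() is a live view of the JSONObject, so removeIf drops the unwanted keys from the underlying map as well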
+ }
+
//process the broadcast stream: configuration info read from MySQL by Flink CDC
//s: {"database":"rt_gmall_realtime","data":{"name":"ssss","id":1},"type":"insert","table":"t_user"}
@Override
@@ -97,7 +134,7 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
- public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId) {
+ public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -25,13 +30,40 @@ public class MyKafkaUtils {
}
//get a Kafka producer
+// public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
+// Properties properties = new Properties();
+// properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+//// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+//
+//
+
+// return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);//judging from this constructor, it can only guarantee at-least-once
+// }
+
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
Properties properties = new Properties();
- properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+ properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15-minute transaction timeout
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);
+// return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);//judging from this constructor, it can only guarantee at-least-once
+ return new FlinkKafkaProducer<String>(topic, new KafkaSerializationSchema<String>() {
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
+ return new ProducerRecord<>(topic, s.getBytes());
+ }
+ }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);//the Semantic argument makes this sink exactly-once
+ }
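+ //note: the 15-minute transaction.timeout.ms set above must not exceed the broker's transaction.max.timeout.ms, or the producer cannot open transactions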
+
+ public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
+ Properties properties = new Properties();
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+ properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15-minute transaction timeout
+// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+
+ return new FlinkKafkaProducer<>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java
index 97226db..7778453 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java
@@ -15,9 +15,10 @@ import java.sql.SQLException;
*/
public class MyPhoenixUtils {
- private static final String URL ="jdbc:phoenix:thin:url=http://Ding202:8765;serialization=PROTOBUF";
+ private static final String URL ="jdbc:phoenix:Ding202,Ding203,Ding204:2181";
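+ //thick-client JDBC URL: connects through the ZooKeeper quorum directly, instead of the Phoenix Query Server (thin client)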
- public void executeSQL(String sql) throws SQLException {
+ public void executeSQL(String sql) throws Exception {
+ Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
Connection connection = DriverManager.getConnection(URL);
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.execute();
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..1b4ce1a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/hbase-site.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+
+ <property>
+     <name>hbase.rootdir</name>
+     <value>hdfs://Ding202:8020/hbase</value>
+ </property>
+
+ <property>
+     <name>hbase.cluster.distributed</name>
+     <value>true</value>
+ </property>
+
+ <property>
+     <name>hbase.zookeeper.quorum</name>
+     <value>Ding202,Ding203,Ding204</value>
+ </property>
+
+ <property>
+     <name>hbase.unsafe.stream.capability.enforce</name>
+     <value>false</value>
+ </property>
+
+ <property>
+     <name>hbase.wal.provider</name>
+     <value>filesystem</value>
+ </property>
+
+ <property>
+     <name>phoenix.schema.isNamespaceMappingEnabled</name>
+     <value>true</value>
+ </property>
+
+ <property>
+     <name>phoenix.schema.mapSystemTablesToNamespace</name>
+     <value>true</value>
+ </property>
+
+</configuration>
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties
index c706282..963ca91 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/resources/log4j.properties
@@ -1,4 +1,4 @@
-log4j.rootLogger=warn,stdout
+log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout