Real-time data warehouse task 2 update

This commit is contained in:
markilue 2023-05-08 21:19:45 +08:00
parent d0a8cfc1cf
commit 357262888d
13 changed files with 646 additions and 20 deletions

View File

@ -114,6 +114,26 @@
<scope>compile</scope>
</dependency>
<!--commons-beanutils is a utility package from Apache for working with Java beans.
With commons-beanutils we can conveniently read and write bean properties.-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
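commons-beanutils is pulled in for copying properties between bean objects (e.g. the TableProcess config bean used below). A minimal sketch of the kind of call it enables, illustrative only and not part of this commit:
import com.atguigu.gmall.realtime.beans.TableProcess;
import org.apache.commons.beanutils.BeanUtils;
// Illustrative only: copy every readable property from source onto target.
// Note the commons-beanutils argument order: copyProperties(dest, orig);
// it throws IllegalAccessException / InvocationTargetException, so handle or declare them.
TableProcess source = new TableProcess();
TableProcess target = new TableProcess();
BeanUtils.copyProperties(target, source);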

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.atguigu.gmall.realtime.app.func.DimSink;
import com.atguigu.gmall.realtime.app.func.MyDeserializationSchemaFunction;
import com.atguigu.gmall.realtime.app.func.TableProcessFunction;
import com.atguigu.gmall.realtime.beans.TableProcess;
@ -19,10 +20,15 @@ import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/**
*@BelongsProject: rt-gmall-parent
@ -117,9 +123,21 @@ public class BaseDBApp {
realDS.print("####");
//TODO 8. Write the dimension side-output stream to HBase (Phoenix)
dimDS.addSink(new DimSink());
//TODO 9. Write the main-stream data back to the Kafka dwd layer
//Different tables need to go to different topics, so the usual fixed-topic sink is not used here
//To keep exactly-once semantics we still rely as much as possible on Flink's own producer, hence the approach below
realDS.addSink(MyKafkaUtils.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
String topic = jsonObject.getString("sink_table");
return new ProducerRecord<byte[], byte[]>(topic, jsonObject.getJSONObject("data").toJSONString().getBytes());
}
}));
env.execute();

View File

@ -213,9 +213,9 @@ public class BaseLogApp {
//6.3 Get the different streams and print them for testing
DataStream<String> startDs = pageDs.getSideOutput(startTag);
DataStream<String> displayDS = pageDs.getSideOutput(displayTag);
pageDs.print(">>>>");
startDs.print("####");
displayDS.print("&&&&");
//TODO 7. Write each stream to its own dwd topic in Kafka

View File

@ -0,0 +1,185 @@
package com.atguigu.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.SimpleDateFormat;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-08 16:56
*@Description:
* TODO Unique visitor (UV) computation
* Processes that need to be running:
* zk, kafka, logger, hadoop, the log-simulation jar, UniqueVisitorApp, BaseLogApp
* Execution flow:
* 1. Run the jar that simulates log generation
* 2. The simulated log data is sent to nginx for load balancing
* 3. nginx forwards the requests to the three log-collection services
* 4. The three log-collection servers receive the log data and send it to the Kafka topic ods_base_log
* 5. BaseLogApp reads from ods_base_log and splits the stream
* >start log --dwd_start_log
* >display log --dwd_display_log
* >page log --dwd_page_log
* 6. UniqueVisitorApp reads from the dwd_page_log topic
* >filters the page-view data
* >keys the stream by mid
* >filters the records with a filter operator
* >inside the filter, uses a state variable to record the last visit date
* >writes the filtered UV records to the dwm_unique_visitor topic in Kafka
*
*@Version: 1.0
*/
public class UniqueVisitorApp {
public static void main(String[] args) throws Exception {
//TODO 1. Basic environment setup
//1.1 Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 Set parallelism
env.setParallelism(1);
//TODO 2. Checkpoint settings
//2.1 Enable checkpointing
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//2.2 Checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(60000);
//2.3 Restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
//2.4 Keep checkpoints after the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.5 State backend: memory / filesystem / RocksDB
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
//2.6 User for HDFS access
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read data from Kafka
//3.1 Declare the topic and consumer group
String topic = "dwd_page_log";
String groupId = "unique_visitor_app_group";
//3.2 Get the Kafka consumer
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtils.getKafkaSource(topic, groupId);
//3.3 Read the data into a stream
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
// kafkaDS.print(">>>");
//TODO 4. Convert the input type String -> JSONObject
SingleOutputStreamOperator<JSONObject> jsonDS = kafkaDS.map(JSON::parseObject);
// jsonDS.print(">>>>");
//TODO 5. Key the stream by device id (mid)
/*
{
"common": {
"ar": "420000",
"uid": "49",
"os": "iOS 13.2.3",
"ch": "Appstore",
"is_new": "1",
"md": "iPhone Xs",
"mid": "mid_13",
"vc": "v2.1.134",
"ba": "iPhone"
},
"page": {
"page_id": "payment",
"item": "1,8",
"during_time": 1329,
"item_type": "sku_ids",
"last_page_id": "trade"
},
"ts": 1683278828000
}
*/
KeyedStream<JSONObject, String> keyedDS = jsonDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
//TODO 6. Filtering
SingleOutputStreamOperator<JSONObject> filterDs = keyedDS.filter(
new RichFilterFunction<JSONObject>() {
//state variable holding the last visit date
private ValueState<String> lastVisitDateState;
//date formatting helper
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyyMMdd");
//Note: UV naturally extends to daily active users; the state only tells us whether this device already visited today,
//so it is useless once the day is over, hence a TTL of one day
ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastVisitDate", String.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//reset the TTL on create and write (the default)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//never return expired state to the caller even if it has not been cleaned up yet (also the default)
.build();//builder pattern
valueStateDescriptor.enableTimeToLive(ttlConfig);
lastVisitDateState = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
//If the page was reached from another page, filter it out right away
String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
if (lastPageId != null && lastPageId.length() > 0) {
return false;
}
//Get the last visit date from state
String lastPageDate = lastVisitDateState.value();
String curVisitDate = sdf.format(jsonObject.getLong("ts"));
if (lastPageDate != null && lastPageDate.length() > 0 && lastPageDate.equals(curVisitDate)) {
//already visited today
return false;
} else {
//not visited yet today
lastVisitDateState.update(curVisitDate);
return true;
}
}
}
);
filterDs.print(">>>>");
//TODO 7. Write the filtered UV data back to the Kafka dwm layer
filterDs.map(jsonObject -> jsonObject.toJSONString()).addSink(
MyKafkaUtils.getKafkaSink("dwm_unique_visitor")
);//the JSONObject must be converted to String before addSink
env.execute();
}
}

View File

@ -0,0 +1,163 @@
package com.atguigu.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-08 20:17
*@Description: TODO User jump-out detail statistics
*@Version: 1.0
*/
public class UserJumpDetailApp {
public static void main(String[] args) throws Exception {
//TODO 1. Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
//TODO 2. Checkpoint settings
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read data from Kafka
String topic = "dwd_page_log";
String groupId = "user_jump_detail_app_group";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtils.getKafkaSource(topic, groupId));
//TODO 4.map String ->JSONObject
SingleOutputStreamOperator<JSONObject> jsonDS = kafkaDS.map(JSON::parseObject);
// jsonDS.print(">>>");
//TODO 5. Assign watermarks and extract the event-time field
SingleOutputStreamOperator<JSONObject> jsonObjWithWatermarkDS = jsonDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("ts");
}
}
)
);
//TODO 6. Key the stream by mid
KeyedStream<JSONObject, String> keyedDS = jsonObjWithWatermarkDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
/*
{
"common": {
"ar": "370000",
"uid": "44",
"os": "iOS 13.3.1",
"ch": "Appstore",
"is_new": "1",
"md": "iPhone X",
"mid": "mid_13",
"vc": "v2.1.132",
"ba": "iPhone"
},
"page": {
"page_id": "cart",
"during_time": 14540,
"last_page_id": "good_detail"
},
"ts": 1683289583000
}
*/
//TODO 7. Define the pattern
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) {
//Condition 1: the previous page id is empty (session entry)
String lastPage = jsonObject.getJSONObject("page").getString("last_page_id");
return lastPage == null || lastPage.length() == 0;
}
}
).next("second").where(
new SimpleCondition<JSONObject>() {
//the user visited another page of the site
@Override
public boolean filter(JSONObject jsonObject) {
//In fact, reaching this method already means a second event arrived, so returning true directly would also work
String pageId = jsonObject.getJSONObject("page").getString("page_id");
return pageId != null && pageId.length() > 0;
}
}
).within(Time.seconds(10));//10-second window; events that time out or never get a follow-up page end up in the side output
//TODO 8. Apply the pattern to the stream
PatternStream<JSONObject> patternDS = CEP.pattern(keyedDS, pattern);
//TODO 9. Extract data from the pattern stream
OutputTag<String> outputTag = new OutputTag<String>("lateData") {};
patternDS.process(
new MyPatternProcessFunction(outputTag)
);
env.execute();
}
}
class MyPatternProcessFunction extends PatternProcessFunction<JSONObject, JSONObject> implements TimedOutPartialMatchHandler<JSONObject> {
private OutputTag<String> outputTag;
public MyPatternProcessFunction(OutputTag outputTag) {
this.outputTag = outputTag;
}
//handle fully matched sequences
@Override
public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<JSONObject> out) throws Exception {
//the map holds <pattern step name, list of events that matched that step>; a full match means the user did not jump out, so nothing is emitted here
}
@Override
public void processTimedOutMatch(Map<String, List<JSONObject>> match, Context ctx) throws Exception {
JSONObject startEvent = match.get("first").get(0);
ctx.output(outputTag, startEvent.toJSONString());
}
}
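The commit stops after applying the pattern (TODO 9); the side output declared above is not yet read back or written anywhere. A minimal sketch of how the timed-out (jump-out) events could be pulled and sent on with the utilities already used in this project; the dwm_user_jump_detail topic name is an assumption, not taken from this commit:
//Sketch only: capture the processed stream, read the timeout side output, and sink it to Kafka.
//Requires an org.apache.flink.streaming.api.datastream.DataStream import in addition to those above.
SingleOutputStreamOperator<JSONObject> matchedDS = patternDS.process(new MyPatternProcessFunction(outputTag));
DataStream<String> jumpDS = matchedDS.getSideOutput(outputTag);
jumpDS.print("jump>>>");
jumpDS.addSink(MyKafkaUtils.getKafkaSink("dwm_user_jump_detail"));//assumed topic name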

View File

@ -0,0 +1,73 @@
package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-08 14:45
*@Description: TODO Write the dimension side-output stream to HBase (Phoenix)
*@Version: 1.0
*/
public class DimSink extends RichSinkFunction<JSONObject> {
//connection object
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
@Override
public void invoke(JSONObject jsonObject, Context context) {
//{"database":"rt_gmall","xid":17654,"data":{"tm_name":"dadaaa","id":13},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1683527538}
//Get the dimension table name
String tableName = jsonObject.getString("sink_table");
JSONObject data = jsonObject.getJSONObject("data");
//Build the upsert statement: upsert into schema.table values(xx,xx,xx)
String upsertSQL = generateSQL(tableName, data);
System.out.println("Inserting into Phoenix dimension table: " + upsertSQL);
//Create the statement object
try (PreparedStatement ps = connection.prepareStatement(upsertSQL)) {
ps.executeUpdate();
//TODO Note: commit the transaction (MySQL auto-commits by default, Phoenix requires a manual commit)
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("Failed to insert into Phoenix dimension table");
}
}
private String generateSQL(String tableName, JSONObject data) {
String key = StringUtils.join(data.keySet(), ",");
String value = StringUtils.join(data.values(), "','");
String sql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + tableName
+ "(" + key + ")" +
" values('" + value + "')";
//upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('asdas','12')
return sql;
}
}
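Worked example: for the sample Maxwell record shown in the invoke comment above, generateSQL would produce roughly the following (key order depends on the JSON object, shown here as in the comment inside generateSQL):
//upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('dadaaa','13')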

View File

@ -3,13 +3,21 @@ package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.beans.TableProcess;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.func
@ -23,11 +31,22 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
private OutputTag<JSONObject> dimTag;
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
//connection object
private Connection connection;
public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.dimTag = dimTag;
this.mapStateDescriptor = mapStateDescriptor;
}
//Runs only once when the operator is created, a good place to register the driver and open the connection
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
//Handle the business-stream data (records Maxwell captured from the business database)
//In theory dimension data must arrive before fact data: without users, orders, etc. there would be no facts
//This can also be controlled manually, e.g. start Flink CDC before starting Maxwell
@ -56,6 +75,13 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
//Decide whether this is fact data or dimension data
String sinkTable = tableProcess.getSinkTable();
jsonObject.put("sink_table", sinkTable);
//Before passing the record downstream, filter out the fields that are not needed
//Filtering idea: read the columns to keep from the config table and filter the attributes of data against them
JSONObject data = jsonObject.getJSONObject("data");
String sinkColumns = tableProcess.getSinkColumns();
filterColumns(data, sinkColumns);
String sinkType = tableProcess.getSinkType();
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)) {
//dimension data: put it into the dimension side-output stream
@ -71,6 +97,17 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
}
//filter the fields
private void filterColumns(JSONObject data, String sinkColumns) {
//data : {"tm_name":"ASDAD","logo_url":"FDSFS","id":17}
//sinkColumns : id,tm_name
String[] columns = sinkColumns.split(",");
HashSet<String> columnSet = new HashSet<>(Arrays.asList(columns));
Set<Map.Entry<String, Object>> entries = data.entrySet();
entries.removeIf(entry -> !columnSet.contains(entry.getKey()));
}
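For the sample values in the comments above, the filter behaves like this (worked example, not part of the commit):
//data before : {"tm_name":"ASDAD","logo_url":"FDSFS","id":17}
//sinkColumns : id,tm_name
//data after  : {"tm_name":"ASDAD","id":17}   (logo_url removed because it is not in the keep set)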
//Handle the broadcast-stream data (config info Flink CDC reads from MySQL)
//s: {"database":"rt_gmall_realtime","data":{"name":"ssss","id":1},"type":"insert","table":"t_user"}
@Override
@ -97,7 +134,7 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
//If the config record describes dimension data, create the dimension table in HBase ahead of time
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)) {
//only for dimension data on insert; on update the table already exists, so there is no need to create it again
checkTable(sinkTable, sinkPk, sinkColumns, sinkExtend);
}
@ -109,16 +146,54 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
}
//When handling config data, create the dimension table ahead of time: create table if not exists schema.table(column type, ...)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
//default the primary key if it is null
if (pk == null) {
pk = "id";
}
//default the table-creation extension if it is null
if (ext == null) {
ext = "";
}
StringBuilder sql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");
String[] columns = fields.split(",");
for (int i = 0; i < columns.length; i++) {
String column = columns[i];
//is this column the primary key?
if (column.equals(pk)) {
sql.append(column + " varchar primary key");
} else {
sql.append(column + " varchar");
}
//append a comma after every column except the last one
if (i < columns.length - 1) {
sql.append(",");
}
}
sql.append(") " + ext);
System.out.println("Create-table statement: " + sql);
PreparedStatement ps = null;
try {
//create the statement object
ps = connection.prepareStatement(sql.toString());
//execute the DDL
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("Failed to create table in Phoenix");
} finally {
//release resources
if (ps != null) {
ps.close();
}
}
}
}
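As a concrete illustration: for the dim_base_trademark table used in DimSink above, with sink columns id,tm_name and the default primary key id, the generated DDL would look roughly like (column order follows the configured sink columns):
//create table if not exists GMALL_REALTIME.dim_base_trademark(id varchar primary key,tm_name varchar)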

View File

@ -0,0 +1,15 @@
package com.atguigu.gmall.realtime.common;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.common
*@Author: markilue
*@CreateTime: 2023-05-08 13:31
*@Description: TODO Constants for the real-time data warehouse project
*@Version: 1.0
*/
public class GmallConfig {
public static final String HBASE_SCHEMA = "GMALL_REALTIME";//HBase schema (namespace) name
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix connection URL
}

View File

@ -4,8 +4,12 @@ package com.atguigu.gmall.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
/**
@ -13,6 +17,7 @@ import java.util.Properties;
*/
public class MyKafkaUtils {
private static final String KAFKA_SERVER = "Ding202:9092,Ding203:9092,Ding204:9092";
private static final String DEFAULT_TOPIC = "dwd_default_topic";
//get a Kafka consumer
public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
@ -25,13 +30,40 @@ public class MyKafkaUtils {
}
//get a Kafka producer
// public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
// Properties properties = new Properties();
// properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
//// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//
//
// return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);//judging from this constructor it can only guarantee at-least-once
// }
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15-minute transaction timeout
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);//judging from that constructor it can only guarantee at-least-once
return new FlinkKafkaProducer<String>(topic, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
return new ProducerRecord<byte[], byte[]>(topic, s.getBytes());
}
}, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);//this overload takes a semantic argument, so exactly-once can be requested
}
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15-minute transaction timeout
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}

View File

@ -15,9 +15,10 @@ import java.sql.SQLException;
*/
public class MyPhoenixUtils {
private static final String URL ="jdbc:phoenix:Ding202,Ding203,Ding204:2181";
public void executeSQL(String sql) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
Connection connection = DriverManager.getConnection(URL);
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.execute();

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://Ding202:8020/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>Ding202,Ding203,Ding204</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.wal.provider</name>
<value>filesystem</value>
</property>
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,4 +1,4 @@
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout