Real-time data warehouse task 2 update

This commit is contained in:
markilue 2023-05-06 09:59:02 +08:00
parent 05003d6c49
commit 1f12cec9f0
4 changed files with 474 additions and 17 deletions

View File

@@ -0,0 +1,82 @@
package com.atguigu.gmall.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.dwd
*@Author: markilue
*@CreateTime: 2023-05-05 21:13
*@Description: TODO Dynamic splitting of business (DB) data
*@Version: 1.0
*/
public class BaseDBApp {
public static void main(String[] args) throws Exception {
//TODO 1. Prepare the basic environment
//1.1 Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 Set the parallelism
env.setParallelism(4);
//TODO 2. Checkpoint settings
//2.1 Enable checkpointing
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//2.2 Set the checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(60000L);
//2.3 Set the restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
//2.4 Keep the checkpoint after the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.5 Set the state backend: memory / filesystem / RocksDB
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
//2.6 Specify the user for HDFS operations
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read data from Kafka
//3.1 Declare the topic to consume and the consumer group
String topic = "ods_base_db_m";
String groupId = "base_db_app_group";
//3.2 Get the consumer object
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtils.getKafkaSource(topic, groupId);
//3.3 Read the data and wrap it into a stream
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
//TODO 4. Convert the data type: String -> JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject);
//TODO 5. Simple ETL
SingleOutputStreamOperator<JSONObject> filterDs = jsonObjDs.filter(
jsonObject -> {
boolean flag = jsonObject.getString("table") != null
&& jsonObject.getString("table").length() > 0
&& jsonObject.getJSONObject("data") != null
&& jsonObject.getString("data").length() > 3;
return flag;
}
);
filterDs.print(">>>>");
//TODO 6. Dynamic splitting ****
//TODO 7. Write the dimension side-output stream to HBase
//TODO 8. Write the main stream back to the DWD layer in Kafka
env.execute();
}
}
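
For orientation only, a hedged sketch of what the pending TODO 6/7/8 split could look like, slotted into main() after the ETL filter. It is not part of this commit: it routes records to a side output by their "table" field against a hard-coded set, whereas the real implementation would normally be driven by a configuration table. The names dimTag, dimTables, splitDs and dimDs are hypothetical, and the snippet additionally needs OutputTag, ProcessFunction, Collector, DataStream and java.util collection imports that BaseDBApp does not have yet.

// Hedged sketch (not in this commit): route dimension-table records to a side output,
// keep fact-table records in the main stream for the Kafka DWD sink.
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dim") {
};
// Hypothetical hard-coded dimension set; a config table would normally drive this.
Set<String> dimTables = new HashSet<>(Arrays.asList("user_info", "base_province", "sku_info"));
SingleOutputStreamOperator<JSONObject> splitDs = filterDs.process(
        new ProcessFunction<JSONObject, JSONObject>() {
            @Override
            public void processElement(JSONObject jsonObject, Context context, Collector<JSONObject> collector) {
                if (dimTables.contains(jsonObject.getString("table"))) {
                    context.output(dimTag, jsonObject);   // -> TODO 7: dimension stream, to HBase
                } else {
                    collector.collect(jsonObject);        // -> TODO 8: fact stream, back to Kafka DWD
                }
            }
        }
);
DataStream<JSONObject> dimDs = splitDs.getSideOutput(dimTag);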

View File

@@ -1,20 +1,33 @@
package com.atguigu.gmall.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.text.SimpleDateFormat;
import java.util.Properties;
/**
@@ -24,6 +37,19 @@ import java.util.Properties;
 * put display (exposure) logs into the display side-output stream
 * put page logs into the page (main) stream
 * write the data of the different streams back to the DWD topics in Kafka
 *
 * Execution flow of the log-data splitting:
 * - Processes that need to be running:
 *   zk, kafka, [hdfs], logger, BaseLogApp
 * - Run the jar that simulates log generation
 * - The generated logs are sent to Nginx
 * - Nginx forwards the requests to the log-collection services on Ding202, 203 and 204
 * - The log-collection services write the data to disk and send it to the Kafka topic ods_base_log
 * - BaseLogApp reads from ods_base_log:
 *   > structure conversion: String -> JSONObject
 *   > new/old visitor state repair: key by mid, then fix is_new
 *   > splitting
 *   > write the split streams to the different DWD topics in Kafka
 */
public class BaseLogApp {
@@ -37,20 +63,19 @@ public class BaseLogApp {
env.setParallelism(4); //this should match the number of partitions of the corresponding Kafka topic
//TODO 2. Checkpoint settings
//2.1 Enable checkpointing
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//2.2 Set the checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(60000);
//2.3 Set the restart strategy: fixed-delay restart -> at most 3 restarts, 3 seconds apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
//2.4 Keep the checkpoint after the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.5 Set the state backend, i.e. where checkpoints are stored: memory | filesystem | RocksDB (RocksDB, like Redis, keeps data in memory and also spills to disk)
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
//2.6 Specify the user for HDFS operations
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read data from Kafka
//3.1 Declare the topic to consume
@@ -79,32 +104,140 @@
//the default implementation (method reference)
// kafkaDS.map(JSON::parseObject);
// jsonObjectDs.print(">>>");
//TODO 5. Repair the new/old visitor flag -- Flink state programming: keyed state
//5.1 Key by mid (device id) -- see log.json for the exact format of the jsonObject
KeyedStream<JSONObject, String> keyedDS = jsonObjectDs.keyBy(
jsonObject -> jsonObject.getJSONObject("common").getString("mid")
);
//5.2 Repair -- the record itself is kept and only its is_new attribute may change, so map is used
SingleOutputStreamOperator<JSONObject> jsonWithNewDs = keyedDS.map(
new RichMapFunction<JSONObject, JSONObject>() {
//Note: the state cannot be initialized at declaration time, because getRuntimeContext() is not yet available there
private ValueState<String> lastVisitDateState;
private SimpleDateFormat sdf; //initialized in open(); creating it inside map() would create a new object on every call
@Override
public void open(Configuration parameters) throws Exception {
lastVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastVisitDatState", String.class));
sdf = new SimpleDateFormat("yyyyMMdd");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
//get the new/old visitor flag
String isNew = jsonObject.getJSONObject("common").getString("is_new");
if ("1".equals(isNew)) {
//only records marked as new visitors may need repairing; old visitors need none
String lastVisitDate = lastVisitDateState.value();
String curVisitDate = sdf.format(jsonObject.getLong("ts"));
//check whether there is a previous visit date
if (lastVisitDate != null && !"".equals(lastVisitDate)) {
//visited before: repair
//check whether both visits are on the same day
if (!lastVisitDate.equals(curVisitDate)) {
isNew = "0";
jsonObject.getJSONObject("common").put("is_new", isNew);
}
} else {
//this is the first visit
lastVisitDateState.update(curVisitDate);
}
}
return jsonObject;
}
}
);
//TODO 6. Split the logs by log type -- define side-output tags and write to the different side-output streams
//6.1 Declare the side-output tags
//start-up logs -- start side-output stream
OutputTag<String> startTag = new OutputTag<String>("start") {
}; //the {} is required: without it the parent class is instantiated directly and the generic type cannot be recovered (see the parent's constructors)
//display (exposure) logs -- display side-output stream
OutputTag<String> displayTag = new OutputTag<String>("display") {
}; //the {} is required: without it the parent class is instantiated directly and the generic type cannot be recovered (see the parent's constructors)
//page logs -- main stream
//6.2 Use side outputs to do the splitting -- the process operator is used because it has access to the context and can therefore emit to side outputs
SingleOutputStreamOperator<String> pageDs = jsonWithNewDs.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObject, ProcessFunction<JSONObject, String>.Context context, Collector<String> collector) throws Exception {
//get the "start" jsonObject
JSONObject startObject = jsonObject.getJSONObject("start");
String jsonString = jsonObject.toJSONString();
//check whether this is a start-up log
if (startObject != null && startObject.size() > 0) {
//start-up log
context.output(startTag, jsonString);
} else {
//everything that is not a start-up log is a page log and goes into the main stream
collector.collect(jsonString);
//check whether the log contains display (exposure) data
JSONArray displays = jsonObject.getJSONArray("displays");
if (displays != null && displays.size() > 0) {
//get the timestamp and the page id
String ts = jsonObject.getString("ts");
String pageId = jsonObject.getJSONObject("page").getString("page_id");
//there is display data: iterate the array and take each display record
for (int i = 0; i < displays.size(); i++) {
JSONObject displayJSONObject = displays.getJSONObject(i);
//emit it into the display side-output stream
//add the timestamp and the page id
displayJSONObject.put("ts", ts);
displayJSONObject.put("page_id", pageId);
context.output(displayTag, displayJSONObject.toJSONString());
}
}
}
}
}
);
//6.3 Get the different streams and print them for testing
DataStream<String> startDs = pageDs.getSideOutput(startTag);
DataStream<String> displayDS = pageDs.getSideOutput(displayTag);
// pageDs.print(">>>>");
// startDs.print("####");
// displayDS.print("&&&&");
//TODO 7. Write the different streams to the different DWD topics in Kafka
// Why not write to Kafka directly inside the process function instead of collecting into streams?
// Because exactly-once could then no longer be guaranteed; the two-phase commit would have to be implemented by hand
pageDs.addSink(
MyKafkaUtils.getKafkaSink("dwd_page_log")
);
startDs.addSink(
MyKafkaUtils.getKafkaSink("dwd_start_log")
);
displayDS.addSink(
MyKafkaUtils.getKafkaSink("dwd_display_log")
);
env.execute();
}
}

View File

@@ -0,0 +1,233 @@
// start-up log
{
"common": {
"ar": "370000",
"uid": "31",
"os": "Android 11.0",
"ch": "xiaomi",
"is_new": "0",
"md": "Xiaomi 9",
"mid": "mid_12",
"vc": "v2.1.111",
"ba": "Xiaomi"
},
"start": {
"entry": "icon",
"open_ad_skip_ms": 0,
"open_ad_ms": 1662,
"loading_time": 2531,
"open_ad_id": 4
},
"ts": 1683276802000
}
// -- page log with displays
{
"common": {
"ar": "370000",
"uid": "31",
"os": "Android 11.0",
"ch": "xiaomi",
"is_new": "0",
"md": "Xiaomi 9",
"mid": "mid_12",
"vc": "v2.1.111",
"ba": "Xiaomi"
},
"page": {
"page_id": "home",
"during_time": 15127
},
"displays": [
{
"display_type": "activity",
"item": "1",
"item_type": "activity_id",
"pos_id": 5,
"order": 1
},
{
"display_type": "activity",
"item": "2",
"item_type": "activity_id",
"pos_id": 5,
"order": 2
},
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 4,
"order": 3
},
{
"display_type": "query",
"item": "5",
"item_type": "sku_id",
"pos_id": 3,
"order": 4
},
{
"display_type": "promotion",
"item": "8",
"item_type": "sku_id",
"pos_id": 4,
"order": 5
},
{
"display_type": "query",
"item": "5",
"item_type": "sku_id",
"pos_id": 4,
"order": 6
},
{
"display_type": "promotion",
"item": "1",
"item_type": "sku_id",
"pos_id": 1,
"order": 7
},
{
"display_type": "query",
"item": "4",
"item_type": "sku_id",
"pos_id": 2,
"order": 8
}
],
"ts": 1683276802000
}
{
"common": {
"ar": "370000",
"uid": "31",
"os": "Android 11.0",
"ch": "xiaomi",
"is_new": "0",
"md": "Xiaomi 9",
"mid": "mid_12",
"vc": "v2.1.111",
"ba": "Xiaomi"
},
"page": {
"page_id": "good_detail",
"item": "8",
"during_time": 15379,
"item_type": "sku_id",
"last_page_id": "home",
"source_type": "recommend"
},
"displays": [
{
"display_type": "query",
"item": "7",
"item_type": "sku_id",
"pos_id": 2,
"order": 1
},
{
"display_type": "query",
"item": "10",
"item_type": "sku_id",
"pos_id": 3,
"order": 2
},
{
"display_type": "promotion",
"item": "3",
"item_type": "sku_id",
"pos_id": 2,
"order": 3
},
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 3,
"order": 4
},
{
"display_type": "recommend",
"item": "3",
"item_type": "sku_id",
"pos_id": 4,
"order": 5
},
{
"display_type": "query",
"item": "4",
"item_type": "sku_id",
"pos_id": 4,
"order": 6
},
{
"display_type": "query",
"item": "7",
"item_type": "sku_id",
"pos_id": 4,
"order": 7
},
{
"display_type": "promotion",
"item": "10",
"item_type": "sku_id",
"pos_id": 2,
"order": 8
},
{
"display_type": "promotion",
"item": "10",
"item_type": "sku_id",
"pos_id": 5,
"order": 9
}
],
"actions": [
{
"item": "8",
"action_id": "favor_add",
"item_type": "sku_id",
"ts": 1683276809689
}
],
"ts": 1683276802000
}
// business-data JSON (records consumed from ods_base_db_m)
{
"database": "rt_gmall",
"xid": 26161,
"data": {
"sku_num": "1",
"create_time": "2023-05-05 21:31:47",
"sku_id": 6,
"order_price": 1299.00,
"source_type": "2404",
"img_url": "http://47.93.148.192:8080/group1/M00/00/01/rBHu8l-rgJqAHPnoAAF9hoDNfsc505.jpg",
"sku_name": "Redmi 10X 4G Helio G85游戏芯 4800万超清四摄 5020mAh大电量 小孔全面屏 128GB大存储 8GB+128GB 冰雾白 游戏智能手机 小米 红米",
"id": 86497,
"source_id": 2,
"order_id": 28748,
"split_total_amount": 1299.00
},
"xoffset": 523,
"type": "insert",
"table": "order_detail",
"ts": 1683293507
}
{
"database": "rt_gmall",
"xid": 26161,
"data": {
"order_status": "1001",
"id": 74863,
"order_id": 28744,
"operate_time": "2023-05-05 21:31:47"
},
"xoffset": 529,
"type": "insert",
"table": "order_status_log",
"ts": 1683293507
}
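
As a worked illustration of how these business-data samples relate to the simple ETL in BaseDBApp (TODO 5), the following minimal standalone sketch applies the same predicate to an abbreviated version of the order_status_log record above. It is not part of this commit; the class name DbRecordFilterDemo is hypothetical and the only dependency assumed is fastjson, which the project already uses.

// Illustrative sketch only: the BaseDBApp filter keeps records with a non-empty "table"
// and a non-trivial "data" object, which holds for the samples above.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class DbRecordFilterDemo {
    public static void main(String[] args) {
        String record = "{\"database\":\"rt_gmall\",\"table\":\"order_status_log\",\"type\":\"insert\","
                + "\"data\":{\"order_status\":\"1001\",\"id\":74863,\"order_id\":28744},\"ts\":1683293507}";
        JSONObject jsonObject = JSON.parseObject(record);
        // Same checks as the filter in BaseDBApp's TODO 5.
        boolean keep = jsonObject.getString("table") != null
                && jsonObject.getString("table").length() > 0
                && jsonObject.getJSONObject("data") != null
                && jsonObject.getString("data").length() > 3;
        System.out.println(keep);   // prints true for this record
    }
}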

View File

@@ -3,6 +3,7 @@ package com.atguigu.gmall.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
@@ -21,8 +22,16 @@ public class MyKafkaUtils {
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
}
//get a Kafka producer (sink)
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
}
}
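
The constructor used in getKafkaSink above gives at-least-once delivery, which is the limitation BaseLogApp's comment about two-phase commit refers to. Below is a hedged sketch of an exactly-once variant, assuming the same KAFKA_SERVER constant; the method name getKafkaSinkExactlyOnce is hypothetical and not part of this commit, and it additionally needs imports for ProducerConfig, ProducerRecord, KafkaSerializationSchema and StandardCharsets.

// Hedged sketch (not in this commit): an exactly-once producer using Kafka transactions.
// Downstream consumers should read with isolation.level=read_committed.
public static FlinkKafkaProducer<String> getKafkaSinkExactlyOnce(String topic) {
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
    // Flink's default transaction timeout (1 h) exceeds the broker default transaction.max.timeout.ms (15 min),
    // so lower it explicitly.
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));
    return new FlinkKafkaProducer<String>(
            topic,
            (KafkaSerializationSchema<String>) (element, timestamp) ->
                    new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8)),
            properties,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    );
}

It would be used the same way as getKafkaSink, e.g. pageDs.addSink(MyKafkaUtils.getKafkaSinkExactlyOnce("dwd_page_log")).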