diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java
new file mode 100644
index 0000000..04d6280
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseDBApp.java
@@ -0,0 +1,82 @@
+package com.atguigu.gmall.realtime.app.dwd;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwd
+ *@Author: markilue
+ *@CreateTime: 2023-05-05 21:13
+ *@Description: TODO Dynamic splitting of the business (DB change-log) data
+ *@Version: 1.0
+ */
+public class BaseDBApp {
+
+    public static void main(String[] args) throws Exception {
+        //TODO 1. Prepare the basic environment
+        //1.1 Stream execution environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        //1.2 Set the parallelism
+        env.setParallelism(4);
+
+        //TODO 2. Checkpoint settings
+        //2.1 Enable checkpointing
+        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+        //2.2 Set the checkpoint timeout
+        env.getCheckpointConfig().setCheckpointTimeout(60000L);
+        //2.3 Set the restart strategy
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+        //2.4 Retain checkpoints after the job is cancelled
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        //2.5 Set the state backend: memory / filesystem / RocksDB
+        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+        //2.6 Specify the user for HDFS access
+        System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+        //TODO 3. Read data from Kafka
+        //3.1 Declare the topic to consume and the consumer group
+        String topic = "ods_base_db_m";
+        String groupId = "base_db_app_group";
+        //3.2 Get the consumer
+        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtils.getKafkaSource(topic, groupId);
+        //3.3 Read the data and wrap it in a stream
+        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
+
+        //TODO 4. Convert the record type: String -> JSONObject
+        SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(JSON::parseObject);
+
+        //TODO 5. Simple ETL
+        SingleOutputStreamOperator<JSONObject> filterDs = jsonObjDs.filter(
+                jsonObject -> {
+                    boolean flag = jsonObject.getString("table") != null
+                            && jsonObject.getString("table").length() > 0
+                            && jsonObject.getJSONObject("data") != null
+                            && jsonObject.getString("data").length() > 3;
+                    return flag;
+                }
+        );
+        filterDs.print(">>>>");
+
+        //TODO 6. Dynamic splitting ****
+
+
+        //TODO 7. Write the dimension side-output stream to HBase
+
+        //TODO 8. Write the main (fact) stream back to the Kafka dwd layer
+
+        env.execute();
+
+
+    }
+}
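Note on TODO 6-8 above: the dynamic split is left unimplemented in this commit. The sketch below only illustrates the intended shape under stated assumptions: the tag name `dimTag`, the hard-coded `dimTables` set and the variable names are illustrative (the eventual version would presumably read its routing rules from a configuration table instead of a constant set). It would slot in after `filterDs` in `main`, reusing the `OutputTag`/`ProcessFunction`/`Collector`/`DataStream` imports already used in BaseLogApp plus `java.util.*`.

```java
// Sketch only -- not part of this commit. dimTag / dimTables are illustrative assumptions.
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dim") {
};
Set<String> dimTables = new HashSet<>(Arrays.asList("user_info", "base_province"));

SingleOutputStreamOperator<JSONObject> factDs = filterDs.process(
        new ProcessFunction<JSONObject, JSONObject>() {
            @Override
            public void processElement(JSONObject jsonObject, Context context, Collector<JSONObject> collector) {
                if (dimTables.contains(jsonObject.getString("table"))) {
                    // dimension data -> side output, to be written to HBase (TODO 7)
                    context.output(dimTag, jsonObject);
                } else {
                    // fact data -> main stream, to be written back to Kafka dwd topics (TODO 8)
                    collector.collect(jsonObject);
                }
            }
        }
);
DataStream<JSONObject> dimDs = factDs.getSideOutput(dimTag);
```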
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java
index a38980c..e3e7217 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/BaseLogApp.java
@@ -1,20 +1,33 @@
 package com.atguigu.gmall.realtime.app.dwd;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
+import java.text.SimpleDateFormat;
 import java.util.Properties;
 
 /**
@@ -24,6 +37,19 @@ import java.util.Properties;
  * Display (exposure) logs go to the display side-output stream
  * Page logs go to the page side-output stream
  * The different streams are written back to the Kafka dwd topics
+ *
+ * Execution flow of the log-splitting job:
+ * - Processes that must be running:
+ *   zk, kafka, [hdfs], logger, BaseLogApp
+ * - Run the jar that simulates log generation
+ * - The generated logs are sent to Nginx
+ * - Nginx forwards the requests to the log-collection services on Ding202/203/204
+ * - Each log-collection service prints the data, writes it to disk, and sends it to the Kafka topic ods_base_log
+ * - BaseLogApp reads from ods_base_log
+ *   > structure conversion: String -> JSONObject
+ *   > state repair: key by mid, fix is_new
+ *   > split into side outputs
+ *   > write the split streams to the different dwd topics in Kafka
  */
 public class BaseLogApp {
 
@@ -37,20 +63,19 @@ public class BaseLogApp {
         env.setParallelism(4); // should match the number of partitions of the Kafka topic
 
-
         //TODO 2. Checkpoint settings
         //2.1 Enable checkpointing
         env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
-        //2.1 Set the checkpoint timeout
+        //2.2 Set the checkpoint timeout
         env.getCheckpointConfig().setCheckpointTimeout(60000);
-        //2.1 Set the restart strategy: fixed-delay restart -> restart every 3 seconds, at most 3 times
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
-        //2.1 Whether checkpoints are kept after the job is cancelled: retain them ->
+        //2.3 Set the restart strategy: fixed-delay restart -> restart every 3 seconds, at most 3 times
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+        //2.4 Whether checkpoints are kept after the job is cancelled: retain them ->
         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-        //2.1 Set the state backend, i.e. where checkpoints are stored: memory | filesystem | RocksDB (RocksDB, like Redis, keeps a copy in memory and also spills to disk)
+        //2.5 Set the state backend, i.e. where checkpoints are stored: memory | filesystem | RocksDB (RocksDB, like Redis, keeps a copy in memory and also spills to disk)
         env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
-        //2 .6 Specify the user for HDFS access
-        System.setProperty("HADOOP_USER_NAME","dingjiawen");
+        //2.6 Specify the user for HDFS access
+        System.setProperty("HADOOP_USER_NAME", "dingjiawen");
 
         //TODO 3. Read data from Kafka
         //3.1 Declare the topic to consume
@@ -79,32 +104,140 @@ public class BaseLogApp {
         //the default (shorthand) implementation of the conversion
 //        kafkaDS.map(JSON::parseObject);
-        jsonObjectDs.print(">>>");
+//        jsonObjectDs.print(">>>");
+
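A note on the String -> JSONObject step above: `map(JSON::parseObject)` throws on malformed input, and with the fixed-delay restart strategy one dirty record can bounce the whole job. A defensive variant is sketched below as an assumption, not as part of this commit: it parses inside a `flatMap` and drops bad records, and would additionally need `import org.apache.flink.api.common.functions.FlatMapFunction;`.

```java
// Sketch (not in this commit): parse inside flatMap so a single malformed record
// is dropped instead of failing the job and triggering a restart.
SingleOutputStreamOperator<JSONObject> jsonObjectDs = kafkaDS.flatMap(
        new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) {
                try {
                    out.collect(JSON.parseObject(value));
                } catch (Exception e) {
                    // dirty record: skip it (could also be routed to a dedicated side output)
                    System.err.println("skipping malformed log record: " + value);
                }
            }
        }
);
```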
+        //TODO 5. Repair the new/returning-visitor flag (is_new) -- Flink stateful programming: keyed state
+        //5.1 Key by the device id (mid) -- see log.json for the exact record layout
+        KeyedStream<JSONObject, String> keyedDS = jsonObjectDs.keyBy(
+                jsonObject -> jsonObject.getJSONObject("common").getString("mid")
+        );
+
+        //5.2 Repair -- the record itself is kept, only its is_new attribute may change, so map is sufficient
+        SingleOutputStreamOperator<JSONObject> jsonWithNewDs = keyedDS.map(
+                new RichMapFunction<JSONObject, JSONObject>() {
+                    // Note: the state cannot be initialized at declaration time, because getRuntimeContext() is not available yet
+                    private ValueState<String> lastVisitDateState;
+                    private SimpleDateFormat sdf; // initialized in open(); creating it inside map() would allocate a new object for every record
+
+                    @Override
+                    public void open(Configuration parameters) throws Exception {
+                        lastVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastVisitDateState", String.class));
+                        sdf = new SimpleDateFormat("yyyyMMdd");
+                    }
+
+                    @Override
+                    public JSONObject map(JSONObject jsonObject) throws Exception {
+
+                        // get the new/returning-visitor flag
+                        String isNew = jsonObject.getJSONObject("common").getString("is_new");
+                        if ("1".equals(isNew)) {
+                            // only records marked as new visitors may need repair; returning visitors need no fix
+                            String lastVisitDate = lastVisitDateState.value();
+                            String curVisitDate = sdf.format(jsonObject.getLong("ts"));
+                            // check whether a previous visit date has been recorded
+                            if (lastVisitDate != null && !"".equals(lastVisitDate)) {
+                                // visited before: repair,
+                                // but only if the current visit is not on the same day
+                                if (!lastVisitDate.equals(curVisitDate)) {
+                                    isNew = "0";
+                                    jsonObject.getJSONObject("common").put("is_new", isNew);
+                                }
+
+                            } else {
+                                // first visit for this mid: record the visit date
+                                lastVisitDateState.update(curVisitDate);
+                            }
+
+                        }
+                        return jsonObject;
+                    }
+                }
+
+        );
+        //TODO 6. Split the logs by log type -- define side-output tags and route records to the different side outputs
+        //6.1 Declare the side-output tags
+
+        // start-up logs -> start side-output stream
+        OutputTag<String> startTag = new OutputTag<String>("start") {
+        };// the {} creates an anonymous subclass; without it only the raw parent type is used and the generic type cannot be captured (see the OutputTag constructors)
+        // display (exposure) logs -> display side-output stream
+        OutputTag<String> displayTag = new OutputTag<String>("display") {
+        };// same reason as above
+        // page logs -> main stream
 
-        //TODO 5. Repair the new/returning-visitor flag
+        //6.2 Split the stream with side outputs: the process operator exposes the context, which is needed to emit to side outputs
+        SingleOutputStreamOperator<String> pageDs = jsonWithNewDs.process(
+                new ProcessFunction<JSONObject, String>() {
+                    @Override
+                    public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
+                        // get the "start" object
+                        JSONObject startObject = jsonObject.getJSONObject("start");
+                        String jsonString = jsonObject.toJSONString();
+                        // is it a start-up log?
+                        if (startObject != null && startObject.size() > 0) {
+                            // start-up log
+                            context.output(startTag, jsonString);
+                        } else {
+                            // everything that is not a start-up log is a page log; emit it to the main stream
+                            collector.collect(jsonString);
+                            // does it carry display (exposure) records?
+                            JSONArray displays = jsonObject.getJSONArray("displays");
+                            if (displays != null && displays.size() > 0) {
+                                // take the timestamp and the page id
+                                String ts = jsonObject.getString("ts");
+                                String pageId = jsonObject.getJSONObject("page").getString("page_id");
+                                // iterate over the displays array and take every display record
+                                for (int i = 0; i < displays.size(); i++) {
+                                    JSONObject displayJSONObject = displays.getJSONObject(i);
 
-        //TODO 6. Split the logs by log type
+                                    // emit to the display side-output stream,
+                                    // enriched with the timestamp and the page id
+                                    displayJSONObject.put("ts", ts);
+                                    displayJSONObject.put("page_id", pageId);
+                                    context.output(displayTag, displayJSONObject.toJSONString());
+                                }
+
+                            }
+
+                        }
+
+                    }
+                }
+        );
+
+        //6.3 Get the side-output streams; print for testing
+        DataStream<String> startDs = pageDs.getSideOutput(startTag);
+        DataStream<String> displayDS = pageDs.getSideOutput(displayTag);
+//        pageDs.print(">>>>");
+//        startDs.print("####");
+//        displayDS.print("&&&&");
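As the comments above note, `new OutputTag<String>("start") {}` needs the `{}` so the generic parameter survives type erasure. An equivalent alternative, shown here only as a sketch, passes the `TypeInformation` explicitly via the two-argument `OutputTag` constructor, so no anonymous subclass is needed:

```java
// Equivalent alternative (sketch): supply the TypeInformation explicitly.
// Requires: import org.apache.flink.api.common.typeinfo.Types;
OutputTag<String> startTag = new OutputTag<>("start", Types.STRING);
OutputTag<String> displayTag = new OutputTag<>("display", Types.STRING);
```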
 
         //TODO 7. Write the different streams to the corresponding dwd topics in Kafka
+        // Why collect the records into streams instead of writing them to Kafka directly inside processElement?
+        // Because that would break the exactly-once guarantee; you would have to implement the two-phase commit yourself.
+        pageDs.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_page_log")
+        );
+
+        startDs.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_start_log")
+        );
+        displayDS.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_display_log")
+        );
 
         env.execute();
-
-
-
     }
-
-
-
 }
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
new file mode 100644
index 0000000..83dda0e
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
@@ -0,0 +1,233 @@
+// Start-up log: reference format
+{
+  "common": {
+    "ar": "370000",
+    "uid": "31",
+    "os": "Android 11.0",
+    "ch": "xiaomi",
+    "is_new": "0",
+    "md": "Xiaomi 9",
+    "mid": "mid_12",
+    "vc": "v2.1.111",
+    "ba": "Xiaomi"
+  },
+  "start": {
+    "entry": "icon",
+    "open_ad_skip_ms": 0,
+    "open_ad_ms": 1662,
+    "loading_time": 2531,
+    "open_ad_id": 4
+  },
+  "ts": 1683276802000
+}
+
+// Page log: reference format -- with display (exposure) records
+{
+  "common": {
+    "ar": "370000",
+    "uid": "31",
+    "os": "Android 11.0",
+    "ch": "xiaomi",
+    "is_new": "0",
+    "md": "Xiaomi 9",
+    "mid": "mid_12",
+    "vc": "v2.1.111",
+    "ba": "Xiaomi"
+  },
+  "page": {
+    "page_id": "home",
+    "during_time": 15127
+  },
+  "displays": [
+    { "display_type": "activity", "item": "1", "item_type": "activity_id", "pos_id": 5, "order": 1 },
+    { "display_type": "activity", "item": "2", "item_type": "activity_id", "pos_id": 5, "order": 2 },
+    { "display_type": "query", "item": "8", "item_type": "sku_id", "pos_id": 4, "order": 3 },
+    { "display_type": "query", "item": "5", "item_type": "sku_id", "pos_id": 3, "order": 4 },
+    { "display_type": "promotion", "item": "8", "item_type": "sku_id", "pos_id": 4, "order": 5 },
+    { "display_type": "query", "item": "5", "item_type": "sku_id", "pos_id": 4, "order": 6 },
+    { "display_type": "promotion", "item": "1", "item_type": "sku_id", "pos_id": 1, "order": 7 },
+    { "display_type": "query", "item": "4", "item_type": "sku_id", "pos_id": 2, "order": 8 }
+  ],
+  "ts": 1683276802000
+}
+
"actions": [ + { + "item": "8", + "action_id": "favor_add", + "item_type": "sku_id", + "ts": 1683276809689 + } + ], + "ts": 1683276802000 +} + +//业务数据json +{ + "database": "rt_gmall", + "xid": 26161, + "data": { + "sku_num": "1", + "create_time": "2023-05-05 21:31:47", + "sku_id": 6, + "order_price": 1299.00, + "source_type": "2404", + "img_url": "http://47.93.148.192:8080/group1/M00/00/01/rBHu8l-rgJqAHPnoAAF9hoDNfsc505.jpg", + "sku_name": "Redmi 10X 4G Helio G85游戏芯 4800万超清四摄 5020mAh大电量 小孔全面屏 128GB大存储 8GB+128GB 冰雾白 游戏智能手机 小米 红米", + "id": 86497, + "source_id": 2, + "order_id": 28748, + "split_total_amount": 1299.00 + }, + "xoffset": 523, + "type": "insert", + "table": "order_detail", + "ts": 1683293507 +} +{ + "database": "rt_gmall", + "xid": 26161, + "data": { + "order_status": "1001", + "id": 74863, + "order_id": 28744, + "operate_time": "2023-05-05 21:31:47" + }, + "xoffset": 529, + "type": "insert", + "table": "order_status_log", + "ts": 1683293507 +} \ No newline at end of file diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java index 25027b1..0732c0e 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java @@ -3,6 +3,7 @@ package com.atguigu.gmall.realtime.utils; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; @@ -21,8 +22,16 @@ public class MyKafkaUtils { return new FlinkKafkaConsumer(topic, new SimpleStringSchema(), properties); + } + + //获取kafka的生产者 + public static FlinkKafkaProducer getKafkaSink(String topic) { + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); +// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties); }