diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
index f93ab7e..06f2685 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
@@ -141,6 +141,29 @@
             <version>3.3.0</version>
         </dependency>
+
+        <!-- ClickHouse JDBC driver; Jackson is excluded to avoid clashing with Flink's bundled version -->
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.3.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Flink JDBC connector, used for the ClickHouse sink -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java
new file mode 100644
index 0000000..013b447
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java
@@ -0,0 +1,46 @@
+package com.atguigu.gmall.realtime.app.dwd;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwd
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 20:06
+ *@Description: TODO Extracted base class: template method design pattern
+ *@Version: 1.0
+ */
+public abstract class Base {
+
+    public void entry() throws Exception {
+        //TODO 1. Basic environment setup
+        //1.1 Stream execution environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        //1.2 Set parallelism
+        env.setParallelism(4);
+
+        //TODO 2. Checkpoint settings
+        //2.1 Enable checkpointing
+        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+        //2.2 Checkpoint timeout
+        env.getCheckpointConfig().setCheckpointTimeout(60000L);
+        //2.3 Restart strategy
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+        //2.4 Keep checkpoints after the job is cancelled
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        //2.5 State backend: memory / filesystem / RocksDB
+        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+        //2.6 User for HDFS access
+        System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+        execute(env);
+
+        env.execute();
+    }
+
+    public abstract void execute(StreamExecutionEnvironment environment);
+}
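As a sketch of how the template method in Base is meant to be consumed (this commit ships no subclass yet, so the class name and topology below are hypothetical): a job extends Base, puts its stream topology in execute(), and inherits the environment, checkpoint, and state-backend setup from entry().

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical subclass for illustration only -- not part of this commit.
public class DemoDwdApp extends Base {

    public static void main(String[] args) throws Exception {
        // entry() builds the env, calls execute(env), then env.execute()
        new DemoDwdApp().entry();
    }

    @Override
    public void execute(StreamExecutionEnvironment environment) {
        // Job-specific topology goes here; the checkpointing/state-backend
        // boilerplate has already been applied by Base.entry().
        environment.fromElements("a", "b", "c").print();
    }
}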
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
index 2635d5c..b52af13 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
@@ -1,10 +1,13 @@
 package com.atguigu.gmall.realtime.app.dwm;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
 import com.atguigu.gmall.realtime.beans.OrderDetail;
 import com.atguigu.gmall.realtime.beans.OrderInfo;
 import com.atguigu.gmall.realtime.beans.OrderWide;
 import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -13,17 +16,23 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
@@ -31,6 +40,24 @@ import java.time.Duration;
  *@Author: markilue
  *@CreateTime: 2023-05-09 14:33
  *@Description: TODO Order wide-table preparation
+ * -- Processes that must be running:
+ *    zk, hdfs, hbase, kafka, maxwell, redis
+ *    BaseDBApp, OrderWideApp
+ * -- Execution flow:
+ *    > run the jar that simulates business data
+ *    > it inserts generated business data into the MySQL business database
+ *    > MySQL writes the changed rows to its binlog
+ *    > Maxwell reads the binlog, wraps each change as a JSON string and sends it to the Kafka ODS topic ods_base_db_m
+ *    > BaseDBApp reads ods_base_db_m and splits the stream
+ *      & fact data      ---- written back to Kafka DWD topics
+ *      & dimension data ---- saved to dimension tables in Phoenix
+ *    > OrderWideApp reads order and order-detail data from the DWD topics
+ *    > intervalJoin joins the order and order-detail streams
+ *    > dimension data is then attached to the order wide table
+ *      * plain dimension lookup
+ *      * optimization 1: cache-aside (Redis side cache)
+ *      * optimization 2: async I/O
+ *
  *@Version: 1.0
  */
 public class OrderWideApp {
@@ -146,12 +173,139 @@
             }
         );
 
-        orderWideDS.print(">>>");
+//        orderWideDS.print(">>>");
 
+        //TODO 8. Join the user dimension -> asynchronous lookup
+        SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
+                orderWideDS,
+                new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
+                    @Override
+                    public void join(OrderWide orderWide, JSONObject dimJsonObj) throws ParseException {
+                        String gender = dimJsonObj.getString("GENDER");
+                        //compute the age
+                        String birthday = dimJsonObj.getString("BIRTHDAY");//e.g. 1972-06-15
+
+                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+                        Date birthdayDate = sdf.parse(birthday);
+                        long betweenMs = System.currentTimeMillis() - birthdayDate.getTime();
+                        long ageLong = betweenMs / 1000L / 60L / 60L / 24L / 365L;
+                        int age = (int) ageLong;
+
+                        //attach the dimension attributes
+                        orderWide.setUser_gender(gender);
+                        orderWide.setUser_age(age);
+                    }
+
+                    @Override
+                    public String getKey(OrderWide orderWide) {
+                        return orderWide.getUser_id().toString();
+                    }
+                },
+                60,
+                TimeUnit.SECONDS
+        );
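The age computed above divides elapsed milliseconds by 365 days, which drifts by roughly a day around leap years. An exact variant with java.time (a sketch, not part of this commit) would be:

import java.time.LocalDate;
import java.time.Period;

public class AgeDemo {
    public static void main(String[] args) {
        // Calendar-accurate age instead of the ms / 365-days approximation.
        LocalDate birth = LocalDate.parse("1972-06-15"); // BIRTHDAY value from the dimension row
        int age = Period.between(birth, LocalDate.now()).getYears();
        System.out.println(age);
    }
}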
+        //TODO 9. Join the province dimension
+        SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(
+                orderWideWithUserDS,
+                new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
+                    @Override
+                    public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
+                        orderWide.setProvince_name(dimJsonObj.getString("NAME"));
+                        orderWide.setProvince_area_code(dimJsonObj.getString("AREA_CODE"));
+                        orderWide.setProvince_iso_code(dimJsonObj.getString("ISO_CODE"));
+                        orderWide.setProvince_3166_2_code(dimJsonObj.getString("ISO_3166_2"));
+                    }
+
+                    @Override
+                    public String getKey(OrderWide input) {
+                        return input.getProvince_id().toString();
+                    }
+                },
+                60, TimeUnit.SECONDS
+        );
+
+        //TODO 10. Join the SKU dimension
+        SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
+                orderWideWithProvinceDS,
+                new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
+                    @Override
+                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+                        orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
+                        orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
+                        orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
+                        orderWide.setTm_id(jsonObject.getLong("TM_ID"));
+                    }
+
+                    @Override
+                    public String getKey(OrderWide orderWide) {
+                        return String.valueOf(orderWide.getSku_id());
+                    }
+                }, 60, TimeUnit.SECONDS);
+
+        //TODO 11. Join the SPU dimension
+        SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
+                orderWideWithSkuDS,
+                new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
+                    @Override
+                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+                        orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
+                    }
+
+                    @Override
+                    public String getKey(OrderWide orderWide) {
+                        return String.valueOf(orderWide.getSpu_id());
+                    }
+                }, 60, TimeUnit.SECONDS);
+
+        //TODO 12. Join the category dimension
+        SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
+                orderWideWithSpuDS,
+                new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
+                    @Override
+                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+                        orderWide.setCategory3_name(jsonObject.getString("NAME"));
+                    }
+
+                    @Override
+                    public String getKey(OrderWide orderWide) {
+                        return String.valueOf(orderWide.getCategory3_id());
+                    }
+                }, 60, TimeUnit.SECONDS);
+
+        //TODO 13. Join the trademark dimension
+        SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
+                orderWideWithCategory3DS,
+                new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
+                    @Override
+                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+                        orderWide.setTm_name(jsonObject.getString("TM_NAME"));
+                    }
+
+                    @Override
+                    public String getKey(OrderWide orderWide) {
+                        return String.valueOf(orderWide.getTm_id());
+                    }
+                }, 60, TimeUnit.SECONDS);
+
+        orderWideWithTmDS.print(">>>>>");
+
+        //TODO 14. Write the order wide table back to the Kafka topic dwm_order_wide
+        //JSON.parseObject(jsonStr): parse a string into a JSONObject
+        //JSON.parseObject(jsonStr, Class): parse a string into an object of the given type
+        //JSON.toJSONString(obj): serialize an object to a JSON string
+        orderWideWithTmDS.map(JSON::toJSONString).addSink(
+                MyKafkaUtils.getKafkaSink("dwm_order_wide")
+        );
 
         env.execute();
-
-
     }
 }
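The cache-aside lookup ("optimization 1" in the class comment above) lives in DimUtils.getDimInfoWithCache, whose body is not part of this diff. A minimal sketch of that pattern, assuming a Jedis client and hypothetical RedisUtils/PhoenixUtils helpers (the key layout is also illustrative, not the exact DimUtils implementation): check Redis first, fall back to the Phoenix dimension table, and write the result back with a TTL.

// Sketch of the cache-aside lookup assumed by DimAsyncFunction.
public static JSONObject getDimInfoWithCache(String tableName, String id) {
    String redisKey = "dim:" + tableName.toLowerCase() + ":" + id;
    Jedis jedis = RedisUtils.getJedis();                 // hypothetical pool helper
    try {
        String cached = jedis.get(redisKey);
        if (cached != null) {
            return JSON.parseObject(cached);             // cache hit
        }
        // cache miss: query the Phoenix dimension table
        JSONObject dim = PhoenixUtils.queryById(tableName, id); // hypothetical helper
        if (dim != null) {
            jedis.setex(redisKey, 24 * 3600, dim.toJSONString()); // write back, 1-day TTL
        }
        return dim;
    } finally {
        jedis.close();                                   // return the connection to the pool
    }
}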
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
new file mode 100644
index 0000000..fd7240a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
@@ -0,0 +1,130 @@
+package com.atguigu.gmall.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.atguigu.gmall.realtime.beans.OrderWide;
+import com.atguigu.gmall.realtime.beans.PaymentInfo;
+import com.atguigu.gmall.realtime.beans.PaymentWide;
+import com.atguigu.gmall.realtime.utils.DateTimeUtils;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 19:03
+ *@Description: TODO Payment wide-table preparation
+ *@Version: 1.0
+ */
+public class PaymentWideApp {
+
+    public static void main(String[] args) throws Exception {
+        //TODO 1. Create the stream environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+
+        //TODO 2. Checkpoint configuration
+        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.getCheckpointConfig().setCheckpointTimeout(60000L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+        System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+        //TODO 3. Read from Kafka (dwm_order_wide and dwd_payment_info)
+        String paymentTopic = "dwd_payment_info";
+        String orderWideTopic = "dwm_order_wide";
+        String groupId = "payment_wide_app_group";
+        DataStreamSource<String> paymentStrDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(paymentTopic, groupId)
+        );
+        DataStreamSource<String> orderWideStrDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(orderWideTopic, groupId)
+        );
+
+        //TODO 4. Convert records into PaymentInfo / OrderWide objects
+        SingleOutputStreamOperator<PaymentInfo> paymentDS = paymentStrDS.map(
+                new MapFunction<String, PaymentInfo>() {
+                    @Override
+                    public PaymentInfo map(String s) throws Exception {
+                        return JSON.parseObject(s, PaymentInfo.class);
+                    }
+                }
+        );
+        SingleOutputStreamOperator<OrderWide> orderWideDS = orderWideStrDS.map(
+                orderWideStr -> JSON.parseObject(orderWideStr, OrderWide.class)
+        );
+
+//        paymentDS.print(">>>>");
+//        orderWideDS.print("$$$$");
+
+        //TODO 5. Assign watermarks
+        SingleOutputStreamOperator<PaymentInfo> paymentInfoWithWatermarkDS = paymentDS.assignTimestampsAndWatermarks(
+                WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
+                        .withTimestampAssigner(
+                                new SerializableTimestampAssigner<PaymentInfo>() {
+                                    @Override
+                                    public long extractTimestamp(PaymentInfo paymentInfo, long l) {
+                                        return DateTimeUtils.toTs(paymentInfo.getCallback_time());
+                                    }
+                                }
+                        )
+        );
+
+        SingleOutputStreamOperator<OrderWide> orderWideWithWatermarkDS = orderWideDS.assignTimestampsAndWatermarks(
+                WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
+                        .withTimestampAssigner(
+                                new SerializableTimestampAssigner<OrderWide>() {
+                                    @Override
+                                    public long extractTimestamp(OrderWide orderWide, long l) {
+                                        //create_time carries the "yyyy-MM-dd HH:mm:ss" format that toTs expects
+                                        return DateTimeUtils.toTs(orderWide.getCreate_time());
+                                    }
+                                }
+                        )
+        );
+
+        //TODO 6. Key both streams by the join field
+        KeyedStream<PaymentInfo, Long> paymentInfoKeyedDS = paymentInfoWithWatermarkDS.keyBy(PaymentInfo::getOrder_id);
+        KeyedStream<OrderWide, Long> orderWideKeyedDS = orderWideWithWatermarkDS.keyBy(OrderWide::getOrder_id);
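For intuition on what forBoundedOutOfOrderness(3s) does above, the following is a sketch of Flink's documented behavior, not code from this repo: the generated watermark trails the highest timestamp seen so far by the out-of-orderness bound (minus one millisecond in Flink's BoundedOutOfOrdernessWatermarks).

import com.atguigu.gmall.realtime.utils.DateTimeUtils;
import java.time.Duration;

public class WatermarkDemo {
    public static void main(String[] args) {
        // Illustration of the bounded-out-of-orderness watermark used above.
        long maxTimestampSeen = DateTimeUtils.toTs("2023-05-10 19:30:10"); // latest callback_time so far
        long boundMs = Duration.ofSeconds(3).toMillis();
        long watermark = maxTimestampSeen - boundMs - 1; // events at or before this are considered complete
        // A payment stamped "2023-05-10 19:30:06" still gets in; anything older
        // than the watermark counts as late for downstream windows/interval joins.
        System.out.println(watermark);
    }
}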
+        //TODO 7. Interval join of the two streams
+        SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoKeyedDS
+                .intervalJoin(orderWideKeyedDS)
+                .between(Time.seconds(-1800), Time.seconds(0))//look back half an hour
+                .process(
+                        new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
+                            @Override
+                            public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context context, Collector<PaymentWide> collector) throws Exception {
+                                collector.collect(new PaymentWide(paymentInfo, orderWide));
+                            }
+                        }
+                );
+
+        paymentWideDS.print(">>>>");
+
+        //TODO 8. Write the payment wide table to the Kafka topic dwm_payment_wide
+        paymentWideDS
+                .map(paymentWide -> JSON.toJSONString(paymentWide))
+                .addSink(
+                        MyKafkaUtils.getKafkaSink("dwm_payment_wide")
+                );
+
+
+        env.execute();
+    }
+}
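To make the between(-1800s, 0s) bounds concrete (the timestamps below are illustrative): for a payment with event time p, the join matches order-wide records o on the same order_id whenever p - 1800s <= o <= p, i.e. the order may precede the payment by up to half an hour but never follow it.

import com.atguigu.gmall.realtime.utils.DateTimeUtils;

public class IntervalJoinBoundsDemo {
    public static void main(String[] args) {
        long p = DateTimeUtils.toTs("2023-05-10 19:30:00"); // payment callback_time
        long o = DateTimeUtils.toTs("2023-05-10 19:05:00"); // order create_time
        boolean joins = o >= p - 1800_000L && o <= p;       // true: 25 minutes apart
        System.out.println(joins);
    }
}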
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
new file mode 100644
index 0000000..b693e47
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
@@ -0,0 +1,304 @@
+package com.atguigu.gmall.realtime.app.dws;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.beans.VisitorStats;
+import com.atguigu.gmall.realtime.utils.DateTimeUtils;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Date;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dws
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 21:04
+ *@Description: TODO DWS visitor-theme statistics
+ *@Version: 1.0
+ */
+public class VisitorStatsApp {
+
+    public static void main(String[] args) throws Exception {
+        //TODO 1. Create the stream environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+
+        //TODO 2. Checkpoint settings
+        env.enableCheckpointing(5000L);
+        env.getCheckpointConfig().setCheckpointTimeout(60000L);
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
+        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+        System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+
+        //TODO 3. Read from Kafka
+        String groupId = "visitor_stats_app_group";
+        String pageLogTopic = "dwd_page_log";
+        DataStreamSource<String> pageLogStrDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(pageLogTopic, groupId)
+        );
+
+        String uniqueVisitTopic = "dwm_unique_visitor";
+        DataStreamSource<String> uniqueVisitStrDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(uniqueVisitTopic, groupId)
+        );
+
+        String userJumpDetailDetailTopic = "dwm_user_jump_detail";
+        DataStreamSource<String> userJumpDetailDetailStrDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(userJumpDetailDetailTopic, groupId)
+        );
+
+//        pageLogStrDS.print(">>>>");
+//        uniqueVisitStrDS.print("$$$$");
+//        userJumpDetailDetailStrDS.print("&&&&");
+
+        //TODO 4. Convert stream records: String -> VisitorStats
+
+        //4.1 Convert records from dwd_page_log
+        SingleOutputStreamOperator<VisitorStats> pvStateDS = pageLogStrDS.map(
+                /*
+                sample input:
+                {
+                    "common": { "ar": "440000", "uid": "9", "os": "Android 11.0", "ch": "vivo",
+                                "is_new": "1", "md": "Xiaomi 9", "mid": "mid_10",
+                                "vc": "v2.1.132", "ba": "Xiaomi" },
+                    "page":   { "page_id": "cart", "during_time": 2832, "last_page_id": "good_detail" },
+                    "ts": 1683724788000
+                }
+                */
+                new MapFunction<String, VisitorStats>() {
+                    @Override
+                    public VisitorStats map(String s) throws Exception {
+                        JSONObject jsonObject = JSON.parseObject(s);
+                        JSONObject commonJsonObj = jsonObject.getJSONObject("common");
+                        JSONObject pageJsonObj = jsonObject.getJSONObject("page");
+                        VisitorStats visitorStats = new VisitorStats(
+                                "",
+                                "",
+                                commonJsonObj.getString("vc"),
+                                commonJsonObj.getString("ch"),
+                                commonJsonObj.getString("ar"),
+                                commonJsonObj.getString("is_new"),
+                                0L,
+                                1L,
+                                0L,
+                                0L,
+                                pageJsonObj.getLong("during_time"),
+                                jsonObject.getLong("ts")
+                        );
+                        //check whether this record starts a new session
+                        String lastPageId = pageJsonObj.getString("last_page_id");
+                        if (lastPageId == null || lastPageId.length() == 0) {
+                            visitorStats.setSv_ct(1L);
+                        }
+
+                        return visitorStats;
+                    }
+                }
+        );
+
+        //4.2 Convert records from dwm_unique_visitor (same input shape as dwd_page_log above)
+        SingleOutputStreamOperator<VisitorStats> uvStateDS = uniqueVisitStrDS.map(
+                new MapFunction<String, VisitorStats>() {
+                    @Override
+                    public VisitorStats map(String s) throws Exception {
+                        JSONObject jsonObject = JSON.parseObject(s);
+                        JSONObject commonJsonObj = jsonObject.getJSONObject("common");
+                        VisitorStats visitorStats = new VisitorStats(
+                                "",
+                                "",
+                                commonJsonObj.getString("vc"),
+                                commonJsonObj.getString("ch"),
+                                commonJsonObj.getString("ar"),
+                                commonJsonObj.getString("is_new"),
+                                1L,
+                                0L,
+                                0L,
+                                0L,
+                                0L,
+                                jsonObject.getLong("ts")
+                        );
+
+                        return visitorStats;
+                    }
+                }
+        );
pageJsonObj = jsonObject.getJSONObject("page"); + VisitorStats visitorStats = new VisitorStats( + "", + "", + commonJsonObj.getString("vc"), + commonJsonObj.getString("ch"), + commonJsonObj.getString("ar"), + commonJsonObj.getString("is_new"), + 0L, + 0L, + 0L, + 1L, + 0L, + jsonObject.getLong("ts") + ); + //判断是否为新的会话 + String lastPageId = pageJsonObj.getString("last_page_id"); + if (lastPageId == null || lastPageId.length() == 0) { + visitorStats.setSv_ct(1L); + } + + return visitorStats; + } + } + ); + + //TODO 5.将三条刘转换后的数据进行合并 + DataStream unionDS = pvStateDS.union(uvStateDS, ujdStateDS); + +// unionDS.print(">>>>>>"); + + + //TODO 6.指定watermark以及提取事件时间字段 + SingleOutputStreamOperator visitorStatWithWatermarkDS = unionDS + .assignTimestampsAndWatermarks( + WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner( + new SerializableTimestampAssigner() { + @Override + public long extractTimestamp(VisitorStats visitorStats, long l) { + return visitorStats.getTs(); + } + } + ) + ); + + //TODO 7.按照维度对流中的数据进行分组 维度有:版本,渠道,地区,新老访客 所以我们定义分组的key为Tuple4类型 + KeyedStream> keyedDS = visitorStatWithWatermarkDS.keyBy( + new KeySelector>() { + @Override + public Tuple4 getKey(VisitorStats visitorStats) throws Exception { + return Tuple4.of( + visitorStats.getVc(), + visitorStats.getCh(), + visitorStats.getAr(), + visitorStats.getIs_new()); + } + } + ); + + //TODO 8.开窗 对分组有的数据 进行开窗计算 每个分组 独立窗口 ,分组之间互不影响 + WindowedStream, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10))); + + //TODO 9.聚合计算 对窗口中的数据进行聚合计算 + SingleOutputStreamOperator reduceDS = windowDS.reduce( + new ReduceFunction() { + @Override + public VisitorStats reduce(VisitorStats visitorStats1, VisitorStats visitorStats2) throws Exception { + //度量值两两聚合 + visitorStats1.setPv_ct(visitorStats1.getPv_ct() + visitorStats2.getPv_ct()); + visitorStats1.setUv_ct(visitorStats1.getUv_ct() + visitorStats2.getUv_ct()); + visitorStats1.setUj_ct(visitorStats1.getUj_ct() + visitorStats2.getUj_ct()); + visitorStats1.setSv_ct(visitorStats1.getSv_ct() + visitorStats2.getSv_ct()); + visitorStats1.setDur_sum(visitorStats1.getDur_sum() + visitorStats2.getDur_sum()); + return visitorStats1; + } + }, + new ProcessWindowFunction, TimeWindow>() { + @Override + public void process(Tuple4 tuple4, ProcessWindowFunction, TimeWindow>.Context context, Iterable iterable, Collector collector) throws Exception { + //补全时间字段 + long start = context.window().getStart(); + long end = context.window().getEnd(); + for (VisitorStats visitorStats : iterable) { + visitorStats.setStt(DateTimeUtils.toYMDHMS(new Date(start))); + visitorStats.setEdt(DateTimeUtils.toYMDHMS(new Date(end))); + visitorStats.setTs(System.currentTimeMillis()); + //将处理之后的数据发送到下游 + collector.collect(visitorStats); + } + } + } + ); + reduceDS.print(">>>>>"); + + //TODO 10.将聚合统计之后的数据写到clickhouse + DataStreamSink visitorStatsDataStreamSink = reduceDS.addSink( + JdbcSink.sink( + "", + new JdbcStatementBuilder() { + @Override + public void accept(PreparedStatement preparedStatement, VisitorStats visitorStats) throws SQLException { + + } + }, + new JdbcExecutionOptions.Builder().build(), + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().build() + ) + ); + + env.execute(); + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java new file mode 100644 
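A hedged sketch of how the placeholder sink above could be completed. The table name (visitor_stats), host, port and database in the URL are assumptions; the driver class is the one shipped by the clickhouse-jdbc 0.3.0 dependency added in the pom, and the column order follows the VisitorStats constructor.

// Illustrative completion of the JdbcSink call above; adjust table/URL to the real cluster.
JdbcSink.sink(
        "insert into visitor_stats values(?,?,?,?,?,?,?,?,?,?,?,?)", // assumed 12-column table
        new JdbcStatementBuilder<VisitorStats>() {
            @Override
            public void accept(PreparedStatement ps, VisitorStats vs) throws SQLException {
                ps.setString(1, vs.getStt());
                ps.setString(2, vs.getEdt());
                ps.setString(3, vs.getVc());
                ps.setString(4, vs.getCh());
                ps.setString(5, vs.getAr());
                ps.setString(6, vs.getIs_new());
                ps.setLong(7, vs.getUv_ct());
                ps.setLong(8, vs.getPv_ct());
                ps.setLong(9, vs.getSv_ct());
                ps.setLong(10, vs.getUj_ct());
                ps.setLong(11, vs.getDur_sum());
                ps.setLong(12, vs.getTs());
            }
        },
        new JdbcExecutionOptions.Builder().withBatchSize(5).build(), // small batches for the demo
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // driver of clickhouse-jdbc 0.3.0
                .withUrl("jdbc:clickhouse://Ding202:8123/default")       // assumed host/port/database
                .build()
);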
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java
new file mode 100644
index 0000000..872c4f1
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java
@@ -0,0 +1,79 @@
+package com.atguigu.gmall.realtime.app.func;
+
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.utils.DimUtils;
+import com.atguigu.gmall.realtime.utils.ThreadPoolUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.func
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 15:17
+ *@Description: TODO Asynchronous dimension join
+ * Template method pattern:
+ * the parent class defines the skeleton of the algorithm and defers the concrete
+ * steps to its subclasses; each subclass supplies its own implementation without
+ * changing the parent's skeleton.
+ *@Version: 1.0
+ */
+public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
+
+    ExecutorService executorService;
+    private String tableName;
+
+    public DimAsyncFunction(String tableName) {
+        this.tableName = tableName;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        //acquire the shared thread pool
+        executorService = ThreadPoolUtils.getInstance();
+    }
+
+    //Send an asynchronous request to perform the dimension join.
+    //asyncInvoke is called once for every record in the stream.
+    @Override
+    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
+        //hand the lookup to a pooled thread
+        executorService.submit(
+                new Runnable() {
+                    //TODO run() contains the asynchronous dimension-join work
+                    @Override
+                    public void run() {
+                        try {
+                            long start = System.currentTimeMillis();
+
+                            //extract the join key from the record
+                            String key = getKey(input);
+                            //look up the dimension object by key
+                            JSONObject dimJsonObj = DimUtils.getDimInfoWithCache(tableName, key);
+                            //copy the dimension attributes onto the stream record (the join)
+                            if (dimJsonObj != null) {
+                                join(input, dimJsonObj);
+                            }
+
+                            long end = System.currentTimeMillis();
+                            System.out.println("async dim lookup took: " + (end - start) + " ms");
+
+                            //wrap input in a singleton collection and emit downstream
+                            resultFuture.complete(Collections.singleton(input));
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            System.out.println("async dim lookup failed");
+                            //fail the record explicitly instead of waiting for the async timeout
+                            resultFuture.completeExceptionally(e);
+                        }
+                    }
+                }
+        );
+
+    }
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java
new file mode 100644
index 0000000..f071d4a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java
@@ -0,0 +1,22 @@
+package com.atguigu.gmall.realtime.app.func;
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.func
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 16:24
+ *@Description: TODO
+ *@Version: 1.0
+ */
+public interface DimJoinFunction<T> {
+
+    //how to attach the dimension to the record
+    void join(T input, JSONObject dimJsonObj) throws Exception;
+
+    //which id to join on
+    String getKey(T input);
+}
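The Runnable-based submission in asyncInvoke above works, but an equivalent shape using CompletableFuture on the same executor keeps the success/failure paths explicit. This is a stylistic alternative for the body of asyncInvoke, not what the commit does, and it assumes DimUtils.getDimInfoWithCache throws only unchecked exceptions (requires import java.util.concurrent.CompletableFuture).

// Alternative asyncInvoke body using CompletableFuture on the shared pool.
CompletableFuture
        .supplyAsync(() -> DimUtils.getDimInfoWithCache(tableName, getKey(input)), executorService)
        .thenAccept(dimJsonObj -> {
            try {
                if (dimJsonObj != null) {
                    join(input, dimJsonObj);   // attach dimension attributes
                }
                resultFuture.complete(Collections.singleton(input));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e); // join step failed
            }
        })
        .exceptionally(throwable -> {
            resultFuture.completeExceptionally(throwable); // lookup itself failed
            return null;
        });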
a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
@@ -16,10 +16,10 @@ public class OrderDetail {
 
     Long id;
-    Long order_id ;
+    Long order_id;
     Long sku_id;
-    BigDecimal order_price ;
-    Long sku_num ;
+    BigDecimal order_price;
+    Long sku_num;
     String sku_name;
     String create_time;
     BigDecimal split_total_amount;
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java
new file mode 100644
index 0000000..e3508a9
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java
@@ -0,0 +1,27 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 18:57
+ *@Description: TODO Payment info entity class
+ *@Version: 1.0
+ */
+@Data
+public class PaymentInfo {
+
+    Long id;
+    Long order_id;
+    Long user_id;
+    BigDecimal total_amount;
+    String subject;
+    String payment_type;
+    String create_time;
+    String callback_time;
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java
new file mode 100644
index 0000000..c226e4a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java
@@ -0,0 +1,95 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 18:58
+ *@Description: TODO Payment wide-table entity class
+ *@Version: 1.0
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class PaymentWide {
+
+    Long payment_id;
+    String subject;
+    String payment_type;
+    String payment_create_time;
+    String callback_time;
+    Long detail_id;
+    Long order_id;
+    Long sku_id;
+    BigDecimal order_price;
+    Long sku_num;
+    String sku_name;
+    Long province_id;
+    String order_status;
+    Long user_id;
+    BigDecimal total_amount;
+    BigDecimal activity_reduce_amount;
+    BigDecimal coupon_reduce_amount;
+    BigDecimal original_total_amount;
+    BigDecimal feight_fee;
+    BigDecimal split_feight_fee;
+    BigDecimal split_activity_amount;
+    BigDecimal split_coupon_amount;
+    BigDecimal split_total_amount;
+    String order_create_time;
+
+    String province_name;//obtained from the dimension table
+    String province_area_code;
+    String province_iso_code;
+    String province_3166_2_code;
+    Integer user_age;
+    String user_gender;
+
+    Long spu_id; //dimension attributes to be joined in
+    Long tm_id;
+    Long category3_id;
+    String spu_name;
+    String tm_name;
+    String category3_name;
+
+    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
+        mergeOrderWide(orderWide);
+        mergePaymentInfo(paymentInfo);
+    }
+
+    public void mergePaymentInfo(PaymentInfo paymentInfo) {
+        if (paymentInfo != null) {
+            try {
+                BeanUtils.copyProperties(this, paymentInfo);
+                payment_create_time = paymentInfo.create_time;
+                payment_id = paymentInfo.id;
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            } catch (InvocationTargetException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void mergeOrderWide(OrderWide orderWide) {
+        if (orderWide != null) {
+            try {
+                BeanUtils.copyProperties(this, orderWide);
+                order_create_time = orderWide.create_time;
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            } catch (InvocationTargetException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
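One detail worth calling out, since the two copyProperties utilities are easy to mix up: the commons-beanutils call used above takes the destination first, whereas Spring's BeanUtils takes the source first. A quick check using the classes from this diff:

import org.apache.commons.beanutils.BeanUtils;

public class CopyPropertiesDemo {
    public static void main(String[] args) throws Exception {
        PaymentInfo paymentInfo = new PaymentInfo();
        paymentInfo.setOrder_id(1001L);

        PaymentWide wide = new PaymentWide();
        // org.apache.commons.beanutils.BeanUtils.copyProperties(dest, orig): copies INTO wide
        BeanUtils.copyProperties(wide, paymentInfo);
        System.out.println(wide.getOrder_id()); // 1001
        // Spring's org.springframework.beans.BeanUtils.copyProperties(source, target) is reversed.
    }
}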
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
new file mode 100644
index 0000000..6d250f9
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
@@ -0,0 +1,43 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 21:11
+ *@Description: TODO Visitor-theme entity class holding all dimensions and measures
+ *@Version: 1.0
+ */
+@Data
+@AllArgsConstructor
+public class VisitorStats {
+
+    //window start time
+    private String stt;
+    //window end time
+    private String edt;
+    //dimension: version
+    private String vc;
+    //dimension: channel
+    private String ch;
+    //dimension: area
+    private String ar;
+    //dimension: new/returning visitor flag
+    private String is_new;
+    //measure: unique visitors
+    private Long uv_ct = 0L;
+    //measure: page views
+    private Long pv_ct = 0L;
+    //measure: session starts
+    private Long sv_ct = 0L;
+    //measure: bounces (jumps)
+    private Long uj_ct = 0L;
+    //measure: total dwell time
+    private Long dur_sum = 0L;
+    //statistics timestamp
+    private Long ts;
+
+}
localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(); + return ts; + } + + public static void main(String[] args) { + System.out.println(ZoneId.systemDefault()); + System.out.println(new Date(1683728230000L)); + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java index fe21dd7..f5a2ce7 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java @@ -114,10 +114,6 @@ public class DimUtils { return dimInfoJsonObj; } - public static void main(String[] args) { -// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13"))); - System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13")); - } //根据redis的key,删除缓存 public static void deleteCached(String tableName, String id) { @@ -132,4 +128,9 @@ public class DimUtils { } } + + public static void main(String[] args) { +// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13"))); + System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13")); + } } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java new file mode 100644 index 0000000..2ba0663 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java @@ -0,0 +1,43 @@ +package com.atguigu.gmall.realtime.utils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-10 14:59 + *@Description: TODO 线程池工具类 + *@Version: 1.0 + */ +public class ThreadPoolUtils { + + public static volatile ThreadPoolExecutor threadPool; + + public static final int corePoolSize = 4; + public static final int maximumPoolSize = 16; + public static final long keepAliveTime = 300; + public static final TimeUnit unit = TimeUnit.SECONDS; + public static final BlockingQueue workQueue = new LinkedBlockingQueue(Integer.MAX_VALUE); + + public static ThreadPoolExecutor getInstance() { + if (threadPool == null) { + synchronized (ThreadPoolExecutor.class) { + System.out.println("---开辟线程池---"); + if (threadPool == null) { + threadPool = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue + ); + } + } + } + return threadPool; + } +}
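As a usage sketch of the lazily-initialized pool (matching how DimAsyncFunction.open acquires it): repeated calls hand back the same executor, which the volatile field plus double-checked locking guarantees even under concurrent first use.

import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPoolUtilsDemo {
    public static void main(String[] args) {
        // Every caller in the JVM shares one pool.
        ThreadPoolExecutor pool = ThreadPoolUtils.getInstance();
        ThreadPoolExecutor again = ThreadPoolUtils.getInstance();
        System.out.println(pool == again); // true -- a single instance
        pool.submit(() -> System.out.println("dim lookup running on " + Thread.currentThread().getName()));
    }
}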