From e2d2f8925f53526aa699b2ee6a30c4421827ca90 Mon Sep 17 00:00:00 2001
From: markilue <745518019@qq.com>
Date: Wed, 10 May 2023 22:43:47 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E4=BB=93=E4=BB=BB?=
=?UTF-8?q?=E5=8A=A1dwm=E6=9B=B4=E6=96=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../rt-gmall-parent/gmall-realtime/pom.xml | 23 ++
.../atguigu/gmall/realtime/app/dwd/Base.java | 46 +++
.../gmall/realtime/app/dwm/OrderWideApp.java | 160 ++++++++-
.../realtime/app/dwm/PaymentWideApp.java | 130 ++++++++
.../realtime/app/dws/VisitorStatsApp.java | 304 ++++++++++++++++++
.../realtime/app/func/DimAsyncFunction.java | 79 +++++
.../realtime/app/func/DimJoinFunction.java | 22 ++
.../gmall/realtime/beans/OrderDetail.java | 6 +-
.../gmall/realtime/beans/PaymentInfo.java | 27 ++
.../gmall/realtime/beans/PaymentWide.java | 95 ++++++
.../gmall/realtime/beans/VisitorStats.java | 43 +++
.../gmall/realtime/utils/DateTimeUtils.java | 48 +++
.../gmall/realtime/utils/DimUtils.java | 9 +-
.../gmall/realtime/utils/ThreadPoolUtils.java | 43 +++
14 files changed, 1025 insertions(+), 10 deletions(-)
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DateTimeUtils.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
index f93ab7e..06f2685 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
@@ -141,6 +141,29 @@
3.3.0
+
+
+ ru.yandex.clickhouse
+ clickhouse-jdbc
+ 0.3.0
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+
+
+
+ org.apache.flink
+ flink-connector-jdbc_${scala.version}
+ ${flink.version}
+
+
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java
new file mode 100644
index 0000000..013b447
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/Base.java
@@ -0,0 +1,46 @@
+package com.atguigu.gmall.realtime.app.dwd;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwd
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 20:06
+ *@Description: TODO 抽取基类:模板方法设计模式
+ *@Version: 1.0
+ */
+public abstract class Base {
+
+ public void entry() throws Exception {
+ //TODO 1.基本环境准备
+ //1.1 流处理环境
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ //1.2 设置并行度
+ env.setParallelism(4);
+
+ //TODO 2.检查点设置
+ //2.1 开启检查点
+ env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+ //2.2 设置检查点超时时间
+ env.getCheckpointConfig().setCheckpointTimeout(60000L);
+ //2.3 设置重启策略
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+ //2.4 设置job取消后,检查点是否保存
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ //2.5 设置状态后端 内存/文件系统/RocksDB
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ //2.6 指定操作HDFS用户
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+ execute(env);
+
+ env.execute();
+ }
+
+ public abstract void execute(StreamExecutionEnvironment environment);
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
index 2635d5c..b52af13 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
@@ -1,10 +1,13 @@
package com.atguigu.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.beans.OrderDetail;
import com.atguigu.gmall.realtime.beans.OrderInfo;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.avro.Schema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@@ -13,17 +16,23 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
/**
*@BelongsProject: rt-gmall-parent
@@ -31,6 +40,24 @@ import java.time.Duration;
*@Author: markilue
*@CreateTime: 2023-05-09 14:33
*@Description: TODO 订单宽表准备
+ * -- 需要启动的进程:
+ * zk,hdfs,hbase,kafka,maxwell,,redis
+ * BaseDBApp,OrderWideApp
+ * --执行流程:
+ * >运行模拟生成业务数据的jar
+ * >会向业务数据库Mysql中插入生成业务数据
+ * >mysql会将变化的数据放到binlog中
+ * >maxwell从binlog中获取数据,将数据封装为json字符串发送到kafka的ods主题中 ods_base_db_m
+ * >BaseLogApp从ods_base_db_m主题中读取数据,进行分流
+ * &事实数据----写回到kafka的dwd主题
+ * &业务数据----保存到phoenix的维度表中
+ * >OrderWideApp从dwd主题中获取订单和订单明细数据
+ * >使用intervalJoin对订单和订单明细进行双流join
+ * >将用户维度关联到订单宽表上
+ * *基本的维度关联
+ * *优化1:旁路缓存
+ * *优化2:异步IO
+ *
*@Version: 1.0
*/
public class OrderWideApp {
@@ -146,12 +173,139 @@ public class OrderWideApp {
}
);
- orderWideDS.print(">>>");
+// orderWideDS.print(">>>");
+ //TODO 8.和用户维度进行关联 ->异步操作进行关联
+ SingleOutputStreamOperator orderWideWithUserDS = AsyncDataStream.unorderedWait(
+ orderWideDS,
+ new DimAsyncFunction("DIM_USER_INFO") {
+ @Override
+ public void join(OrderWide orderWide, JSONObject dimJsonObj) throws ParseException {
+ String gender = dimJsonObj.getString("GENDER");
+ //计算年龄
+ String birthday = dimJsonObj.getString("BIRTHDAY");//1972-06-15
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ Date birthdayDate = sdf.parse(birthday);
+ long betweenMs = System.currentTimeMillis() - birthdayDate.getTime();
+ long ageLong = betweenMs / 1000L / 60L / 60L / 24L / 365L;
+ int age = (int) ageLong;
+
+ //维度关联
+ orderWide.setUser_gender(gender);
+ orderWide.setUser_age(age);
+ }
+
+ @Override
+ public String getKey(OrderWide orderWide) {
+ return orderWide.getUser_id().toString();
+ }
+ }
+ ,
+ 60,
+ TimeUnit.SECONDS
+ );
+
+ //TODO 9.和地区维度进行关联
+ SingleOutputStreamOperator orderWideWithProvinceDS = AsyncDataStream.unorderedWait(
+ orderWideWithUserDS,
+ new DimAsyncFunction("DIM_BASE_PROVINCE") {
+ @Override
+ public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
+ orderWide.setProvince_name(dimJsonObj.getString("NAME"));
+ orderWide.setProvince_area_code(dimJsonObj.getString("AREA_CODE"));
+ orderWide.setProvince_iso_code(dimJsonObj.getString("ISO_CODE"));
+ orderWide.setProvince_3166_2_code(dimJsonObj.getString("ISO_3166_2"));
+ }
+
+ @Override
+ public String getKey(OrderWide input) {
+ return input.getProvince_id().toString();
+ }
+ },
+ 60, TimeUnit.SECONDS
+ );
+
+ //TODO 10.和SKU维度进行关联
+ SingleOutputStreamOperator orderWideWithSkuDS = AsyncDataStream.unorderedWait(
+ orderWideWithProvinceDS,
+ new DimAsyncFunction("DIM_SKU_INFO") {
+ @Override
+ public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+ orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
+ orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
+ orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
+ orderWide.setTm_id(jsonObject.getLong("TM_ID"));
+ }
+
+ @Override
+ public String getKey(OrderWide orderWide) {
+ return String.valueOf(orderWide.getSku_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+
+ //TODO 11.和SPU维度进行关联
+ SingleOutputStreamOperator orderWideWithSpuDS = AsyncDataStream.unorderedWait(
+ orderWideWithSkuDS,
+ new DimAsyncFunction("DIM_SPU_INFO") {
+ @Override
+ public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+ orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
+ }
+
+ @Override
+ public String getKey(OrderWide orderWide) {
+ return String.valueOf(orderWide.getSpu_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+
+ //TODO 12.和类别维度进行关联
+
+ SingleOutputStreamOperator orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
+ orderWideWithSpuDS,
+ new DimAsyncFunction("DIM_BASE_CATEGORY3") {
+ @Override
+ public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+ orderWide.setCategory3_name(jsonObject.getString("NAME"));
+ }
+
+ @Override
+ public String getKey(OrderWide orderWide) {
+ return String.valueOf(orderWide.getCategory3_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+
+ //TODO 13.和品牌维度进行关联
+ SingleOutputStreamOperator orderWideWithTmDS = AsyncDataStream.unorderedWait(
+ orderWideWithCategory3DS,
+ new DimAsyncFunction("DIM_BASE_TRADEMARK") {
+ @Override
+ public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
+ orderWide.setTm_name(jsonObject.getString("TM_NAME"));
+ }
+
+ @Override
+ public String getKey(OrderWide orderWide) {
+ return String.valueOf(orderWide.getTm_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+
+ orderWideWithTmDS.print(">>>>>");
+// orderWideWithTmDS.print(">>>>>");
+
+ //TODO 14.将订单宽表数据写回kafka的dwm_order_wide
+ //JSON.parseObject(jsonStr): 将字符串转换为json对象
+ //JSON.parseObject(jsonStr,类型): 将字符串转换为对应类型的对象
+ //JSON.toJSONString(对象): 将对象转换为json格式字符串
+ orderWideWithTmDS.map(JSON::toJSONString).addSink(
+ MyKafkaUtils.getKafkaSink("dwm_order_wide")
+ );
env.execute();
-
-
}
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
new file mode 100644
index 0000000..fd7240a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
@@ -0,0 +1,130 @@
+package com.atguigu.gmall.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.atguigu.gmall.realtime.beans.OrderWide;
+import com.atguigu.gmall.realtime.beans.PaymentInfo;
+import com.atguigu.gmall.realtime.beans.PaymentWide;
+import com.atguigu.gmall.realtime.utils.DateTimeUtils;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 19:03
+ *@Description: TODO 支付宽表准备
+ *@Version: 1.0
+ */
+public class PaymentWideApp {
+
+ public static void main(String[] args) throws Exception {
+ //TODO 1.创建流环境
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ //TODO 2.检查点配置
+ env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.getCheckpointConfig().setCheckpointTimeout(60000L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+ //TODO 3.从kafka中读取数据(订单宽表和dwd_payment_info)
+ String paymentTopic = "dwd_payment_info";
+ String orderWideTopic = "dwm_order_wide";
+ String groupId = "payment_wide_app_group";
+ DataStreamSource paymentStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(paymentTopic, groupId)
+ );
+ DataStreamSource orderWideStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(orderWideTopic, groupId)
+ );
+
+ //TODO 4.数据格式进行转换转为paymentInfo,orderWide对象
+ SingleOutputStreamOperator paymentDS = paymentStrDS.map(
+ new MapFunction() {
+ @Override
+ public PaymentInfo map(String s) throws Exception {
+ return JSON.parseObject(s, PaymentInfo.class);
+ }
+ }
+ );
+ SingleOutputStreamOperator orderWideDS = orderWideStrDS.map(
+ orderWideStr -> JSON.parseObject(orderWideStr, OrderWide.class)
+ );
+
+// paymentDS.print(">>>>");
+// orderWideDS.print("$$$$");
+
+ //TODO 5.注册水位线
+ SingleOutputStreamOperator paymentInfoWithWatermarkDS = paymentDS.assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
+ .withTimestampAssigner(
+ new SerializableTimestampAssigner() {
+ @Override
+ public long extractTimestamp(PaymentInfo paymentInfo, long l) {
+ return DateTimeUtils.toTs(paymentInfo.getCallback_time());
+ }
+ }
+ )
+ );
+
+ SingleOutputStreamOperator orderWideWithWatermarkDS = orderWideDS.assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
+ .withTimestampAssigner(
+ new SerializableTimestampAssigner() {
+ @Override
+ public long extractTimestamp(OrderWide orderWide, long l) {
+ return DateTimeUtils.toTs(orderWide.getCreate_date());
+ }
+ }
+ )
+ );
+
+ //TODO 6.分组,创建连接字段
+ KeyedStream paymentInfoKeyedDS = paymentInfoWithWatermarkDS.keyBy(PaymentInfo::getOrder_id);
+ KeyedStream orderWideKeyedDS = orderWideWithWatermarkDS.keyBy(OrderWide::getOrder_id);
+
+ //TODO 7.双流join
+ SingleOutputStreamOperator paymentWideDS = paymentInfoKeyedDS
+ .intervalJoin(orderWideKeyedDS)
+ .between(Time.seconds(-1800), Time.seconds(0))//往前半小时
+ .process(
+ new ProcessJoinFunction() {
+ @Override
+ public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, ProcessJoinFunction.Context context, Collector collector) throws Exception {
+ collector.collect(new PaymentWide(paymentInfo, orderWide));
+ }
+ }
+ );
+
+ paymentWideDS.print(">>>>");
+
+ //TODO 8.将支付宽表数据写到kafka的dwm_payment_wide
+ paymentWideDS
+ .map(paymentWide -> JSON.toJSONString(paymentWide))
+ .addSink(
+ MyKafkaUtils.getKafkaSink("dwm_payment_wide")
+ );
+
+
+ env.execute();
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
new file mode 100644
index 0000000..b693e47
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
@@ -0,0 +1,304 @@
+package com.atguigu.gmall.realtime.app.dws;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.beans.VisitorStats;
+import com.atguigu.gmall.realtime.utils.DateTimeUtils;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Date;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dws
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 21:04
+ *@Description: TODO 访客主题统计DWS
+ *@Version: 1.0
+ */
+public class VisitorStatsApp {
+
+ public static void main(String[] args) throws Exception {
+ //TODO 1.流环境建立
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ //TODO 2.检查点设置
+ env.enableCheckpointing(5000L);
+ env.getCheckpointConfig().setCheckpointTimeout(60000L);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+
+ //TODO 3.从kafka中读取数据
+ String groupId = "visitor_stats_app_group";
+ String pageLogTopic = "dwd_page_log";
+ DataStreamSource pageLogStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(pageLogTopic, groupId)
+ );
+
+ String uniqueVisitTopic = "dwm_unique_visitor";
+ DataStreamSource uniqueVisitStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(uniqueVisitTopic, groupId)
+ );
+
+ String userJumpDetailDetailTopic = "dwm_user_jump_detail";
+ DataStreamSource userJumpDetailDetailStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(userJumpDetailDetailTopic, groupId)
+ );
+
+// pageLogStrDS.print(">>>>");
+// uniqueVisitStrDS.print("$$$$");
+// userJumpDetailDetailStrDS.print("&&&&");
+
+ //TODO 4.对流中数据进行格式转换 封装流 string ->VisitorStats
+
+ //4.1 dwd_page_log流中的数据的转换
+ SingleOutputStreamOperator pvStateDS = pageLogStrDS.map(
+ /*
+ {
+ "common": {
+ "ar": "440000",
+ "uid": "9",
+ "os": "Android 11.0",
+ "ch": "vivo",
+ "is_new": "1",
+ "md": "Xiaomi 9",
+ "mid": "mid_10",
+ "vc": "v2.1.132",
+ "ba": "Xiaomi"
+ },
+ "page": {
+ "page_id": "cart",
+ "during_time": 2832,
+ "last_page_id": "good_detail"
+ },
+ "ts": 1683724788000
+ }
+ */
+ new MapFunction() {
+ @Override
+ public VisitorStats map(String s) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(s);
+ JSONObject commonJsonObj = jsonObject.getJSONObject("common");
+ JSONObject pageJsonObj = jsonObject.getJSONObject("page");
+ VisitorStats visitorStats = new VisitorStats(
+ "",
+ "",
+ commonJsonObj.getString("vc"),
+ commonJsonObj.getString("ch"),
+ commonJsonObj.getString("ar"),
+ commonJsonObj.getString("is_new"),
+ 0L,
+ 1L,
+ 0L,
+ 0L,
+ pageJsonObj.getLong("during_time"),
+ jsonObject.getLong("ts")
+ );
+ //判断是否为新的会话
+ String lastPageId = pageJsonObj.getString("last_page_id");
+ if (lastPageId == null || lastPageId.length() == 0) {
+ visitorStats.setSv_ct(1L);
+ }
+
+ return visitorStats;
+ }
+ }
+ );
+
+ //4.2 dwm_unique_visitor流中的数据的转换
+ SingleOutputStreamOperator uvStateDS = uniqueVisitStrDS.map(
+ /*
+ {
+ "common": {
+ "ar": "440000",
+ "uid": "9",
+ "os": "Android 11.0",
+ "ch": "vivo",
+ "is_new": "1",
+ "md": "Xiaomi 9",
+ "mid": "mid_10",
+ "vc": "v2.1.132",
+ "ba": "Xiaomi"
+ },
+ "page": {
+ "page_id": "cart",
+ "during_time": 2832,
+ "last_page_id": "good_detail"
+ },
+ "ts": 1683724788000
+ }
+ */
+ new MapFunction() {
+ @Override
+ public VisitorStats map(String s) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(s);
+ JSONObject commonJsonObj = jsonObject.getJSONObject("common");
+ JSONObject pageJsonObj = jsonObject.getJSONObject("page");
+ VisitorStats visitorStats = new VisitorStats(
+ "",
+ "",
+ commonJsonObj.getString("vc"),
+ commonJsonObj.getString("ch"),
+ commonJsonObj.getString("ar"),
+ commonJsonObj.getString("is_new"),
+ 1L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ jsonObject.getLong("ts")
+ );
+
+
+ return visitorStats;
+ }
+ }
+ );
+
+ //4.3 dwm_user_jump_detail流中的数据的转换
+ SingleOutputStreamOperator ujdStateDS = userJumpDetailDetailStrDS.map(
+ new MapFunction() {
+ @Override
+ public VisitorStats map(String s) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(s);
+ JSONObject commonJsonObj = jsonObject.getJSONObject("common");
+ JSONObject pageJsonObj = jsonObject.getJSONObject("page");
+ VisitorStats visitorStats = new VisitorStats(
+ "",
+ "",
+ commonJsonObj.getString("vc"),
+ commonJsonObj.getString("ch"),
+ commonJsonObj.getString("ar"),
+ commonJsonObj.getString("is_new"),
+ 0L,
+ 0L,
+ 0L,
+ 1L,
+ 0L,
+ jsonObject.getLong("ts")
+ );
+ //判断是否为新的会话
+ String lastPageId = pageJsonObj.getString("last_page_id");
+ if (lastPageId == null || lastPageId.length() == 0) {
+ visitorStats.setSv_ct(1L);
+ }
+
+ return visitorStats;
+ }
+ }
+ );
+
+ //TODO 5.将三条流转换后的数据进行合并
+ DataStream unionDS = pvStateDS.union(uvStateDS, ujdStateDS);
+
+// unionDS.print(">>>>>>");
+
+
+ //TODO 6.指定watermark以及提取事件时间字段
+ SingleOutputStreamOperator visitorStatWithWatermarkDS = unionDS
+ .assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
+ .withTimestampAssigner(
+ new SerializableTimestampAssigner() {
+ @Override
+ public long extractTimestamp(VisitorStats visitorStats, long l) {
+ return visitorStats.getTs();
+ }
+ }
+ )
+ );
+
+ //TODO 7.按照维度对流中的数据进行分组 维度有:版本,渠道,地区,新老访客 所以我们定义分组的key为Tuple4类型
+ KeyedStream> keyedDS = visitorStatWithWatermarkDS.keyBy(
+ new KeySelector>() {
+ @Override
+ public Tuple4 getKey(VisitorStats visitorStats) throws Exception {
+ return Tuple4.of(
+ visitorStats.getVc(),
+ visitorStats.getCh(),
+ visitorStats.getAr(),
+ visitorStats.getIs_new());
+ }
+ }
+ );
+
+ //TODO 8.开窗 对分组有的数据 进行开窗计算 每个分组 独立窗口 ,分组之间互不影响
+ WindowedStream, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
+
+ //TODO 9.聚合计算 对窗口中的数据进行聚合计算
+ SingleOutputStreamOperator reduceDS = windowDS.reduce(
+ new ReduceFunction() {
+ @Override
+ public VisitorStats reduce(VisitorStats visitorStats1, VisitorStats visitorStats2) throws Exception {
+ //度量值两两聚合
+ visitorStats1.setPv_ct(visitorStats1.getPv_ct() + visitorStats2.getPv_ct());
+ visitorStats1.setUv_ct(visitorStats1.getUv_ct() + visitorStats2.getUv_ct());
+ visitorStats1.setUj_ct(visitorStats1.getUj_ct() + visitorStats2.getUj_ct());
+ visitorStats1.setSv_ct(visitorStats1.getSv_ct() + visitorStats2.getSv_ct());
+ visitorStats1.setDur_sum(visitorStats1.getDur_sum() + visitorStats2.getDur_sum());
+ return visitorStats1;
+ }
+ },
+ new ProcessWindowFunction, TimeWindow>() {
+ @Override
+ public void process(Tuple4 tuple4, ProcessWindowFunction, TimeWindow>.Context context, Iterable iterable, Collector collector) throws Exception {
+ //补全时间字段
+ long start = context.window().getStart();
+ long end = context.window().getEnd();
+ for (VisitorStats visitorStats : iterable) {
+ visitorStats.setStt(DateTimeUtils.toYMDHMS(new Date(start)));
+ visitorStats.setEdt(DateTimeUtils.toYMDHMS(new Date(end)));
+ visitorStats.setTs(System.currentTimeMillis());
+ //将处理之后的数据发送到下游
+ collector.collect(visitorStats);
+ }
+ }
+ }
+ );
+ reduceDS.print(">>>>>");
+
+ //TODO 10.将聚合统计之后的数据写到clickhouse
+ DataStreamSink visitorStatsDataStreamSink = reduceDS.addSink(
+ JdbcSink.sink(
+ "",
+ new JdbcStatementBuilder() {
+ @Override
+ public void accept(PreparedStatement preparedStatement, VisitorStats visitorStats) throws SQLException {
+
+ }
+ },
+ new JdbcExecutionOptions.Builder().build(),
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().build()
+ )
+ );
+
+ env.execute();
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java
new file mode 100644
index 0000000..872c4f1
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimAsyncFunction.java
@@ -0,0 +1,79 @@
+package com.atguigu.gmall.realtime.app.func;
+
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.utils.DimUtils;
+import com.atguigu.gmall.realtime.utils.ThreadPoolUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.func
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 15:17
+ *@Description: TODO 维度异步关联
+ * 模板方法设计模式:
+ * 在父类中定义实现某一个功能的核心算法骨架,将具体的实现延迟的子类中实现
+ * 子类在不改变父类核心算法骨架的前提下,每一个子类都可以有自己的实现
+ *@Version: 1.0
+ */
+public abstract class DimAsyncFunction extends RichAsyncFunction implements DimJoinFunction {
+
+ ExecutorService executorService;
+ private String tableName;
+
+ public DimAsyncFunction(String tableName) {
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ //创建线程池对象
+ executorService = ThreadPoolUtils.getInstance();
+ }
+
+ //发送异步请求,完成维度关联
+ //通过创建多线程的方式 发送异步请求
+ //asyncInvoke每处理一条流中的数据 都会执行一次
+ @Override
+ public void asyncInvoke(T input, ResultFuture resultFuture) throws Exception {
+ //通过线程池对象获取线程
+ executorService.submit(
+ new Runnable() {
+ //TODO 在run中的代码就是异步维度关联的操作
+ @Override
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+
+ //从对象获取维度关联的key
+ String key = getKey(input);
+ //根据key到维度表中获取维度对象
+ JSONObject dimJsonObj = DimUtils.getDimInfoWithCache(tableName, key);
+ //把维度对象的属性赋值给流中对象属性(维度关联)
+ if (dimJsonObj != null) {
+ join(input, dimJsonObj);
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("维度异步查询耗时:" + (end - start) + "毫秒");
+
+ resultFuture.complete(Collections.singleton(input));//将input转换为一个集合向下游传递
+ } catch (Exception e) {
+ e.printStackTrace();
+ resultFuture.completeExceptionally(e);//维度异步查询发生异常:必须完成resultFuture,否则该条记录会一直挂起直至异步超时
+ }
+ }
+ }
+ );
+
+ }
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java
new file mode 100644
index 0000000..f071d4a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimJoinFunction.java
@@ -0,0 +1,22 @@
+package com.atguigu.gmall.realtime.app.func;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.text.ParseException;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.func
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 16:24
+ *@Description: TODO
+ *@Version: 1.0
+ */
+public interface DimJoinFunction {
+
+ //怎么进行关联
+ void join(T input, JSONObject dimJsonObj) throws Exception;
+
+ //关联的id
+ String getKey(T input);
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
index 9d27442..613a0c3 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
@@ -16,10 +16,10 @@ import java.math.BigDecimal;
public class OrderDetail {
Long id;
- Long order_id ;
+ Long order_id;
Long sku_id;
- BigDecimal order_price ;
- Long sku_num ;
+ BigDecimal order_price;
+ Long sku_num;
String sku_name;
String create_time;
BigDecimal split_total_amount;
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java
new file mode 100644
index 0000000..e3508a9
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentInfo.java
@@ -0,0 +1,27 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 18:57
+ *@Description: TODO 订单信息实体类
+ *@Version: 1.0
+ */
+@Data
+public class PaymentInfo {
+
+ Long id;
+ Long order_id;
+ Long user_id;
+ BigDecimal total_amount;
+ String subject;
+ String payment_type;
+ String create_time;
+ String callback_time;
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java
new file mode 100644
index 0000000..c226e4a
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/PaymentWide.java
@@ -0,0 +1,96 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 18:58
+ *@Description: TODO Payment wide-table entity: one payment joined with its order-wide row
+ *@Version: 1.0
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class PaymentWide {
+
+    Long payment_id;
+    String subject;
+    String payment_type;
+    String payment_create_time;
+    String callback_time;
+    Long detail_id;
+    Long order_id;
+    Long sku_id;
+    BigDecimal order_price;
+    Long sku_num;
+    String sku_name;
+    Long province_id;
+    String order_status;
+    Long user_id;
+    BigDecimal total_amount;
+    BigDecimal activity_reduce_amount;
+    BigDecimal coupon_reduce_amount;
+    BigDecimal original_total_amount;
+    BigDecimal feight_fee;
+    BigDecimal split_feight_fee;
+    BigDecimal split_activity_amount;
+    BigDecimal split_coupon_amount;
+    BigDecimal split_total_amount;
+    String order_create_time;
+
+    String province_name;//looked up from the dimension table
+    String province_area_code;
+    String province_iso_code;
+    String province_3166_2_code;
+    Integer user_age;
+    String user_gender;
+
+    Long spu_id; //dimension data, to be joined in
+    Long tm_id;
+    Long category3_id;
+    String spu_name;
+    String tm_name;
+    String category3_name;
+
+    // Merge the order side first, then the payment side, so that on
+    // same-named properties (e.g. create_time, id) the payment values win.
+    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
+        mergeOrderWide(orderWide);
+        mergePaymentInfo(paymentInfo);
+    }
+
+    // Copy every same-named property from paymentInfo, then re-map the two
+    // fields renamed in the wide table (create_time -> payment_create_time, id -> payment_id).
+    public void mergePaymentInfo(PaymentInfo paymentInfo) {
+        if (paymentInfo != null) {
+            try {
+                BeanUtils.copyProperties(this, paymentInfo);
+                payment_create_time = paymentInfo.create_time;
+                payment_id = paymentInfo.id;
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // Copy every same-named property from orderWide; create_time maps to order_create_time.
+    public void mergeOrderWide(OrderWide orderWide) {
+        if (orderWide != null) {
+            try {
+                BeanUtils.copyProperties(this, orderWide);
+                order_create_time = orderWide.create_time;
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
new file mode 100644
index 0000000..6d250f9
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
@@ -0,0 +1,45 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 21:11
+ *@Description: TODO Visitor-theme entity holding all dimensions and measures
+ *@Version: 1.0
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor //required for Flink POJO serialization; also makes the 0L field defaults reachable
+public class VisitorStats {
+
+    //window start time
+    private String stt;
+    //window end time
+    private String edt;
+    //dimension: app version
+    private String vc;
+    //dimension: channel
+    private String ch;
+    //dimension: area
+    private String ar;
+    //dimension: new/returning visitor flag
+    private String is_new;
+    //measure: unique visitor count
+    private Long uv_ct = 0L;
+    //measure: page view count
+    private Long pv_ct = 0L;
+    //measure: session (entry) count
+    private Long sv_ct = 0L;
+    //measure: bounce count
+    private Long uj_ct = 0L;
+    //measure: accumulated visit duration
+    private Long dur_sum = 0L;
+    //statistics timestamp
+    private Long ts;
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DateTimeUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DateTimeUtils.java
new file mode 100644
index 0000000..da15fe8
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DateTimeUtils.java
@@ -0,0 +1,42 @@
+package com.atguigu.gmall.realtime.utils;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 19:49
+ *@Description: TODO Date/time conversion helper.
+ * SimpleDateFormat is not thread-safe; since JDK 1.8 the thread-safe
+ * DateTimeFormatter is used instead.
+ *@Version: 1.0
+ */
+public class DateTimeUtils {
+
+    //DateTimeFormatter is thread-safe, so a single shared instance is fine
+    private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    //format a Date as "yyyy-MM-dd HH:mm:ss" in the system default zone
+    public static String toYMDHMS(Date date) {
+        LocalDateTime localDateTime = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
+        return dtf.format(localDateTime);
+    }
+
+    //parse "yyyy-MM-dd HH:mm:ss" into epoch millis; uses the system default
+    //zone so it round-trips with toYMDHMS (a hard-coded +8 offset here would
+    //disagree with toYMDHMS on hosts not running in UTC+8)
+    public static Long toTs(String dateStr) {
+        LocalDateTime localDateTime = LocalDateTime.parse(dateStr, dtf);
+        Long ts = localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        return ts;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(ZoneId.systemDefault());
+        System.out.println(new Date(1683728230000L));
+    }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java
index fe21dd7..f5a2ce7 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java
@@ -114,10 +114,6 @@ public class DimUtils {
return dimInfoJsonObj;
}
- public static void main(String[] args) {
-// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13")));
- System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13"));
- }
//根据redis的key,删除缓存
public static void deleteCached(String tableName, String id) {
@@ -132,4 +128,9 @@ public class DimUtils {
}
}
+
+ public static void main(String[] args) {
+// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13")));
+ System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13"));
+ }
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java
new file mode 100644
index 0000000..2ba0663
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ThreadPoolUtils.java
@@ -0,0 +1,49 @@
+package com.atguigu.gmall.realtime.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-10 14:59
+ *@Description: TODO Lazily-initialized, process-wide thread pool (double-checked locking).
+ *@Version: 1.0
+ */
+public class ThreadPoolUtils {
+
+    //volatile so the double-checked-locking publish is safe across threads
+    public static volatile ThreadPoolExecutor threadPool;
+
+    public static final int corePoolSize = 4;
+    public static final int maximumPoolSize = 16;
+    public static final long keepAliveTime = 300;
+    public static final TimeUnit unit = TimeUnit.SECONDS;
+    //unbounded work queue; typed <Runnable> as the ThreadPoolExecutor constructor requires
+    public static final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
+
+    //return the shared pool, creating it on first use
+    public static ThreadPoolExecutor getInstance() {
+        if (threadPool == null) {
+            //lock this utility's own class, not the public ThreadPoolExecutor.class,
+            //so unrelated code cannot contend on the same monitor
+            synchronized (ThreadPoolUtils.class) {
+                if (threadPool == null) {
+                    //print only when the pool is actually created
+                    System.out.println("---开辟线程池---");
+                    threadPool = new ThreadPoolExecutor(
+                            corePoolSize,
+                            maximumPoolSize,
+                            keepAliveTime,
+                            unit,
+                            workQueue
+                    );
+                }
+            }
+        }
+        return threadPool;
+    }
+}