From 015e83793d35d0e5e19ef0b83b1d96dec156490a Mon Sep 17 00:00:00 2001
From: markilue <745518019@qq.com>
Date: Thu, 11 May 2023 22:05:30 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E4=BB=93=E4=BB=BB?=
=?UTF-8?q?=E5=8A=A1dws=E6=9B=B4=E6=96=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../rt-gmall-parent/gmall-realtime/pom.xml | 20 +
.../atguigu/gmall/realtime/app/dwd/log.json | 56 ++-
.../gmall/realtime/app/dwm/OrderWideApp.java | 3 +-
.../realtime/app/dwm/PaymentWideApp.java | 4 +-
.../realtime/app/dws/ProductStatsApp.java | 426 ++++++++++++++++++
.../realtime/app/dws/ProvinceStatsApp.java | 102 +++++
.../realtime/app/dws/VisitorStatsApp.java | 17 +-
.../gmall/realtime/app/dws/tablesql.sql | 39 ++
.../gmall/realtime/beans/GmallConstant.java | 79 ++++
.../gmall/realtime/beans/ProductStats.java | 94 ++++
.../gmall/realtime/beans/ProvinceStats.java | 47 ++
.../gmall/realtime/beans/TransientSink.java | 19 +
.../gmall/realtime/beans/VisitorStats.java | 2 +-
.../gmall/realtime/common/GmallConfig.java | 1 +
.../gmall/realtime/utils/ClickhouseUtils.java | 72 +++
.../gmall/realtime/utils/KeywordUtils.java | 49 ++
.../gmall/realtime/utils/MyKafkaUtils.java | 12 +
17 files changed, 1025 insertions(+), 17 deletions(-)
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProductStatsApp.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProvinceStatsApp.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/GmallConstant.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProductStats.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProvinceStats.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/TransientSink.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ClickhouseUtils.java
create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
index 06f2685..b961a04 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
@@ -164,6 +164,26 @@
${flink.version}
+
+
+ org.apache.flink
+ flink-table-api-java-bridge_${scala.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-table-planner-blink_${scala.version}
+ ${flink.version}
+
+
+
+
+ com.janeluo
+ ikanalyzer
+ 2012_u6
+
+
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
index 509b9d4..46e6c2b 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwd/log.json
@@ -232,7 +232,61 @@
"ts": 1683293507
}
-
+{
+ "page": {
+ "page_id": "good_detail",
+ "item": "7",
+ "during_time": 14474,
+ "item_type": "sku_id",
+ "last_page_id": "good_list",
+ "source_type": "promotion"
+ },
+ "displays": [
+ {
+ "display_type": "query",
+ "item": "8",
+ "item_type": "sku_id",
+ "pos_id": 2,
+ "order": 1
+ },
+ {
+ "display_type": "promotion",
+ "item": "2",
+ "item_type": "sku_id",
+ "pos_id": 1,
+ "order": 2
+ },
+ {
+ "display_type": "query",
+ "item": "2",
+ "item_type": "sku_id",
+ "pos_id": 3,
+ "order": 3
+ },
+ {
+ "display_type": "promotion",
+ "item": "2",
+ "item_type": "sku_id",
+ "pos_id": 3,
+ "order": 4
+ },
+ {
+ "display_type": "recommend",
+ "item": "7",
+ "item_type": "sku_id",
+ "pos_id": 4,
+ "order": 5
+ },
+ {
+ "display_type": "promotion",
+ "item": "3",
+ "item_type": "sku_id",
+ "pos_id": 5,
+ "order": 6
+ }
+ ],
+ "ts": 1683788701000
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
index b52af13..f2ff01d 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
@@ -200,8 +200,7 @@ public class OrderWideApp {
public String getKey(OrderWide orderWide) {
return orderWide.getUser_id().toString();
}
- }
- ,
+ },
60,
TimeUnit.SECONDS
);
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
index fd7240a..256017b 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/PaymentWideApp.java
@@ -92,7 +92,7 @@ public class PaymentWideApp {
new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(OrderWide orderWide, long l) {
- return DateTimeUtils.toTs(orderWide.getCreate_date());
+ return DateTimeUtils.toTs(orderWide.getCreate_time());
}
}
)
@@ -119,7 +119,7 @@ public class PaymentWideApp {
//TODO 8.将支付宽表数据写到kafka的dwm_payment_wide
paymentWideDS
- .map(paymentWide -> JSON.toJSONString(paymentDS))
+ .map(paymentWide -> JSON.toJSONString(paymentWide))
.addSink(
MyKafkaUtils.getKafkaSink("dwm_payment_wide")
);
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProductStatsApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProductStatsApp.java
new file mode 100644
index 0000000..071acee
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProductStatsApp.java
@@ -0,0 +1,426 @@
+package com.atguigu.gmall.realtime.app.dws;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
+import com.atguigu.gmall.realtime.beans.GmallConstant;
+import com.atguigu.gmall.realtime.beans.OrderWide;
+import com.atguigu.gmall.realtime.beans.PaymentWide;
+import com.atguigu.gmall.realtime.beans.ProductStats;
+import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
+import com.atguigu.gmall.realtime.utils.DateTimeUtils;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dws
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 15:47
+ *@Description: TODO 商品主题统计DWS
+ *
+ *@Version: 1.0
+ */
+public class ProductStatsApp {
+
+ public static void main(String[] args) throws Exception {
+ //TODO 1.流环境建立
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ //TODO 2.检查点设置
+ env.enableCheckpointing(5000L);
+ env.getCheckpointConfig().setCheckpointTimeout(3000L);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+ //TODO 3.从kafka中读取数据(7个主题)
+
+ //3.1 声明消费者主题以及消费者组
+ String groupId = "product_stats_app";
+
+ String pageViewSourceTopic = "dwd_page_log";
+ String orderWideSourceTopic = "dwm_order_wide";
+ String paymentWideSourceTopic = "dwm_payment_wide";
+ String cartInfoSourceTopic = "dwd_cart_info";
+ String favorInfoSourceTopic = "dwd_favor_info";
+ String refundInfoSourceTopic = "dwd_order_refund_info";
+ String commentInfoSourceTopic = "dwd_comment_info";
+
+ //创建消费者对象,封装成流
+ DataStreamSource pageViewSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(pageViewSourceTopic, groupId)
+ );
+ DataStreamSource cartInfoSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(cartInfoSourceTopic, groupId)
+ );
+ DataStreamSource favorInfoSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(favorInfoSourceTopic, groupId)
+ );
+ DataStreamSource refundInfoSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(refundInfoSourceTopic, groupId)
+ );
+ DataStreamSource commentInfoSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(commentInfoSourceTopic, groupId)
+ );
+ DataStreamSource orderWideSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(orderWideSourceTopic, groupId)
+ );
+ DataStreamSource paymentWideSourceStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(paymentWideSourceTopic, groupId)
+ );
+
+ //TODO 4.对流中的数据进行类型转换 jsonStr ->ProductStats
+ //由于又有点击又有曝光,那么不合适在map里面做,map只能返回一个,这里使用process
+
+ //4.1 点击和曝光流转换
+ SingleOutputStreamOperator clickAndDisplayStatsDS = pageViewSourceStrDS.process(
+ /*
+ {
+ "page": {
+ "page_id": "good_detail",
+ "item": "7",
+ "during_time": 14474,
+ "item_type": "sku_id",
+ "last_page_id": "good_list",
+ "source_type": "promotion"
+ },
+ "displays": [
+ {
+ "display_type": "query",
+ "item": "8",
+ "item_type": "sku_id",
+ "pos_id": 2,
+ "order": 1
+ },
+ ],
+ "ts": 1683788701000
+ }
+ */
+ new ProcessFunction() {
+ @Override
+ public void processElement(String jsonStr, ProcessFunction.Context context, Collector collector) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(jsonStr);
+ Long ts = jsonObject.getLong("ts");
+ //判断是否为点击行为
+ JSONObject page = jsonObject.getJSONObject("page");
+ String pageId = page.getString("page_id");
+ if ("good_detail".equals(pageId)) {
+ //如果当前日志记录的页面是商品的详情页,那么认为这条日志 记录的是点击行为
+ Long skuId = page.getLong("item");
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(skuId)
+ .click_ct(1L)
+ .ts(ts)
+ .build();
+ collector.collect(productStats);
+ }
+ //判断是否为曝光
+ JSONArray displays = jsonObject.getJSONArray("displays");
+ if (displays != null && displays.size() > 0) {
+ //如果displays数组不为空,说明当前页面上有曝光行为,对所有的曝光进行遍历
+ for (int i = 0; i < displays.size(); i++) {
+ JSONObject displayJsonObj = displays.getJSONObject(i);
+ //曝光的是不是商品
+ if ("sku_id".equals(displayJsonObj.getString("item_type"))) {
+ Long itemId = displayJsonObj.getLong("item");
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(itemId)
+ .display_ct(1L)
+ .ts(ts)
+ .build();
+ collector.collect(productStats);
+ }
+ }
+ }
+
+ }
+ }
+ );
+
+ //4.2 转换收藏流数据
+ SingleOutputStreamOperator favorInfoStatsDS = favorInfoSourceStrDS.map(
+ new MapFunction() {
+
+ @Override
+ public ProductStats map(String jsonStr) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(jsonStr);
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(jsonObject.getLong("sku_id"))
+ .favor_ct(1L)
+ .ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
+ .build();
+ return productStats;
+ }
+ }
+ );
+
+ //4.3 转换加购流数据
+ SingleOutputStreamOperator cartInfoStatsDS = cartInfoSourceStrDS.map(
+ new MapFunction() {
+ @Override
+ public ProductStats map(String jsonStr) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(jsonStr);
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(jsonObject.getLong("sku_id"))
+ .cart_ct(1L)
+ .ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
+ .build();
+ return productStats;
+ }
+ }
+ );
+
+ //4.4 转换退款订单流数据(注意是订单数,不是商品数,所以需要进行一个去重)
+ SingleOutputStreamOperator refundInfoStatsDS = refundInfoSourceStrDS.map(
+ new MapFunction() {
+ @Override
+ public ProductStats map(String jsonStr) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(jsonStr);
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(jsonObject.getLong("sku_id"))
+ .refundOrderIdSet(new HashSet(Collections.singleton(jsonObject.getLong("order_id"))))
+ .refund_amount(jsonObject.getBigDecimal("refund_amount"))
+ .ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
+ .build();
+ return productStats;
+ }
+ }
+ );
+
+ //4.5 转换评价表数据
+ SingleOutputStreamOperator commentInfoStatsDS = commentInfoSourceStrDS.map(
+ new MapFunction() {
+ @Override
+ public ProductStats map(String jsonStr) throws Exception {
+ JSONObject jsonObject = JSON.parseObject(jsonStr);
+ Long goodCt = GmallConstant.APPRAISE_GOOD.equals(jsonObject.getString("appraise")) ? 1L : 0L;
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(jsonObject.getLong("sku_id"))
+ .comment_ct(1L)
+ .good_comment_ct(goodCt)
+ .ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
+ .build();
+ return productStats;
+ }
+ }
+ );
+
+ //4.6 转换订单宽表流数据
+ SingleOutputStreamOperator orderWideStatsDS = orderWideSourceStrDS.map(
+ new MapFunction() {
+ @Override
+ public ProductStats map(String jsonStr) throws Exception {
+ OrderWide orderWide = JSON.parseObject(jsonStr, OrderWide.class);
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(orderWide.getSku_id())
+ .order_sku_num(orderWide.getSku_num())
+ .order_amount(orderWide.getSplit_total_amount())
+ .ts(DateTimeUtils.toTs(orderWide.getCreate_time()))
+ .orderIdSet(new HashSet(Collections.singleton(orderWide.getOrder_id())))
+ .build();
+
+ return productStats;
+ }
+ }
+ );
+
+ //4.7 转换支付宽表流数据
+ SingleOutputStreamOperator paymentWideStatsDS = paymentWideSourceStrDS.map(
+ new MapFunction() {
+ @Override
+ public ProductStats map(String jsonStr) throws Exception {
+ PaymentWide paymentWide = JSON.parseObject(jsonStr, PaymentWide.class);
+ ProductStats productStats = ProductStats.builder()
+ .sku_id(paymentWide.getSku_id())
+ .payment_amount(paymentWide.getSplit_total_amount())
+ .paidOrderIdSet(new HashSet(Collections.singleton(paymentWide.getOrder_id())))
+ .ts(DateTimeUtils.toTs(paymentWide.getCallback_time()))
+ .build();
+
+ return productStats;
+ }
+ }
+ );
+
+
+ //TODO 5.将不同流的数据通过union合并到一起
+ DataStream unionDS = clickAndDisplayStatsDS.union(
+ favorInfoStatsDS,
+ cartInfoStatsDS,
+ refundInfoStatsDS,
+ commentInfoStatsDS,
+ orderWideStatsDS,
+ paymentWideStatsDS
+ );
+
+// unionDS.print(">>>>>>");
+
+ //TODO 6.指定watermark以及提取时间事件字段
+ SingleOutputStreamOperator productStatsWithWatermarkDS = unionDS.
+ assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
+ .withTimestampAssigner(
+ new SerializableTimestampAssigner() {
+ @Override
+ public long extractTimestamp(ProductStats productStats, long l) {
+// System.out.println(productStats);
+ return productStats.getTs();
+ }
+ }
+ )
+ );
+
+ //TODO 7.分组 注意:目前商品维度数据 处理订单和支付宽表能够获取 其他的流没有维度数据.所以这里使用sku_id进行分组
+ KeyedStream keyedDS = productStatsWithWatermarkDS.keyBy(ProductStats::getSku_id);
+
+ //TODO 8.开窗
+ WindowedStream windowDS = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
+
+ //TODO 9.聚合计算
+ SingleOutputStreamOperator reduceDS = windowDS.reduce(
+ new ReduceFunction() {
+ @Override
+ public ProductStats reduce(ProductStats productStats1, ProductStats productStats2) throws Exception {
+ productStats1.setDisplay_ct(productStats1.getDisplay_ct() + productStats2.getDisplay_ct());
+ productStats1.setClick_ct(productStats1.getClick_ct() + productStats2.getClick_ct());
+ productStats1.setCart_ct(productStats1.getCart_ct() + productStats2.getCart_ct());
+ productStats1.setFavor_ct(productStats1.getFavor_ct() + productStats2.getFavor_ct());
+ productStats1.setOrder_amount(productStats1.getOrder_amount().add(productStats2.getOrder_amount()));
+ productStats1.getOrderIdSet().addAll(productStats2.getOrderIdSet());
+ productStats1.setOrder_ct(productStats1.getOrderIdSet().size() + 0L);
+ productStats1.setOrder_sku_num(productStats1.getOrder_sku_num() + productStats2.getOrder_sku_num());
+ productStats1.setPayment_amount(productStats1.getPayment_amount().add(productStats2.getPayment_amount()));
+
+ productStats1.getRefundOrderIdSet().addAll(productStats1.getRefundOrderIdSet());
+ productStats1.setRefund_order_ct(productStats1.getRefundOrderIdSet().size() + 0L);
+ productStats1.setRefund_amount(productStats1.getRefund_amount().add(productStats2.getRefund_amount()));
+
+ productStats1.getPaidOrderIdSet().addAll(productStats1.getPaidOrderIdSet());
+ productStats1.setPaid_order_ct(productStats1.getPaidOrderIdSet().size() + 0L);
+
+ productStats1.setComment_ct(productStats1.getComment_ct() + productStats2.getComment_ct());
+ productStats1.setGood_comment_ct(productStats1.getGood_comment_ct() + productStats2.getGood_comment_ct());
+
+ return productStats1;
+ }
+ },
+ new ProcessWindowFunction() {
+ @Override
+ public void process(Long aLong, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
+ for (ProductStats productStats : iterable) {
+ long start = context.window().getStart();
+ long end = context.window().getEnd();
+ productStats.setStt(DateTimeUtils.toYMDHMS(new Date(start)));
+ productStats.setEdt(DateTimeUtils.toYMDHMS(new Date(end)));
+ productStats.setTs(System.currentTimeMillis());
+ collector.collect(productStats);
+ }
+ }
+ }
+ );
+
+ //TODO 10.补全商品维度信息
+ //10.3 补全SKU维度
+ SingleOutputStreamOperator productStatsWithSKUDS = AsyncDataStream.unorderedWait(
+ reduceDS,
+ new DimAsyncFunction("DIM_SKU_INFO") {
+ @Override
+ public void join(ProductStats input, JSONObject dimJsonObj) throws Exception {
+ input.setSku_name(dimJsonObj.getString("SKU_NAME"));
+ input.setSku_price(dimJsonObj.getBigDecimal("PRICE"));
+ input.setSpu_id(dimJsonObj.getLong("SPU_ID"));
+ input.setTm_id(dimJsonObj.getLong("TM_ID"));
+ input.setCategory3_id(dimJsonObj.getLong("CATEGORY3_ID"));
+ }
+
+ @Override
+ public String getKey(ProductStats input) {
+ return input.getSku_id().toString();
+ }
+ },
+ 60, TimeUnit.SECONDS
+ );
+
+ //10.2 补充SPU维度
+ SingleOutputStreamOperator productStatsWithSpuDS =
+ AsyncDataStream.unorderedWait(productStatsWithSKUDS,
+ new DimAsyncFunction("DIM_SPU_INFO") {
+ @Override
+ public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
+ productStats.setSpu_name(jsonObject.getString("SPU_NAME"));
+ }
+
+ @Override
+ public String getKey(ProductStats productStats) {
+ return String.valueOf(productStats.getSpu_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+
+ //10.3 补充品类维度
+ SingleOutputStreamOperator productStatsWithCategory3DS =
+ AsyncDataStream.unorderedWait(productStatsWithSpuDS,
+ new DimAsyncFunction("DIM_BASE_CATEGORY3") {
+ @Override
+ public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
+ productStats.setCategory3_name(jsonObject.getString("NAME"));
+ }
+
+ @Override
+ public String getKey(ProductStats productStats) {
+ return String.valueOf(productStats.getCategory3_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+ //10.4 补充品牌维度
+ SingleOutputStreamOperator productStatsWithTmDS =
+ AsyncDataStream.unorderedWait(productStatsWithCategory3DS,
+ new DimAsyncFunction("DIM_BASE_TRADEMARK") {
+ @Override
+ public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
+ productStats.setTm_name(jsonObject.getString("TM_NAME"));
+ }
+
+ @Override
+ public String getKey(ProductStats productStats) {
+ return String.valueOf(productStats.getTm_id());
+ }
+ }, 60, TimeUnit.SECONDS);
+
+
+ productStatsWithTmDS.print(">>>>>");
+ //TODO 11.将计算结果写到clickhouse
+ productStatsWithTmDS.addSink(
+ ClickhouseUtils.getJdbcSink("insert into product_stats values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
+ );
+
+
+ env.execute();
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProvinceStatsApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProvinceStatsApp.java
new file mode 100644
index 0000000..8a14963
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/ProvinceStatsApp.java
@@ -0,0 +1,102 @@
+package com.atguigu.gmall.realtime.app.dws;
+
+import com.atguigu.gmall.realtime.beans.ProvinceStats;
+import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dws
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 19:51
+ *@Description: TODO 地区主题统计 --SQL
+ *@Version: 1.0
+ */
+public class ProvinceStatsApp {
+
+ public static void main(String[] args) throws Exception {
+
+ //TODO 1.环境准备
+ //1.1 流处理环境
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ //1.2 表执行环境
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();//设置流处理环境
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ //1.3 设置并行度
+ env.setParallelism(4);
+
+
+ //TODO 2.检查点相关设置
+ env.enableCheckpointing(5000L);
+ env.getCheckpointConfig().setCheckpointTimeout(5000L);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+
+ //TODO 3.从指定数据源(kafka)读取数据 转换为动态表
+ //从orderWide读取数据
+ String topic = "dwm_order_wide";
+ String groupId = "province_stats_app_group";
+ String createSQL = "CREATE TABLE order_wide (" +
+ " province_id BIGINT," +
+ " province_name STRING," +
+ " province_area_code STRING," +
+ " province_iso_code STRING," +
+ " province_3166_2_code STRING," +
+ " order_id STRING," +
+ " split_total_amount DOUBLE," +
+ " create_time STRING," +
+ " rowtime as TO_TIMESTAMP(create_time) ," +
+ " WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND " +
+ ") WITH (" + MyKafkaUtils.getKafkaDDL(topic, groupId) + ")";
+// System.out.println(createSQL);
+ tableEnv.executeSql(createSQL);
+
+ //TODO 4.进行分组 开窗 计算
+ String selectSQL = "select " +
+ " DATE_FORMAT(TUMBLE_START(rowtime,INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as stt," +
+ " DATE_FORMAT(TUMBLE_END(rowtime,INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as edt," +
+ " province_id," +
+ " province_name," +
+ " province_area_code area_code," +
+ " province_iso_code iso_code," +
+ " province_3166_2_code iso_3166_2," +
+ " count(distinct order_id) order_count," +
+ " sum(split_total_amount) order_amount," +
+ " UNIX_TIMESTAMP() * 1000 as ts" +
+ " from " +
+ " order_wide" +
+ " group by " +
+ " TUMBLE(rowtime,INTERVAL '10' SECOND)," +
+ " province_id," +
+ " province_name," +
+ " province_area_code," +
+ " province_iso_code," +
+ " province_3166_2_code";
+// System.out.println(selectSQL);
+ Table orderWideTable = tableEnv.sqlQuery(selectSQL);
+
+ //TODO 5.将动态表转化为流
+ DataStream provinceStatsDS = tableEnv.toAppendStream(orderWideTable, ProvinceStats.class);
+ provinceStatsDS.print(">>>>>");
+
+
+ //TODO 6.将流中的数据写到Clickhouse
+ provinceStatsDS.addSink(
+ ClickhouseUtils.getJdbcSink("insert into province_stats values(?,?,?,?,?,?,?,?,?,?)")
+ );
+
+
+ env.execute();
+
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
index b693e47..bc6c4ab 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/VisitorStatsApp.java
@@ -3,6 +3,7 @@ package com.atguigu.gmall.realtime.app.dws;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.beans.VisitorStats;
+import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
import com.atguigu.gmall.realtime.utils.DateTimeUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
@@ -37,6 +38,10 @@ import java.util.Date;
*@Author: markilue
*@CreateTime: 2023-05-10 21:04
*@Description: TODO 访客主题统计DWS
+ * 测试流程:
+ * -需要启动的进程:
+ * zk,kafka,hdfs,logger.sh,clickhouse,
+ * BaseLogApp,UniqueVisitorApp,UserJumpDetailApp
*@Version: 1.0
*/
public class VisitorStatsApp {
@@ -286,17 +291,7 @@ public class VisitorStatsApp {
//TODO 10.将聚合统计之后的数据写到clickhouse
DataStreamSink visitorStatsDataStreamSink = reduceDS.addSink(
- JdbcSink.sink(
- "",
- new JdbcStatementBuilder() {
- @Override
- public void accept(PreparedStatement preparedStatement, VisitorStats visitorStats) throws SQLException {
-
- }
- },
- new JdbcExecutionOptions.Builder().build(),
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().build()
- )
+ ClickhouseUtils.getJdbcSink("insert into visitor_stats values(?,?,?,?,?,?,?,?,?,?,?,?)")
);
env.execute();
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql
new file mode 100644
index 0000000..b73ed11
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql
@@ -0,0 +1,39 @@
+create table order_wide
+(
+ province_id BIGINT,
+ province_name STRING,
+ province_area_code STRING,
+ province_iso_code STRING,
+ province_3166_2_code STRING,
+ order_id STRING,
+ split_total_amount DOUBLE,
+ create_time STRING,
+ rowtime as TO_TIMESTAMP(create_time),
+ WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND
+)
+ WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'dwm_order_wide',
+ 'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092',
+ 'properties.group.id' = 'province_stats_app_group',
+ 'scan.startup.mode' = 'latest-offset',
+ 'format' = 'json')
+
+
+select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt,
+ DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt,
+ province_id,
+ province_name,
+ province_area_code area_code,
+ province_iso_code iso_code,
+ province_3166_2_code iso_3166_2,
+ count(distinct order_id) order_count,
+ sum(split_total_amount) order_amount,
+ UNIX_TIMESTAMP() * 1000 ts
+from order_wide
+group by TUMBLE(rowtime, INTERVAL '10' SECOND),
+ province_id,
+ province_name,
+ province_area_code,
+ province_iso_code,
+ province_3166_2_code
\ No newline at end of file
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/GmallConstant.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/GmallConstant.java
new file mode 100644
index 0000000..11331bf
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/GmallConstant.java
@@ -0,0 +1,79 @@
+package com.atguigu.gmall.realtime.beans;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 16:45
+ *@Description: TODO 电商业务常量
+ *@Version: 1.0
+ */
+public class GmallConstant {
+
+ //10 单据状态
+ public static final String ORDER_STATUS_UNPAID = "1001"; //未支付
+ public static final String ORDER_STATUS_PAID = "1002"; //已支付
+ public static final String ORDER_STATUS_CANCEL = "1003";//已取消
+ public static final String ORDER_STATUS_FINISH = "1004";//已完成
+ public static final String ORDER_STATUS_REFUND = "1005";//退款中
+ public static final String ORDER_STATUS_REFUND_DONE = "1006";//退款完成
+
+
+ //11 支付状态
+ public static final String PAYMENT_TYPE_ALIPAY = "1101";//支付宝
+ public static final String PAYMENT_TYPE_WECHAT = "1102";//微信
+ public static final String PAYMENT_TYPE_UNION = "1103";//银联
+
+ //12 评价
+ public static final String APPRAISE_GOOD = "1201";// 好评
+ public static final String APPRAISE_SOSO = "1202";// 中评
+ public static final String APPRAISE_BAD = "1203";// 差评
+ public static final String APPRAISE_AUTO = "1204";// 自动
+
+
+ //13 退货原因
+ public static final String REFUND_REASON_BAD_GOODS = "1301";// 质量问题
+ public static final String REFUND_REASON_WRONG_DESC = "1302";// 商品描述与实际描述不一致
+ public static final String REFUND_REASON_SALE_OUT = "1303";// 缺货
+ public static final String REFUND_REASON_SIZE_ISSUE = "1304";// 号码不合适
+ public static final String REFUND_REASON_MISTAKE = "1305";// 拍错
+ public static final String REFUND_REASON_NO_REASON = "1306";// 不想买了
+ public static final String REFUND_REASON_OTHER = "1307";// 其他
+
+ //14 购物券状态
+ public static final String COUPON_STATUS_UNUSED = "1401";// 未使用
+ public static final String COUPON_STATUS_USING = "1402";// 使用中
+ public static final String COUPON_STATUS_USED = "1403";// 已使用
+
+ //15退款类型
+ public static final String REFUND_TYPE_ONLY_MONEY = "1501";// 仅退款
+ public static final String REFUND_TYPE_WITH_GOODS = "1502";// 退货退款
+
+ //24来源类型
+ public static final String SOURCE_TYPE_QUREY = "2401";// 用户查询
+ public static final String SOURCE_TYPE_PROMOTION = "2402";// 商品推广
+ public static final String SOURCE_TYPE_AUTO_RECOMMEND = "2403";// 智能推荐
+ public static final String SOURCE_TYPE_ACTIVITY = "2404";// 促销活动
+
+
+ //购物券范围
+ public static final String COUPON_RANGE_TYPE_CATEGORY3 = "3301";//
+ public static final String COUPON_RANGE_TYPE_TRADEMARK = "3302";//
+ public static final String COUPON_RANGE_TYPE_SPU = "3303";//
+
+ //购物券类型
+ public static final String COUPON_TYPE_MJ = "3201";//满减
+ public static final String COUPON_TYPE_DZ = "3202";// 满量打折
+ public static final String COUPON_TYPE_DJ = "3203";// 代金券
+
+ public static final String ACTIVITY_RULE_TYPE_MJ = "3101";
+ public static final String ACTIVITY_RULE_TYPE_DZ = "3102";
+ public static final String ACTIVITY_RULE_TYPE_ZK = "3103";
+
+
+ public static final String KEYWORD_SEARCH = "SEARCH";
+ public static final String KEYWORD_CLICK = "CLICK";
+ public static final String KEYWORD_CART = "CART";
+ public static final String KEYWORD_ORDER = "ORDER";
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProductStats.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProductStats.java
new file mode 100644
index 0000000..2e91065
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProductStats.java
@@ -0,0 +1,94 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 15:38
+ *@Description: TODO 商品统计实体类
+ *
+ * @Builder注解
+ * 可以使用构造者方式创建对象,给属性赋值
+ * @Builder.Default
+ * 在使用构造者方式给属性赋值的时候,属性的初始值会丢失
+ * 该注解的作用就是修复这个问题
+ * 例如:我们在属性上赋值了初始值为0L,如果不加这个注解,通过构造者创建的对象属性值会变为null
+ *@Version: 1.0
+ */
+@Data
+@Builder //构造者设计模式
+public class ProductStats {
+
+ String stt;//窗口起始时间
+ String edt; //窗口结束时间
+ Long sku_id; //sku编号
+ String sku_name;//sku名称
+ BigDecimal sku_price; //sku单价
+ Long spu_id; //spu编号
+ String spu_name;//spu名称
+ Long tm_id; //品牌编号
+ String tm_name;//品牌名称
+ Long category3_id;//品类编号
+ String category3_name;//品类名称
+
+ @Builder.Default
+ Long display_ct = 0L; //曝光数
+
+ @Builder.Default
+ Long click_ct = 0L; //点击数
+
+ @Builder.Default
+ Long favor_ct = 0L; //收藏数
+
+ @Builder.Default
+ Long cart_ct = 0L; //添加购物车数
+
+ @Builder.Default
+ Long order_sku_num = 0L; //下单商品个数
+
+ @Builder.Default //下单商品金额
+ BigDecimal order_amount = BigDecimal.ZERO;
+
+ @Builder.Default
+ Long order_ct = 0L; //订单数
+
+ @Builder.Default //支付金额
+ BigDecimal payment_amount = BigDecimal.ZERO;
+
+ @Builder.Default
+ Long paid_order_ct = 0L; //支付订单数
+
+ @Builder.Default
+ Long refund_order_ct = 0L; //退款订单数
+
+ @Builder.Default
+ BigDecimal refund_amount = BigDecimal.ZERO;
+
+ @Builder.Default
+ Long comment_ct = 0L;//评论数
+
+ @Builder.Default
+ Long good_comment_ct = 0L; //好评数
+
+ @Builder.Default
+ @TransientSink
+ Set orderIdSet = new HashSet(); //用于统计订单数
+
+ @Builder.Default
+ @TransientSink
+ Set paidOrderIdSet = new HashSet(); //用于统计支付订单数
+
+ @Builder.Default
+ @TransientSink
+ Set refundOrderIdSet = new HashSet();//用于退款支付订单数
+
+ Long ts; //统计时间戳
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProvinceStats.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProvinceStats.java
new file mode 100644
index 0000000..8868c7e
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/ProvinceStats.java
@@ -0,0 +1,47 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.math.BigDecimal;
+import java.util.Date;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 20:51
+ *@Description: TODO 地区统计宽表实体类:
+ *@Version: 1.0
+ */
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ProvinceStats {
+
+ private String stt;
+ private String edt;
+ private Long province_id;
+ private String province_name;
+ private String area_code;
+ private String iso_code;
+ private String iso_3166_2;
+ private BigDecimal order_amount;
+ private Long order_count;
+ private Long ts;
+
+ public ProvinceStats(OrderWide orderWide){
+ province_id = orderWide.getProvince_id();
+ order_amount = orderWide.getSplit_total_amount();
+ province_name=orderWide.getProvince_name();
+ area_code=orderWide.getProvince_area_code();
+ iso_3166_2=orderWide.getProvince_iso_code();
+ iso_code=orderWide.getProvince_iso_code();
+
+ order_count = 1L;
+ ts=new Date().getTime();
+ }
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/TransientSink.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/TransientSink.java
new file mode 100644
index 0000000..0174a3f
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/TransientSink.java
@@ -0,0 +1,19 @@
+package com.atguigu.gmall.realtime.beans;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 14:39
+ *@Description: TODO 当处理向clickhouse写入数据属性时,如果属性不需要保存到CK,那么可以通过改注解标记
+ *@Version: 1.0
+ */
+@Target(ElementType.FIELD)//注解加的位置
+@Retention(RetentionPolicy.RUNTIME)//注解生效的时间
+public @interface TransientSink {
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
index 6d250f9..71d8ae4 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/VisitorStats.java
@@ -8,7 +8,7 @@ import lombok.Data;
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-10 21:11
- *@Description: TODO 访客主题实体类 包含各个维度和度量
+ *@Description: TODO 访客主题实体类 包含各个维度和度量(需要类中的属性顺序和clickhouse表中的顺序一致)因为后续需要用反射去获取值
*@Version: 1.0
*/
@Data
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/common/GmallConfig.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/common/GmallConfig.java
index abf7bb7..69aeaa9 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/common/GmallConfig.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/common/GmallConfig.java
@@ -12,4 +12,5 @@ public class GmallConfig {
public static final String HBASE_SCHEMA = "GMALL_REALTIME";//Hbase库名
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix连接
+ public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_gmall";//Phoenix连接
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ClickhouseUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ClickhouseUtils.java
new file mode 100644
index 0000000..36db8c2
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/ClickhouseUtils.java
@@ -0,0 +1,72 @@
+package com.atguigu.gmall.realtime.utils;
+
+import com.atguigu.gmall.realtime.beans.TransientSink;
+import com.atguigu.gmall.realtime.beans.VisitorStats;
+import com.atguigu.gmall.realtime.common.GmallConfig;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.lang.reflect.AnnotatedType;
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 14:02
+ *@Description: TODO 操作clickhouse工具类
+ *@Version: 1.0
+ */
+public class ClickhouseUtils {
+
+ public static SinkFunction getJdbcSink(String sql) {
+ SinkFunction sinkFunction = JdbcSink.sink(
+ sql,
+ new JdbcStatementBuilder() {
+ @Override
+ public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
+ //obj即为流中的一条数据 获取流中对象obj的属性值,赋值给?占位符
+ //获取流中对象所属类的属性
+ Field[] fields = obj.getClass().getDeclaredFields();
+ int index = 1;
+ for (Field field : fields) {
+ //对象属性数组进行遍历 获取每一个属性
+
+ //判断该属性是否有TransientSink注解修饰
+ TransientSink transientSink = field.getAnnotation(TransientSink.class);
+ if (transientSink != null) {//没有这个注解才需要赋值
+ continue;
+ }
+ //设置私有属性的访问权限
+ field.setAccessible(true);
+ try {
+ //获取对象属性值
+ Object filedValue = field.get(obj);
+ //将属性的值赋值给?占位符
+ preparedStatement.setObject(index++, filedValue);//顺序从1开始
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+ },
+ new JdbcExecutionOptions.Builder()//构造者设计模式-通过内部类对象帮助构造外部对象:https://www.jianshu.com/p/7b52c5e1b6ce
+ .withBatchSize(5)//批量插入 保证效率(每一个slot到5)
+// .withBatchIntervalMs(2000)//超时时间 如果超过2s也插入
+// .withMaxRetries(3)//插入失败之后,重试次数
+ .build(),
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() //构造者设计模式:https://www.jianshu.com/p/7b52c5e1b6ce
+ .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
+ .withUrl(GmallConfig.CLICKHOUSE_URL)
+ .build()
+ );
+ return sinkFunction;
+ }
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java
new file mode 100644
index 0000000..fa2521b
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java
@@ -0,0 +1,49 @@
+package com.atguigu.gmall.realtime.utils;
+
+import org.wltea.analyzer.core.IKSegmenter;
+import org.wltea.analyzer.core.Lexeme;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-11 21:55
+ *@Description: TODO 使用IK分组器进行分词
+ *@Version: 1.0
+ */
+public class KeywordUtils {
+
+
+ //分词方法
+ public static List analyze(String text) {
+ StringReader reader = new StringReader(text);
+ //useSmart:是否使用智能分词策略
+ //非智能分词:细粒度输出所有可能的切分结果
+ //智能分词:合并数词和量词,对分词结果进行歧义判断
+ IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
+ List resultList = new ArrayList<>();
+ try {
+ Lexeme lexeme = null;
+ while ((lexeme = ikSegmenter.next()) != null) {
+ //不等于null则还有词可以分
+ resultList.add(lexeme.getLexemeText());
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return resultList;
+ }
+
+ public static void main(String[] args) {
+ String text = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";
+ System.out.println(KeywordUtils.analyze(text));
+
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java
index 2084b5a..54671cf 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyKafkaUtils.java
@@ -66,5 +66,17 @@ public class MyKafkaUtils {
return new FlinkKafkaProducer(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
+ public static String getKafkaDDL(String topic, String groupId) {
+ String connector = " 'connector' = 'kafka'," +
+ " 'topic' = '" + topic + "'," +
+ " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "'," +
+ " 'properties.group.id' = '" + groupId + "'," +
+ " 'scan.startup.mode' = 'latest-offset'," +
+ " 'format' = 'json'";
+
+ return connector;
+
+ }
+
}