Real-time data warehouse: DWS job update

This commit is contained in:
markilue 2023-05-11 22:05:30 +08:00
parent 2345b12dde
commit 015e83793d
17 changed files with 1025 additions and 17 deletions

View File

@ -164,6 +164,26 @@
<version>${flink.version}</version>
</dependency>
<!--flink sql-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--IK analyzer (Chinese word segmentation)-->
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
</dependencies>

View File

@ -232,7 +232,61 @@
"ts": 1683293507 "ts": 1683293507
} }
{
"page": {
"page_id": "good_detail",
"item": "7",
"during_time": 14474,
"item_type": "sku_id",
"last_page_id": "good_list",
"source_type": "promotion"
},
"displays": [
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 2,
"order": 1
},
{
"display_type": "promotion",
"item": "2",
"item_type": "sku_id",
"pos_id": 1,
"order": 2
},
{
"display_type": "query",
"item": "2",
"item_type": "sku_id",
"pos_id": 3,
"order": 3
},
{
"display_type": "promotion",
"item": "2",
"item_type": "sku_id",
"pos_id": 3,
"order": 4
},
{
"display_type": "recommend",
"item": "7",
"item_type": "sku_id",
"pos_id": 4,
"order": 5
},
{
"display_type": "promotion",
"item": "3",
"item_type": "sku_id",
"pos_id": 5,
"order": 6
}
],
"ts": 1683788701000
}

View File

@ -200,8 +200,7 @@ public class OrderWideApp {
public String getKey(OrderWide orderWide) {
return orderWide.getUser_id().toString();
}
-}
-,
+},
60,
TimeUnit.SECONDS
);

View File

@ -92,7 +92,7 @@ public class PaymentWideApp {
new SerializableTimestampAssigner<OrderWide>() {
@Override
public long extractTimestamp(OrderWide orderWide, long l) {
-return DateTimeUtils.toTs(orderWide.getCreate_date());
+return DateTimeUtils.toTs(orderWide.getCreate_time());
}
}
)
@ -119,7 +119,7 @@ public class PaymentWideApp {
//TODO 8. Write the payment wide-table data to the Kafka topic dwm_payment_wide
paymentWideDS
-.map(paymentWide -> JSON.toJSONString(paymentDS))
+.map(paymentWide -> JSON.toJSONString(paymentWide))
.addSink(
MyKafkaUtils.getKafkaSink("dwm_payment_wide")
);

View File

@ -0,0 +1,426 @@
package com.atguigu.gmall.realtime.app.dws;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.beans.GmallConstant;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.beans.PaymentWide;
import com.atguigu.gmall.realtime.beans.ProductStats;
import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
import com.atguigu.gmall.realtime.utils.DateTimeUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.dws
*@Author: markilue
*@CreateTime: 2023-05-11 15:47
*@Description: TODO Product-topic statistics (DWS)
*
*@Version: 1.0
*/
public class ProductStatsApp {
public static void main(String[] args) throws Exception {
//TODO 1. Set up the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
//TODO 2. Checkpoint settings
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointTimeout(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read data from Kafka (7 topics)
//3.1 Declare the source topics and the consumer group
String groupId = "product_stats_app";
String pageViewSourceTopic = "dwd_page_log";
String orderWideSourceTopic = "dwm_order_wide";
String paymentWideSourceTopic = "dwm_payment_wide";
String cartInfoSourceTopic = "dwd_cart_info";
String favorInfoSourceTopic = "dwd_favor_info";
String refundInfoSourceTopic = "dwd_order_refund_info";
String commentInfoSourceTopic = "dwd_comment_info";
//Create the consumers and wrap them as streams
DataStreamSource<String> pageViewSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(pageViewSourceTopic, groupId)
);
DataStreamSource<String> cartInfoSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(cartInfoSourceTopic, groupId)
);
DataStreamSource<String> favorInfoSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(favorInfoSourceTopic, groupId)
);
DataStreamSource<String> refundInfoSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(refundInfoSourceTopic, groupId)
);
DataStreamSource<String> commentInfoSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(commentInfoSourceTopic, groupId)
);
DataStreamSource<String> orderWideSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(orderWideSourceTopic, groupId)
);
DataStreamSource<String> paymentWideSourceStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(paymentWideSourceTopic, groupId)
);
//TODO 4. Convert the stream records: jsonStr -> ProductStats
//A page-log record can carry both a click and several displays; map can only emit one element per input, so process is used here
//4.1 Convert the click-and-display stream
SingleOutputStreamOperator<ProductStats> clickAndDisplayStatsDS = pageViewSourceStrDS.process(
/*
{
"page": {
"page_id": "good_detail",
"item": "7",
"during_time": 14474,
"item_type": "sku_id",
"last_page_id": "good_list",
"source_type": "promotion"
},
"displays": [
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 2,
"order": 1
},
],
"ts": 1683788701000
}
*/
new ProcessFunction<String, ProductStats>() {
@Override
public void processElement(String jsonStr, ProcessFunction<String, ProductStats>.Context context, Collector<ProductStats> collector) throws Exception {
JSONObject jsonObject = JSON.parseObject(jsonStr);
Long ts = jsonObject.getLong("ts");
//Check whether this is a click
JSONObject page = jsonObject.getJSONObject("page");
String pageId = page.getString("page_id");
if ("good_detail".equals(pageId)) {
//If the current page is a product detail page, treat this log entry as a click
Long skuId = page.getLong("item");
ProductStats productStats = ProductStats.builder()
.sku_id(skuId)
.click_ct(1L)
.ts(ts)
.build();
collector.collect(productStats);
}
//Check for display (impression) events
JSONArray displays = jsonObject.getJSONArray("displays");
if (displays != null && displays.size() > 0) {
//If the displays array is non-empty, the page contains display events; iterate over all of them
for (int i = 0; i < displays.size(); i++) {
JSONObject displayJsonObj = displays.getJSONObject(i);
//Only count displays whose item is a SKU
if ("sku_id".equals(displayJsonObj.getString("item_type"))) {
Long itemId = displayJsonObj.getLong("item");
ProductStats productStats = ProductStats.builder()
.sku_id(itemId)
.display_ct(1L)
.ts(ts)
.build();
collector.collect(productStats);
}
}
}
}
}
);
//4.2 Convert the favor stream
SingleOutputStreamOperator<ProductStats> favorInfoStatsDS = favorInfoSourceStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObject = JSON.parseObject(jsonStr);
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObject.getLong("sku_id"))
.favor_ct(1L)
.ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
.build();
return productStats;
}
}
);
//4.3 Convert the add-to-cart stream
SingleOutputStreamOperator<ProductStats> cartInfoStatsDS = cartInfoSourceStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObject = JSON.parseObject(jsonStr);
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObject.getLong("sku_id"))
.cart_ct(1L)
.ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
.build();
return productStats;
}
}
);
//4.4 Convert the order-refund stream (note: we count refund orders, not items, so order ids are deduplicated via a set)
SingleOutputStreamOperator<ProductStats> refundInfoStatsDS = refundInfoSourceStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObject = JSON.parseObject(jsonStr);
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObject.getLong("sku_id"))
.refundOrderIdSet(new HashSet<>(Collections.singleton(jsonObject.getLong("order_id"))))
.refund_amount(jsonObject.getBigDecimal("refund_amount"))
.ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
.build();
return productStats;
}
}
);
//4.5 Convert the comment stream
SingleOutputStreamOperator<ProductStats> commentInfoStatsDS = commentInfoSourceStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObject = JSON.parseObject(jsonStr);
Long goodCt = GmallConstant.APPRAISE_GOOD.equals(jsonObject.getString("appraise")) ? 1L : 0L;
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObject.getLong("sku_id"))
.comment_ct(1L)
.good_comment_ct(goodCt)
.ts(DateTimeUtils.toTs(jsonObject.getString("create_time")))
.build();
return productStats;
}
}
);
//4.6 Convert the order wide-table stream
SingleOutputStreamOperator<ProductStats> orderWideStatsDS = orderWideSourceStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
OrderWide orderWide = JSON.parseObject(jsonStr, OrderWide.class);
ProductStats productStats = ProductStats.builder()
.sku_id(orderWide.getSku_id())
.order_sku_num(orderWide.getSku_num())
.order_amount(orderWide.getSplit_total_amount())
.ts(DateTimeUtils.toTs(orderWide.getCreate_time()))
.orderIdSet(new HashSet<>(Collections.singleton(orderWide.getOrder_id())))
.build();
return productStats;
}
}
);
//4.7 Convert the payment wide-table stream
SingleOutputStreamOperator<ProductStats> paymentWideStatsDS = paymentWideSourceStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
PaymentWide paymentWide = JSON.parseObject(jsonStr, PaymentWide.class);
ProductStats productStats = ProductStats.builder()
.sku_id(paymentWide.getSku_id())
.payment_amount(paymentWide.getSplit_total_amount())
.paidOrderIdSet(new HashSet<>(Collections.singleton(paymentWide.getOrder_id())))
.ts(DateTimeUtils.toTs(paymentWide.getCallback_time()))
.build();
return productStats;
}
}
);
//TODO 5. Merge the streams with union
DataStream<ProductStats> unionDS = clickAndDisplayStatsDS.union(
favorInfoStatsDS,
cartInfoStatsDS,
refundInfoStatsDS,
commentInfoStatsDS,
orderWideStatsDS,
paymentWideStatsDS
);
// unionDS.print(">>>>>>");
//TODO 6. Assign watermarks and extract the event-time field
SingleOutputStreamOperator<ProductStats> productStatsWithWatermarkDS = unionDS.
assignTimestampsAndWatermarks(
WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
new SerializableTimestampAssigner<ProductStats>() {
@Override
public long extractTimestamp(ProductStats productStats, long l) {
// System.out.println(productStats);
return productStats.getTs();
}
}
)
);
//TODO 7. Key the stream. Note: currently only the order and payment wide tables carry product dimension data; the other streams do not, so key by sku_id
KeyedStream<ProductStats, Long> keyedDS = productStatsWithWatermarkDS.keyBy(ProductStats::getSku_id);
//TODO 8. Open windows (event-time tumbling windows, so the watermarks assigned above take effect)
WindowedStream<ProductStats, Long, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
//TODO 9. Aggregate
SingleOutputStreamOperator<ProductStats> reduceDS = windowDS.reduce(
new ReduceFunction<ProductStats>() {
@Override
public ProductStats reduce(ProductStats productStats1, ProductStats productStats2) throws Exception {
productStats1.setDisplay_ct(productStats1.getDisplay_ct() + productStats2.getDisplay_ct());
productStats1.setClick_ct(productStats1.getClick_ct() + productStats2.getClick_ct());
productStats1.setCart_ct(productStats1.getCart_ct() + productStats2.getCart_ct());
productStats1.setFavor_ct(productStats1.getFavor_ct() + productStats2.getFavor_ct());
productStats1.setOrder_amount(productStats1.getOrder_amount().add(productStats2.getOrder_amount()));
productStats1.getOrderIdSet().addAll(productStats2.getOrderIdSet());
productStats1.setOrder_ct((long) productStats1.getOrderIdSet().size());
productStats1.setOrder_sku_num(productStats1.getOrder_sku_num() + productStats2.getOrder_sku_num());
productStats1.setPayment_amount(productStats1.getPayment_amount().add(productStats2.getPayment_amount()));
productStats1.getRefundOrderIdSet().addAll(productStats2.getRefundOrderIdSet());
productStats1.setRefund_order_ct((long) productStats1.getRefundOrderIdSet().size());
productStats1.setRefund_amount(productStats1.getRefund_amount().add(productStats2.getRefund_amount()));
productStats1.getPaidOrderIdSet().addAll(productStats2.getPaidOrderIdSet());
productStats1.setPaid_order_ct((long) productStats1.getPaidOrderIdSet().size());
productStats1.setComment_ct(productStats1.getComment_ct() + productStats2.getComment_ct());
productStats1.setGood_comment_ct(productStats1.getGood_comment_ct() + productStats2.getGood_comment_ct());
return productStats1;
}
},
new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
@Override
public void process(Long aLong, ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>.Context context, Iterable<ProductStats> iterable, Collector<ProductStats> collector) throws Exception {
for (ProductStats productStats : iterable) {
long start = context.window().getStart();
long end = context.window().getEnd();
productStats.setStt(DateTimeUtils.toYMDHMS(new Date(start)));
productStats.setEdt(DateTimeUtils.toYMDHMS(new Date(end)));
productStats.setTs(System.currentTimeMillis());
collector.collect(productStats);
}
}
}
);
//TODO 10. Enrich with product dimension data
//10.1 Enrich with the SKU dimension
SingleOutputStreamOperator<ProductStats> productStatsWithSKUDS = AsyncDataStream.unorderedWait(
reduceDS,
new DimAsyncFunction<ProductStats>("DIM_SKU_INFO") {
@Override
public void join(ProductStats input, JSONObject dimJsonObj) throws Exception {
input.setSku_name(dimJsonObj.getString("SKU_NAME"));
input.setSku_price(dimJsonObj.getBigDecimal("PRICE"));
input.setSpu_id(dimJsonObj.getLong("SPU_ID"));
input.setTm_id(dimJsonObj.getLong("TM_ID"));
input.setCategory3_id(dimJsonObj.getLong("CATEGORY3_ID"));
}
@Override
public String getKey(ProductStats input) {
return input.getSku_id().toString();
}
},
60, TimeUnit.SECONDS
);
//10.2 Enrich with the SPU dimension
SingleOutputStreamOperator<ProductStats> productStatsWithSpuDS =
AsyncDataStream.unorderedWait(productStatsWithSKUDS,
new DimAsyncFunction<ProductStats>("DIM_SPU_INFO") {
@Override
public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
productStats.setSpu_name(jsonObject.getString("SPU_NAME"));
}
@Override
public String getKey(ProductStats productStats) {
return String.valueOf(productStats.getSpu_id());
}
}, 60, TimeUnit.SECONDS);
//10.3 Enrich with the category dimension
SingleOutputStreamOperator<ProductStats> productStatsWithCategory3DS =
AsyncDataStream.unorderedWait(productStatsWithSpuDS,
new DimAsyncFunction<ProductStats>("DIM_BASE_CATEGORY3") {
@Override
public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
productStats.setCategory3_name(jsonObject.getString("NAME"));
}
@Override
public String getKey(ProductStats productStats) {
return String.valueOf(productStats.getCategory3_id());
}
}, 60, TimeUnit.SECONDS);
//10.4 Enrich with the trademark (brand) dimension
SingleOutputStreamOperator<ProductStats> productStatsWithTmDS =
AsyncDataStream.unorderedWait(productStatsWithCategory3DS,
new DimAsyncFunction<ProductStats>("DIM_BASE_TRADEMARK") {
@Override
public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
productStats.setTm_name(jsonObject.getString("TM_NAME"));
}
@Override
public String getKey(ProductStats productStats) {
return String.valueOf(productStats.getTm_id());
}
}, 60, TimeUnit.SECONDS);
productStatsWithTmDS.print(">>>>>");
//TODO 11. Write the results to ClickHouse
productStatsWithTmDS.addSink(
ClickhouseUtils.getJdbcSink("insert into product_stats values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
);
env.execute();
}
}
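Note: DimAsyncFunction is imported and used above but is not part of this diff. For orientation, here is a minimal sketch of its assumed shape, inferred from how ProductStatsApp calls it: the constructor takes the dimension table name, and subclasses supply getKey and join. DimUtils and the synchronous lookup are hypothetical stand-ins; the project's real class presumably queries Phoenix (with a cache) on a thread pool.

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    private final String tableName; //dimension table to query, e.g. "DIM_SKU_INFO"

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    //extract the dimension key (primary key value) from the stream record
    public abstract String getKey(T input);

    //merge the queried dimension attributes into the stream record
    public abstract void join(T input, JSONObject dimJsonObj) throws Exception;

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        //hypothetical flow: the real implementation runs this on a thread pool
        //and reads the dim row from Phoenix with a cache; DimUtils is assumed
        JSONObject dimJsonObj = DimUtils.getDimInfo(tableName, getKey(input));
        if (dimJsonObj != null) {
            join(input, dimJsonObj);
        }
        resultFuture.complete(Collections.singleton(input));
    }
}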

View File

@ -0,0 +1,102 @@
package com.atguigu.gmall.realtime.app.dws;
import com.atguigu.gmall.realtime.beans.ProvinceStats;
import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.dws
*@Author: markilue
*@CreateTime: 2023-05-11 19:51
*@Description: TODO Province-topic statistics -- Flink SQL
*@Version: 1.0
*/
public class ProvinceStatsApp {
public static void main(String[] args) throws Exception {
//TODO 1. Environment setup
//1.1 Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 Table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); //streaming mode
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//1.3 Parallelism
env.setParallelism(4);
//TODO 2. Checkpoint settings
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointTimeout(5000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read from the source (Kafka) and register it as a dynamic table
//Read from the order wide-table topic
String topic = "dwm_order_wide";
String groupId = "province_stats_app_group";
String createSQL = "CREATE TABLE order_wide (" +
" province_id BIGINT," +
" province_name STRING," +
" province_area_code STRING," +
" province_iso_code STRING," +
" province_3166_2_code STRING," +
" order_id STRING," +
" split_total_amount DOUBLE," +
" create_time STRING," +
" rowtime as TO_TIMESTAMP(create_time) ," +
" WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND " +
") WITH (" + MyKafkaUtils.getKafkaDDL(topic, groupId) + ")";
// System.out.println(createSQL);
tableEnv.executeSql(createSQL);
//TODO 4. Group, window, and aggregate
String selectSQL = "select " +
" DATE_FORMAT(TUMBLE_START(rowtime,INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as stt," +
" DATE_FORMAT(TUMBLE_END(rowtime,INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as edt," +
" province_id," +
" province_name," +
" province_area_code area_code," +
" province_iso_code iso_code," +
" province_3166_2_code iso_3166_2," +
" count(distinct order_id) order_count," +
" sum(split_total_amount) order_amount," +
" UNIX_TIMESTAMP() * 1000 as ts" +
" from " +
" order_wide" +
" group by " +
" TUMBLE(rowtime,INTERVAL '10' SECOND)," +
" province_id," +
" province_name," +
" province_area_code," +
" province_iso_code," +
" province_3166_2_code";
// System.out.println(selectSQL);
Table orderWideTable = tableEnv.sqlQuery(selectSQL);
//TODO 5. Convert the dynamic table to a stream (toAppendStream is enough here because each tumbling window emits one final row per key; an unbounded group-by would need a retract stream)
DataStream<ProvinceStats> provinceStatsDS = tableEnv.toAppendStream(orderWideTable, ProvinceStats.class);
provinceStatsDS.print(">>>>>");
//TODO 6. Write the stream to ClickHouse
provinceStatsDS.addSink(
ClickhouseUtils.getJdbcSink("insert into province_stats values(?,?,?,?,?,?,?,?,?,?)")
);
env.execute();
}
}

View File

@ -3,6 +3,7 @@ package com.atguigu.gmall.realtime.app.dws;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.beans.VisitorStats;
import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
import com.atguigu.gmall.realtime.utils.DateTimeUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
@ -37,6 +38,10 @@ import java.util.Date;
*@Author: markilue
*@CreateTime: 2023-05-10 21:04
*@Description: TODO Visitor-topic statistics (DWS)
* Test procedure:
* - processes that need to be running:
* zk, kafka, hdfs, logger.sh, clickhouse,
* BaseLogApp, UniqueVisitorApp, UserJumpDetailApp
*@Version: 1.0
*/
public class VisitorStatsApp {
@ -286,17 +291,7 @@ public class VisitorStatsApp {
//TODO 10. Write the aggregated data to ClickHouse
DataStreamSink<VisitorStats> visitorStatsDataStreamSink = reduceDS.addSink(
-JdbcSink.sink(
-"",
-new JdbcStatementBuilder<VisitorStats>() {
-@Override
-public void accept(PreparedStatement preparedStatement, VisitorStats visitorStats) throws SQLException {
-}
-},
-new JdbcExecutionOptions.Builder().build(),
-new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().build()
-)
+ClickhouseUtils.getJdbcSink("insert into visitor_stats values(?,?,?,?,?,?,?,?,?,?,?,?)")
);
env.execute();

View File

@ -0,0 +1,39 @@
create table order_wide
(
province_id BIGINT,
province_name STRING,
province_area_code STRING,
province_iso_code STRING,
province_3166_2_code STRING,
order_id STRING,
split_total_amount DOUBLE,
create_time STRING,
rowtime as TO_TIMESTAMP(create_time),
WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'dwm_order_wide',
'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092',
'properties.group.id' = 'province_stats_app_group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json')
select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt,
DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt,
province_id,
province_name,
province_area_code area_code,
province_iso_code iso_code,
province_3166_2_code iso_3166_2,
count(distinct order_id) order_count,
sum(split_total_amount) order_amount,
UNIX_TIMESTAMP() * 1000 ts
from order_wide
group by TUMBLE(rowtime, INTERVAL '10' SECOND),
province_id,
province_name,
province_area_code,
province_iso_code,
province_3166_2_code

View File

@ -0,0 +1,79 @@
package com.atguigu.gmall.realtime.beans;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-11 16:45
*@Description: TODO E-commerce business constants
*@Version: 1.0
*/
public class GmallConstant {
//10 Order status
public static final String ORDER_STATUS_UNPAID = "1001"; //unpaid
public static final String ORDER_STATUS_PAID = "1002"; //paid
public static final String ORDER_STATUS_CANCEL = "1003"; //cancelled
public static final String ORDER_STATUS_FINISH = "1004"; //finished
public static final String ORDER_STATUS_REFUND = "1005"; //refund in progress
public static final String ORDER_STATUS_REFUND_DONE = "1006"; //refund completed
//11 Payment types
public static final String PAYMENT_TYPE_ALIPAY = "1101"; //Alipay
public static final String PAYMENT_TYPE_WECHAT = "1102"; //WeChat
public static final String PAYMENT_TYPE_UNION = "1103"; //UnionPay
//12 Appraise (review rating)
public static final String APPRAISE_GOOD = "1201"; //good review
public static final String APPRAISE_SOSO = "1202"; //neutral review
public static final String APPRAISE_BAD = "1203"; //bad review
public static final String APPRAISE_AUTO = "1204"; //automatic review
//13 Refund reasons
public static final String REFUND_REASON_BAD_GOODS = "1301"; //quality problem
public static final String REFUND_REASON_WRONG_DESC = "1302"; //item does not match its description
public static final String REFUND_REASON_SALE_OUT = "1303"; //out of stock
public static final String REFUND_REASON_SIZE_ISSUE = "1304"; //wrong size
public static final String REFUND_REASON_MISTAKE = "1305"; //ordered by mistake
public static final String REFUND_REASON_NO_REASON = "1306"; //no longer wanted
public static final String REFUND_REASON_OTHER = "1307"; //other
//14 Coupon status
public static final String COUPON_STATUS_UNUSED = "1401"; //unused
public static final String COUPON_STATUS_USING = "1402"; //in use
public static final String COUPON_STATUS_USED = "1403"; //used
//15 Refund types
public static final String REFUND_TYPE_ONLY_MONEY = "1501"; //refund only
public static final String REFUND_TYPE_WITH_GOODS = "1502"; //return goods and refund
//24 Source types
public static final String SOURCE_TYPE_QUREY = "2401"; //user query
public static final String SOURCE_TYPE_PROMOTION = "2402"; //product promotion
public static final String SOURCE_TYPE_AUTO_RECOMMEND = "2403"; //smart recommendation
public static final String SOURCE_TYPE_ACTIVITY = "2404"; //promotional activity
//Coupon scope
public static final String COUPON_RANGE_TYPE_CATEGORY3 = "3301"; //category3
public static final String COUPON_RANGE_TYPE_TRADEMARK = "3302"; //trademark
public static final String COUPON_RANGE_TYPE_SPU = "3303"; //SPU
//Coupon types
public static final String COUPON_TYPE_MJ = "3201"; //amount off over a spend threshold
public static final String COUPON_TYPE_DZ = "3202"; //discount over a quantity threshold
public static final String COUPON_TYPE_DJ = "3203"; //voucher
public static final String ACTIVITY_RULE_TYPE_MJ = "3101";
public static final String ACTIVITY_RULE_TYPE_DZ = "3102";
public static final String ACTIVITY_RULE_TYPE_ZK = "3103";
public static final String KEYWORD_SEARCH = "SEARCH";
public static final String KEYWORD_CLICK = "CLICK";
public static final String KEYWORD_CART = "CART";
public static final String KEYWORD_ORDER = "ORDER";
}

View File

@ -0,0 +1,94 @@
package com.atguigu.gmall.realtime.beans;
import lombok.Builder;
import lombok.Data;
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-11 15:38
*@Description: TODO Product statistics entity
*
* @Builder
* lets objects be created and populated via the builder pattern
* @Builder.Default
* field initializers are normally lost when an object is built via the builder;
* this annotation fixes that: e.g. a field initialized to 0L would otherwise
* come out of the builder as null
*@Version: 1.0
*/
@Data
@Builder //builder pattern
public class ProductStats {
String stt; //window start time
String edt; //window end time
Long sku_id; //SKU id
String sku_name; //SKU name
BigDecimal sku_price; //SKU unit price
Long spu_id; //SPU id
String spu_name; //SPU name
Long tm_id; //trademark (brand) id
String tm_name; //trademark (brand) name
Long category3_id; //category id
String category3_name; //category name
@Builder.Default
Long display_ct = 0L; //display (impression) count
@Builder.Default
Long click_ct = 0L; //click count
@Builder.Default
Long favor_ct = 0L; //favor count
@Builder.Default
Long cart_ct = 0L; //add-to-cart count
@Builder.Default
Long order_sku_num = 0L; //number of SKUs ordered
@Builder.Default
BigDecimal order_amount = BigDecimal.ZERO; //order amount
@Builder.Default
Long order_ct = 0L; //order count
@Builder.Default
BigDecimal payment_amount = BigDecimal.ZERO; //payment amount
@Builder.Default
Long paid_order_ct = 0L; //paid order count
@Builder.Default
Long refund_order_ct = 0L; //refund order count
@Builder.Default
BigDecimal refund_amount = BigDecimal.ZERO; //refund amount
@Builder.Default
Long comment_ct = 0L; //comment count
@Builder.Default
Long good_comment_ct = 0L; //good (positive) comment count
@Builder.Default
@TransientSink
Set<Long> orderIdSet = new HashSet<>(); //used to count distinct orders
@Builder.Default
@TransientSink
Set<Long> paidOrderIdSet = new HashSet<>(); //used to count distinct paid orders
@Builder.Default
@TransientSink
Set<Long> refundOrderIdSet = new HashSet<>(); //used to count distinct refund orders
Long ts; //statistics timestamp
}
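To make the @Builder.Default note above concrete, a small hypothetical demo (assumes Lombok on the classpath):

import lombok.Builder;

public class BuilderDefaultDemo {

    @Builder
    static class Stats {
        Long click_ct = 0L;   //initializer is discarded when built via the builder
        @Builder.Default
        Long display_ct = 0L; //initializer survives thanks to @Builder.Default
    }

    public static void main(String[] args) {
        Stats stats = Stats.builder().build();
        System.out.println(stats.click_ct);   //null
        System.out.println(stats.display_ct); //0
    }
}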

View File

@ -0,0 +1,47 @@
package com.atguigu.gmall.realtime.beans;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.Date;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-11 20:51
*@Description: TODO Province statistics entity
*@Version: 1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProvinceStats {
private String stt;
private String edt;
private Long province_id;
private String province_name;
private String area_code;
private String iso_code;
private String iso_3166_2;
private BigDecimal order_amount;
private Long order_count;
private Long ts;
public ProvinceStats(OrderWide orderWide){
province_id = orderWide.getProvince_id();
order_amount = orderWide.getSplit_total_amount();
province_name = orderWide.getProvince_name();
area_code = orderWide.getProvince_area_code();
iso_3166_2 = orderWide.getProvince_3166_2_code();
iso_code = orderWide.getProvince_iso_code();
order_count = 1L;
ts = new Date().getTime();
}
}

View File

@ -0,0 +1,19 @@
package com.atguigu.gmall.realtime.beans;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-11 14:39
*@Description: TODO Marks fields that should not be written to ClickHouse; when a record is written, fields carrying this annotation are skipped
*@Version: 1.0
*/
@Target(ElementType.FIELD) //where the annotation may be applied
@Retention(RetentionPolicy.RUNTIME) //how long it is retained (runtime, so reflection can see it)
public @interface TransientSink {
}
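A small self-contained check (a hypothetical demo class, assumed to live in the same package as TransientSink) that the annotation really is visible at runtime, mirroring how ClickhouseUtils discovers it below:

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

public class TransientSinkDemo {

    static class Row {
        Long click_ct = 1L;                     //would be written to ClickHouse
        @TransientSink
        Set<Long> orderIdSet = new HashSet<>(); //would be skipped by the sink
    }

    public static void main(String[] args) {
        for (Field field : Row.class.getDeclaredFields()) {
            //getAnnotation only works here because of RetentionPolicy.RUNTIME
            boolean skipped = field.getAnnotation(TransientSink.class) != null;
            System.out.println(field.getName() + " -> skipped=" + skipped);
        }
        //prints: click_ct -> skipped=false, orderIdSet -> skipped=true
    }
}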

View File

@ -8,7 +8,7 @@ import lombok.Data;
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-10 21:11
-*@Description: TODO Visitor-topic entity holding all dimensions and measures
+*@Description: TODO Visitor-topic entity holding all dimensions and measures (the field order must match the column order of the ClickHouse table, because values are later bound via reflection)
*@Version: 1.0
*/
@Data

View File

@ -12,4 +12,5 @@ public class GmallConfig {
public static final String HBASE_SCHEMA = "GMALL_REALTIME"; //HBase schema name
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181"; //Phoenix connection
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_gmall"; //ClickHouse connection
}

View File

@ -0,0 +1,72 @@
package com.atguigu.gmall.realtime.utils;
import com.atguigu.gmall.realtime.beans.TransientSink;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-11 14:02
*@Description: TODO Utility class for writing to ClickHouse
*@Version: 1.0
*/
public class ClickhouseUtils {
public static <T> SinkFunction<T> getJdbcSink(String sql) {
SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>() {
@Override
public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
//obj is one record from the stream; copy its field values into the ? placeholders
//get the declared fields of the record's class
Field[] fields = obj.getClass().getDeclaredFields();
int index = 1;
for (Field field : fields) {
//iterate over the fields; check whether the field carries the @TransientSink annotation
//fields with the annotation must not be written to ClickHouse, so they are skipped
TransientSink transientSink = field.getAnnotation(TransientSink.class);
if (transientSink != null) {
continue;
}
//make private fields accessible
field.setAccessible(true);
try {
//read the field value
Object fieldValue = field.get(obj);
//bind the value to the next ? placeholder (JDBC parameter indexes start at 1)
preparedStatement.setObject(index++, fieldValue);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
},
new JdbcExecutionOptions.Builder() //builder pattern - an inner builder object constructs the outer object: https://www.jianshu.com/p/7b52c5e1b6ce
.withBatchSize(5) //batch inserts for efficiency (flush once 5 records accumulate per slot)
// .withBatchIntervalMs(2000) //flush interval: also flush once 2s have passed
// .withMaxRetries(3) //number of retries after a failed insert
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() //builder pattern: https://www.jianshu.com/p/7b52c5e1b6ce
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl(GmallConfig.CLICKHOUSE_URL)
.build()
);
return sinkFunction;
}
}

View File

@ -0,0 +1,49 @@
package com.atguigu.gmall.realtime.utils;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-11 21:55
*@Description: TODO Tokenize text with the IK analyzer
*@Version: 1.0
*/
public class KeywordUtils {
//tokenization method
public static List<String> analyze(String text) {
StringReader reader = new StringReader(text);
//useSmart: whether to use the smart segmentation strategy
//non-smart: fine-grained, outputs every possible segmentation
//smart: merges numerals with measure words and resolves segmentation ambiguities
IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
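//e.g. for "中华人民共和国", smart mode keeps the whole phrase as a single token,
//while non-smart mode would also emit finer-grained pieces such as "中华" and "人民"
//(illustrative; the exact output depends on the IK version and its dictionary)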
List<String> resultList = new ArrayList<>();
try {
Lexeme lexeme = null;
while ((lexeme = ikSegmenter.next()) != null) {
//a non-null lexeme means there are more tokens to read
resultList.add(lexeme.getLexemeText());
}
} catch (IOException e) {
e.printStackTrace();
}
return resultList;
}
public static void main(String[] args) {
String text = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";
System.out.println(KeywordUtils.analyze(text));
}
}

View File

@ -66,5 +66,17 @@ public class MyKafkaUtils {
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
public static String getKafkaDDL(String topic, String groupId) {
String connector = " 'connector' = 'kafka'," +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "'," +
" 'properties.group.id' = '" + groupId + "'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'";
return connector;
}
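//Note: getKafkaDDL returns only the connector options; the caller wraps the
//fragment in "WITH ( ... )" when assembling the CREATE TABLE statement, as
//ProvinceStatsApp does above.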
}