Real-time data warehouse: DWM task updates

This commit is contained in:
parent fea6d3debc
commit e2d2f8925f
@@ -141,6 +141,29 @@
            <version>3.3.0</version>
        </dependency>

        <!-- clickhouse driver -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>
@@ -0,0 +1,46 @@
package com.atguigu.gmall.realtime.app.dwd;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.app.dwd
 *@Author: markilue
 *@CreateTime: 2023-05-10 20:06
 *@Description: TODO Extracted base class: template method design pattern
 *@Version: 1.0
 */
public abstract class Base {

    public void entry() throws Exception {
        //TODO 1. Prepare the basic environment
        //1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set the parallelism
        env.setParallelism(4);

        //TODO 2. Checkpoint settings
        //2.1 Enable checkpointing
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 Set the checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //2.3 Set the restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        //2.4 Keep externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.5 Set the state backend: memory / filesystem / RocksDB
        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
        //2.6 Specify the user for HDFS operations
        System.setProperty("HADOOP_USER_NAME", "dingjiawen");

        execute(env);

        env.execute();
    }

    public abstract void execute(StreamExecutionEnvironment environment);
}
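
A minimal sketch of how a concrete app would extend this base class (illustrative only; the class name and body are assumptions, not part of this commit):

package com.atguigu.gmall.realtime.app.dwd;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDwdApp extends Base {
    @Override
    public void execute(StreamExecutionEnvironment env) {
        // App-specific stream logic goes here; entry() has already configured
        // parallelism, checkpointing and the state backend.
        env.fromElements("a", "b", "c").print();
    }

    public static void main(String[] args) throws Exception {
        new ExampleDwdApp().entry(); // entry() ends by calling env.execute()
    }
}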
@@ -1,10 +1,13 @@
package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.beans.OrderDetail;
import com.atguigu.gmall.realtime.beans.OrderInfo;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.avro.Schema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@@ -13,17 +16,23 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 *@BelongsProject: rt-gmall-parent
@@ -31,6 +40,24 @@ import java.time.Duration;
 *@Author: markilue
 *@CreateTime: 2023-05-09 14:33
 *@Description: TODO Order wide-table preparation
 * -- Processes that need to be running:
 *    zk, hdfs, hbase, kafka, maxwell, redis
 *    BaseDBApp, OrderWideApp
 * -- Execution flow:
 *    > run the jar that simulates business data generation
 *    > it inserts the generated business data into the MySQL business database
 *    > MySQL records the changed data in its binlog
 *    > Maxwell reads the binlog, wraps the changed rows as JSON strings and sends them to the Kafka ODS topic ods_base_db_m
 *    > BaseDBApp reads from the ods_base_db_m topic and splits the stream
 *      & fact data ---- written back to the Kafka DWD topics
 *      & dimension data ---- saved to dimension tables in Phoenix
 *    > OrderWideApp reads the order and order-detail data from the DWD topics
 *    > intervalJoin performs the two-stream join of orders and order details
 *    > the user dimension is joined onto the order wide table
 *      * basic dimension join
 *      * optimization 1: cache-aside caching (Redis)
 *      * optimization 2: async I/O
 *
 *@Version: 1.0
 */
public class OrderWideApp {
@@ -146,12 +173,139 @@
                }
        );

//        orderWideDS.print(">>>");

        //TODO 8. Join with the user dimension -> asynchronous join
        SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
                orderWideDS,
                new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject dimJsonObj) throws ParseException {
                        String gender = dimJsonObj.getString("GENDER");
                        // Compute the age from the birthday
                        String birthday = dimJsonObj.getString("BIRTHDAY"); //e.g. 1972-06-15

                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                        Date birthdayDate = sdf.parse(birthday);
                        long betweenMs = System.currentTimeMillis() - birthdayDate.getTime();
                        long ageLong = betweenMs / 1000L / 60L / 60L / 24L / 365L;
                        int age = (int) ageLong;

                        // Dimension join
                        orderWide.setUser_gender(gender);
                        orderWide.setUser_age(age);
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getUser_id().toString();
                    }
                },
                60,
                TimeUnit.SECONDS
        );

        //TODO 9. Join with the province dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(
                orderWideWithUserDS,
                new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
                        orderWide.setProvince_name(dimJsonObj.getString("NAME"));
                        orderWide.setProvince_area_code(dimJsonObj.getString("AREA_CODE"));
                        orderWide.setProvince_iso_code(dimJsonObj.getString("ISO_CODE"));
                        orderWide.setProvince_3166_2_code(dimJsonObj.getString("ISO_3166_2"));
                    }

                    @Override
                    public String getKey(OrderWide input) {
                        return input.getProvince_id().toString();
                    }
                },
                60, TimeUnit.SECONDS
        );

        //TODO 10. Join with the SKU dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
                orderWideWithProvinceDS,
                new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                        orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                        orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                        orderWide.setTm_id(jsonObject.getLong("TM_ID"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getSku_id());
                    }
                }, 60, TimeUnit.SECONDS);

        //TODO 11. Join with the SPU dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
                orderWideWithSkuDS,
                new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getSpu_id());
                    }
                }, 60, TimeUnit.SECONDS);

        //TODO 12. Join with the category dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
                orderWideWithSpuDS,
                new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setCategory3_name(jsonObject.getString("NAME"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getCategory3_id());
                    }
                }, 60, TimeUnit.SECONDS);

        //TODO 13. Join with the trademark dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
                orderWideWithCategory3DS,
                new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setTm_name(jsonObject.getString("TM_NAME"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getTm_id());
                    }
                }, 60, TimeUnit.SECONDS);

        orderWideWithTmDS.print(">>>>>");

        //TODO 14. Write the order wide-table data back to the Kafka topic dwm_order_wide
        //JSON.parseObject(jsonStr): converts a string into a JSON object
        //JSON.parseObject(jsonStr, type): converts a string into an object of the given type
        //JSON.toJSONString(obj): converts an object into a JSON string
        orderWideWithTmDS.map(JSON::toJSONString).addSink(
                MyKafkaUtils.getKafkaSink("dwm_order_wide")
        );

        env.execute();

    }
}
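
A note on the async joins above: AsyncDataStream.unorderedWait emits each element as soon as its dimension lookup completes, so downstream order can differ from input order (orderedWait would preserve the input order at the cost of extra latency). The trailing 60 / TimeUnit.SECONDS arguments are the per-element async timeout.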
@@ -0,0 +1,130 @@
package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.beans.PaymentInfo;
import com.atguigu.gmall.realtime.beans.PaymentWide;
import com.atguigu.gmall.realtime.utils.DateTimeUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
 *@Author: markilue
 *@CreateTime: 2023-05-10 19:03
 *@Description: TODO Payment wide-table preparation
 *@Version: 1.0
 */
public class PaymentWideApp {

    public static void main(String[] args) throws Exception {
        //TODO 1. Create the stream environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //TODO 2. Checkpoint configuration
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
        System.setProperty("HADOOP_USER_NAME", "dingjiawen");

        //TODO 3. Read the data from Kafka (dwm_order_wide and dwd_payment_info)
        String paymentTopic = "dwd_payment_info";
        String orderWideTopic = "dwm_order_wide";
        String groupId = "payment_wide_app_group";
        DataStreamSource<String> paymentStrDS = env.addSource(
                MyKafkaUtils.getKafkaSource(paymentTopic, groupId)
        );
        DataStreamSource<String> orderWideStrDS = env.addSource(
                MyKafkaUtils.getKafkaSource(orderWideTopic, groupId)
        );

        //TODO 4. Convert the records into PaymentInfo and OrderWide objects
        SingleOutputStreamOperator<PaymentInfo> paymentDS = paymentStrDS.map(
                new MapFunction<String, PaymentInfo>() {
                    @Override
                    public PaymentInfo map(String s) throws Exception {
                        return JSON.parseObject(s, PaymentInfo.class);
                    }
                }
        );
        SingleOutputStreamOperator<OrderWide> orderWideDS = orderWideStrDS.map(
                orderWideStr -> JSON.parseObject(orderWideStr, OrderWide.class)
        );

//        paymentDS.print(">>>>");
//        orderWideDS.print("$$$$");

        //TODO 5. Assign watermarks
        SingleOutputStreamOperator<PaymentInfo> paymentInfoWithWatermarkDS = paymentDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<PaymentInfo>() {
                                    @Override
                                    public long extractTimestamp(PaymentInfo paymentInfo, long l) {
                                        return DateTimeUtils.toTs(paymentInfo.getCallback_time());
                                    }
                                }
                        )
        );

        SingleOutputStreamOperator<OrderWide> orderWideWithWatermarkDS = orderWideDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<OrderWide>() {
                                    @Override
                                    public long extractTimestamp(OrderWide orderWide, long l) {
                                        return DateTimeUtils.toTs(orderWide.getCreate_time());
                                    }
                                }
                        )
        );

        //TODO 6. Key both streams by the join field
        KeyedStream<PaymentInfo, Long> paymentInfoKeyedDS = paymentInfoWithWatermarkDS.keyBy(PaymentInfo::getOrder_id);
        KeyedStream<OrderWide, Long> orderWideKeyedDS = orderWideWithWatermarkDS.keyBy(OrderWide::getOrder_id);

        //TODO 7. Two-stream join
        SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoKeyedDS
                .intervalJoin(orderWideKeyedDS)
                .between(Time.seconds(-1800), Time.seconds(0)) // match orders created up to half an hour before the payment
                .process(
                        new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                            @Override
                            public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>.Context context, Collector<PaymentWide> collector) throws Exception {
                                collector.collect(new PaymentWide(paymentInfo, orderWide));
                            }
                        }
                );

        paymentWideDS.print(">>>>");

        //TODO 8. Write the payment wide-table data to the Kafka topic dwm_payment_wide
        paymentWideDS
                .map(paymentWide -> JSON.toJSONString(paymentWide))
                .addSink(
                        MyKafkaUtils.getKafkaSink("dwm_payment_wide")
                );

        env.execute();
    }
}
@@ -0,0 +1,304 @@
package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.beans.VisitorStats;
import com.atguigu.gmall.realtime.utils.DateTimeUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Date;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.app.dws
 *@Author: markilue
 *@CreateTime: 2023-05-10 21:04
 *@Description: TODO Visitor-topic statistics, DWS layer
 *@Version: 1.0
 */
public class VisitorStatsApp {

    public static void main(String[] args) throws Exception {
        //TODO 1. Set up the stream environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //TODO 2. Checkpoint settings
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
        System.setProperty("HADOOP_USER_NAME", "dingjiawen");

        //TODO 3. Read the data from Kafka
        String groupId = "visitor_stats_app_group";
        String pageLogTopic = "dwd_page_log";
        DataStreamSource<String> pageLogStrDS = env.addSource(
                MyKafkaUtils.getKafkaSource(pageLogTopic, groupId)
        );

        String uniqueVisitTopic = "dwm_unique_visitor";
        DataStreamSource<String> uniqueVisitStrDS = env.addSource(
                MyKafkaUtils.getKafkaSource(uniqueVisitTopic, groupId)
        );

        String userJumpDetailDetailTopic = "dwm_user_jump_detail";
        DataStreamSource<String> userJumpDetailDetailStrDS = env.addSource(
                MyKafkaUtils.getKafkaSource(userJumpDetailDetailTopic, groupId)
        );

//        pageLogStrDS.print(">>>>");
//        uniqueVisitStrDS.print("$$$$");
//        userJumpDetailDetailStrDS.print("&&&&");

        //TODO 4. Convert the records in each stream: String -> VisitorStats
        //4.1 Conversion of the dwd_page_log stream
        SingleOutputStreamOperator<VisitorStats> pvStateDS = pageLogStrDS.map(
                /*
                {
                    "common": {
                        "ar": "440000",
                        "uid": "9",
                        "os": "Android 11.0",
                        "ch": "vivo",
                        "is_new": "1",
                        "md": "Xiaomi 9",
                        "mid": "mid_10",
                        "vc": "v2.1.132",
                        "ba": "Xiaomi"
                    },
                    "page": {
                        "page_id": "cart",
                        "during_time": 2832,
                        "last_page_id": "good_detail"
                    },
                    "ts": 1683724788000
                }
                */
                new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String s) throws Exception {
                        JSONObject jsonObject = JSON.parseObject(s);
                        JSONObject commonJsonObj = jsonObject.getJSONObject("common");
                        JSONObject pageJsonObj = jsonObject.getJSONObject("page");
                        VisitorStats visitorStats = new VisitorStats(
                                "",
                                "",
                                commonJsonObj.getString("vc"),
                                commonJsonObj.getString("ch"),
                                commonJsonObj.getString("ar"),
                                commonJsonObj.getString("is_new"),
                                0L,
                                1L,
                                0L,
                                0L,
                                pageJsonObj.getLong("during_time"),
                                jsonObject.getLong("ts")
                        );
                        // Check whether this page view starts a new session
                        String lastPageId = pageJsonObj.getString("last_page_id");
                        if (lastPageId == null || lastPageId.length() == 0) {
                            visitorStats.setSv_ct(1L);
                        }

                        return visitorStats;
                    }
                }
        );

        //4.2 Conversion of the dwm_unique_visitor stream
        // (records have the same structure as the dwd_page_log sample above)
        SingleOutputStreamOperator<VisitorStats> uvStateDS = uniqueVisitStrDS.map(
                new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String s) throws Exception {
                        JSONObject jsonObject = JSON.parseObject(s);
                        JSONObject commonJsonObj = jsonObject.getJSONObject("common");
                        VisitorStats visitorStats = new VisitorStats(
                                "",
                                "",
                                commonJsonObj.getString("vc"),
                                commonJsonObj.getString("ch"),
                                commonJsonObj.getString("ar"),
                                commonJsonObj.getString("is_new"),
                                1L,
                                0L,
                                0L,
                                0L,
                                0L,
                                jsonObject.getLong("ts")
                        );

                        return visitorStats;
                    }
                }
        );

        //4.3 Conversion of the dwm_user_jump_detail stream
        SingleOutputStreamOperator<VisitorStats> ujdStateDS = userJumpDetailDetailStrDS.map(
                new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String s) throws Exception {
                        JSONObject jsonObject = JSON.parseObject(s);
                        JSONObject commonJsonObj = jsonObject.getJSONObject("common");
                        VisitorStats visitorStats = new VisitorStats(
                                "",
                                "",
                                commonJsonObj.getString("vc"),
                                commonJsonObj.getString("ch"),
                                commonJsonObj.getString("ar"),
                                commonJsonObj.getString("is_new"),
                                0L,
                                0L,
                                0L,
                                1L,
                                0L,
                                jsonObject.getLong("ts")
                        );
                        // Sessions are already counted in the page-log stream (4.1),
                        // so this stream only contributes the bounce count.

                        return visitorStats;
                    }
                }
        );

        //TODO 5. Union the three converted streams
        DataStream<VisitorStats> unionDS = pvStateDS.union(uvStateDS, ujdStateDS);

//        unionDS.print(">>>>>>");

        //TODO 6. Assign the watermark and extract the event-time field
        SingleOutputStreamOperator<VisitorStats> visitorStatWithWatermarkDS = unionDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<VisitorStats>() {
                                            @Override
                                            public long extractTimestamp(VisitorStats visitorStats, long l) {
                                                return visitorStats.getTs();
                                            }
                                        }
                                )
                );

        //TODO 7. Key the stream by its dimensions: version, channel, area and new/returning flag, so the key is a Tuple4
        KeyedStream<VisitorStats, Tuple4<String, String, String, String>> keyedDS = visitorStatWithWatermarkDS.keyBy(
                new KeySelector<VisitorStats, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> getKey(VisitorStats visitorStats) throws Exception {
                        return Tuple4.of(
                                visitorStats.getVc(),
                                visitorStats.getCh(),
                                visitorStats.getAr(),
                                visitorStats.getIs_new());
                    }
                }
        );

        //TODO 8. Windowing: each key gets its own independent windows; keys do not affect each other
        WindowedStream<VisitorStats, Tuple4<String, String, String, String>, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));

        //TODO 9. Aggregate the records within each window
        SingleOutputStreamOperator<VisitorStats> reduceDS = windowDS.reduce(
                new ReduceFunction<VisitorStats>() {
                    @Override
                    public VisitorStats reduce(VisitorStats visitorStats1, VisitorStats visitorStats2) throws Exception {
                        // Sum the measures pairwise
                        visitorStats1.setPv_ct(visitorStats1.getPv_ct() + visitorStats2.getPv_ct());
                        visitorStats1.setUv_ct(visitorStats1.getUv_ct() + visitorStats2.getUv_ct());
                        visitorStats1.setUj_ct(visitorStats1.getUj_ct() + visitorStats2.getUj_ct());
                        visitorStats1.setSv_ct(visitorStats1.getSv_ct() + visitorStats2.getSv_ct());
                        visitorStats1.setDur_sum(visitorStats1.getDur_sum() + visitorStats2.getDur_sum());
                        return visitorStats1;
                    }
                },
                new ProcessWindowFunction<VisitorStats, VisitorStats, Tuple4<String, String, String, String>, TimeWindow>() {
                    @Override
                    public void process(Tuple4<String, String, String, String> tuple4, ProcessWindowFunction<VisitorStats, VisitorStats, Tuple4<String, String, String, String>, TimeWindow>.Context context, Iterable<VisitorStats> iterable, Collector<VisitorStats> collector) throws Exception {
                        // Fill in the window time fields
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        for (VisitorStats visitorStats : iterable) {
                            visitorStats.setStt(DateTimeUtils.toYMDHMS(new Date(start)));
                            visitorStats.setEdt(DateTimeUtils.toYMDHMS(new Date(end)));
                            visitorStats.setTs(System.currentTimeMillis());
                            // Send the processed record downstream
                            collector.collect(visitorStats);
                        }
                    }
                }
        );
        reduceDS.print(">>>>>");

        //TODO 10. Write the aggregated data to ClickHouse
        DataStreamSink<VisitorStats> visitorStatsDataStreamSink = reduceDS.addSink(
                JdbcSink.sink(
                        "", // insert SQL still to be filled in (placeholder in this commit)
                        new JdbcStatementBuilder<VisitorStats>() {
                            @Override
                            public void accept(PreparedStatement preparedStatement, VisitorStats visitorStats) throws SQLException {

                            }
                        },
                        new JdbcExecutionOptions.Builder().build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().build()
                )
        );
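
        // A sketch of what the filled-in ClickHouse sink might look like
        // (assumption, not part of this commit; the table name, URL, database
        // and column order are placeholders to adapt):
        //
        // reduceDS.addSink(
        //         JdbcSink.sink(
        //                 "insert into visitor_stats values(?,?,?,?,?,?,?,?,?,?,?,?)",
        //                 (JdbcStatementBuilder<VisitorStats>) (ps, vs) -> {
        //                     ps.setString(1, vs.getStt());
        //                     ps.setString(2, vs.getEdt());
        //                     ps.setString(3, vs.getVc());
        //                     ps.setString(4, vs.getCh());
        //                     ps.setString(5, vs.getAr());
        //                     ps.setString(6, vs.getIs_new());
        //                     ps.setLong(7, vs.getUv_ct());
        //                     ps.setLong(8, vs.getPv_ct());
        //                     ps.setLong(9, vs.getSv_ct());
        //                     ps.setLong(10, vs.getUj_ct());
        //                     ps.setLong(11, vs.getDur_sum());
        //                     ps.setLong(12, vs.getTs());
        //                 },
        //                 JdbcExecutionOptions.builder().withBatchSize(5).build(),
        //                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        //                         .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        //                         .withUrl("jdbc:clickhouse://Ding202:8123/rt_gmall")
        //                         .build()
        //         )
        // );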

        env.execute();
    }
}
@@ -0,0 +1,79 @@
package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.DimUtils;
import com.atguigu.gmall.realtime.utils.ThreadPoolUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.text.ParseException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.app.func
 *@Author: markilue
 *@CreateTime: 2023-05-10 15:17
 *@Description: TODO Asynchronous dimension join
 * Template method design pattern:
 * the parent class defines the skeleton of the algorithm and defers the
 * concrete steps to its subclasses;
 * each subclass supplies its own implementation without changing the skeleton.
 *@Version: 1.0
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {

    ExecutorService executorService;
    private String tableName;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Get the thread pool
        executorService = ThreadPoolUtils.getInstance();
    }

    // Send asynchronous requests to perform the dimension join.
    // The requests run on threads taken from the pool.
    // asyncInvoke is called once for every element in the stream.
    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        // Take a thread from the pool
        executorService.submit(
                new Runnable() {
                    //TODO The code in run() is the asynchronous dimension-join work
                    @Override
                    public void run() {
                        try {
                            long start = System.currentTimeMillis();

                            // Get the dimension lookup key from the element
                            String key = getKey(input);
                            // Fetch the dimension object for that key
                            JSONObject dimJsonObj = DimUtils.getDimInfoWithCache(tableName, key);
                            // Copy the dimension attributes onto the stream element (the join)
                            if (dimJsonObj != null) {
                                join(input, dimJsonObj);
                            }

                            long end = System.currentTimeMillis();
                            System.out.println("Async dimension lookup took " + (end - start) + " ms");

                            resultFuture.complete(Collections.singleton(input)); // wrap input in a collection and emit it downstream
                        } catch (Exception e) {
                            e.printStackTrace();
                            System.out.println("Async dimension lookup threw an exception");
                        }
                    }
                }
        );

    }

}
@@ -0,0 +1,22 @@
package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;

import java.text.ParseException;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.app.func
 *@Author: markilue
 *@CreateTime: 2023-05-10 16:24
 *@Description: TODO
 *@Version: 1.0
 */
public interface DimJoinFunction<T> {

    // How to copy the dimension attributes onto the stream element
    void join(T input, JSONObject dimJsonObj) throws Exception;

    // The key used for the dimension lookup
    String getKey(T input);
}
@@ -16,10 +16,10 @@ import java.math.BigDecimal;
public class OrderDetail {

    Long id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    Long sku_num;
    String sku_name;
    String create_time;
    BigDecimal split_total_amount;
@@ -0,0 +1,27 @@
package com.atguigu.gmall.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.beans
 *@Author: markilue
 *@CreateTime: 2023-05-10 18:57
 *@Description: TODO Payment info entity class
 *@Version: 1.0
 */
@Data
public class PaymentInfo {

    Long id;
    Long order_id;
    Long user_id;
    BigDecimal total_amount;
    String subject;
    String payment_type;
    String create_time;
    String callback_time;

}
@@ -0,0 +1,95 @@
package com.atguigu.gmall.realtime.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.beanutils.BeanUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.beans
 *@Author: markilue
 *@CreateTime: 2023-05-10 18:58
 *@Description: TODO Payment wide-table entity class
 *@Version: 1.0
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PaymentWide {

    Long payment_id;
    String subject;
    String payment_type;
    String payment_create_time;
    String callback_time;
    Long detail_id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    Long sku_num;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;
    String order_create_time;

    String province_name; // obtained by querying the dimension table
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;
    Integer user_age;
    String user_gender;

    Long spu_id; // dimension data joined onto the wide table
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;

    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
        mergeOrderWide(orderWide);
        mergePaymentInfo(paymentInfo);
    }

    public void mergePaymentInfo(PaymentInfo paymentInfo) {
        if (paymentInfo != null) {
            try {
                BeanUtils.copyProperties(this, paymentInfo);
                payment_create_time = paymentInfo.create_time;
                payment_id = paymentInfo.id;
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }

    public void mergeOrderWide(OrderWide orderWide) {
        if (orderWide != null) {
            try {
                BeanUtils.copyProperties(this, orderWide);
                order_create_time = orderWide.create_time;
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }

}
@@ -0,0 +1,43 @@
package com.atguigu.gmall.realtime.beans;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.beans
 *@Author: markilue
 *@CreateTime: 2023-05-10 21:11
 *@Description: TODO Visitor-topic entity class, holding every dimension and measure
 *@Version: 1.0
 */
@Data
@AllArgsConstructor
public class VisitorStats {

    // Statistics window start time
    private String stt;
    // Statistics window end time
    private String edt;
    // Dimension: app version
    private String vc;
    // Dimension: channel
    private String ch;
    // Dimension: area
    private String ar;
    // Dimension: new/returning visitor flag
    private String is_new;
    // Measure: unique visitor count
    private Long uv_ct = 0L;
    // Measure: page view count
    private Long pv_ct = 0L;
    // Measure: session count
    private Long sv_ct = 0L;
    // Measure: bounce count
    private Long uj_ct = 0L;
    // Measure: total visit duration
    private Long dur_sum = 0L;
    // Statistics timestamp
    private Long ts;

}
@@ -0,0 +1,48 @@
package com.atguigu.gmall.realtime.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.utils
 *@Author: markilue
 *@CreateTime: 2023-05-10 19:49
 *@Description: TODO Date conversion utility class
 * SimpleDateFormat is not thread-safe;
 * since JDK 1.8 it can be replaced by the thread-safe DateTimeFormatter.
 *@Version: 1.0
 */
public class DateTimeUtils {

//    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Convert a Date object into a string
    public static String toYMDHMS(Date date) {
        LocalDateTime localDateTime = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
        return dtf.format(localDateTime);
    }

    // Convert a date string into epoch milliseconds
    public static Long toTs(String dateStr) {
        //SimpleDateFormat: Date, Calendar
        //DateTimeFormatter: LocalDateTime, Instant
        LocalDateTime localDateTime = LocalDateTime.parse(dateStr, dtf);
        Long ts = localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        return ts;
    }

    public static void main(String[] args) {
        System.out.println(ZoneId.systemDefault());
        System.out.println(new Date(1683728230000L));
    }
}
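
A quick round-trip illustration for these helpers (illustrative; note that toYMDHMS formats in the system default zone while toTs assumes a fixed +8 offset, so the round trip is only exact when the JVM runs in UTC+8):

String s = DateTimeUtils.toYMDHMS(new Date(1683728230000L)); // "2023-05-10 21:37:10" in UTC+8
Long ts = DateTimeUtils.toTs(s);                             // 1683728230000L again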
@@ -114,10 +114,6 @@ public class DimUtils {
        return dimInfoJsonObj;
    }

    // Delete the cached entry for the given Redis key
    public static void deleteCached(String tableName, String id) {
@@ -132,4 +128,9 @@ public class DimUtils {
        }
    }

    public static void main(String[] args) {
//        System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13")));
        System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13"));
    }
}
@@ -0,0 +1,43 @@
package com.atguigu.gmall.realtime.utils;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 *@BelongsProject: rt-gmall-parent
 *@BelongsPackage: com.atguigu.gmall.realtime.utils
 *@Author: markilue
 *@CreateTime: 2023-05-10 14:59
 *@Description: TODO Thread pool utility class
 *@Version: 1.0
 */
public class ThreadPoolUtils {

    public static volatile ThreadPoolExecutor threadPool;

    public static final int corePoolSize = 4;
    public static final int maximumPoolSize = 16;
    public static final long keepAliveTime = 300;
    public static final TimeUnit unit = TimeUnit.SECONDS;
    public static final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(Integer.MAX_VALUE);

    // Lazily create a single shared pool (double-checked locking on the volatile field)
    public static ThreadPoolExecutor getInstance() {
        if (threadPool == null) {
            synchronized (ThreadPoolUtils.class) {
                if (threadPool == null) {
                    System.out.println("--- creating thread pool ---");
                    threadPool = new ThreadPoolExecutor(
                            corePoolSize,
                            maximumPoolSize,
                            keepAliveTime,
                            unit,
                            workQueue
                    );
                }
            }
        }
        return threadPool;
    }
}
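
Usage sketch (illustrative): all DimAsyncFunction instances within one JVM (one TaskManager) share this single lazily created pool.

ExecutorService pool = ThreadPoolUtils.getInstance();
pool.submit(() -> System.out.println("dim lookup on " + Thread.currentThread().getName()));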