diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
index 151e239..f93ab7e 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml
@@ -134,6 +134,13 @@
+
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
new file mode 100644
index 0000000..2635d5c
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java
@@ -0,0 +1,157 @@
+package com.atguigu.gmall.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.atguigu.gmall.realtime.beans.OrderDetail;
+import com.atguigu.gmall.realtime.beans.OrderInfo;
+import com.atguigu.gmall.realtime.beans.OrderWide;
+import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-09 14:33
+ *@Description: TODO Prepare the order wide table
+ *@Version: 1.0
+ */
+public class OrderWideApp {
+
+ public static void main(String[] args) throws Exception {
+        //TODO 1. Set up the stream execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+        //TODO 2. Checkpoint settings
+ env.enableCheckpointing(5000L);
+ env.getCheckpointConfig().setCheckpointTimeout(3000L);
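+        //Note: checkpoints that run longer than this 3s timeout are aborted; the timeout here is shorter than the 5s trigger interval above.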
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
+ env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
+ System.setProperty("HADOOP_USER_NAME", "dingjiawen");
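+        //the HDFS user set above must have write access to the checkpoint directory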
+
+        //TODO 3. Read data from Kafka
+ String orderInfoTopic = "dwd_order_info";
+ String groupId = "order_wide_app_group";
+        DataStreamSource<String> orderInfoStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(orderInfoTopic, groupId)
+ );
+
+ String orderDetailTopic = "dwd_order_detail";
+        DataStreamSource<String> orderDetailStrDS = env.addSource(
+ MyKafkaUtils.getKafkaSource(orderDetailTopic, groupId)
+ );
+
+        //TODO 4. Convert the element type of each stream: String -> entity object
+        SingleOutputStreamOperator<OrderInfo> orderInfoDS = orderInfoStrDS.map(
+                new RichMapFunction<String, OrderInfo>() {
+ private SimpleDateFormat sdf;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ }
+
+ @Override
+ public OrderInfo map(String s) throws Exception {
+ OrderInfo orderInfo = JSON.parseObject(s, OrderInfo.class);
+ long create_ts = sdf.parse(orderInfo.getCreate_time()).getTime();
+ orderInfo.setCreate_ts(create_ts);
+ return orderInfo;
+ }
+ }
+ );
+
+        SingleOutputStreamOperator<OrderDetail> orderDetailDS = orderDetailStrDS.map(
+                new RichMapFunction<String, OrderDetail>() {
+ private SimpleDateFormat sdf;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ }
+
+ @Override
+ public OrderDetail map(String s) throws Exception {
+ OrderDetail orderDetail = JSON.parseObject(s, OrderDetail.class);
+ long create_ts = sdf.parse(orderDetail.getCreate_time()).getTime();
+ orderDetail.setCreate_ts(create_ts);
+ return orderDetail;
+ }
+ }
+ );
+
+// orderInfoDS.print(">>>>");
+// orderDetailDS.print("####");
+
+        //TODO 5. Assign watermarks and extract the event-time field
+        //order stream
+        SingleOutputStreamOperator<OrderInfo> orderInfoWithWatermarkDS = orderInfoDS.assignTimestampsAndWatermarks(
+                WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
+                        .withTimestampAssigner(
+                                new SerializableTimestampAssigner<OrderInfo>() {
+ @Override
+ public long extractTimestamp(OrderInfo orderInfo, long l) {
+ return orderInfo.getCreate_ts();
+ }
+ }
+ )
+ );
+
+        //order-detail stream
+        SingleOutputStreamOperator<OrderDetail> orderDetailWithWatermarkDS = orderDetailDS.assignTimestampsAndWatermarks(
+                WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
+                        .withTimestampAssigner(
+                                new SerializableTimestampAssigner<OrderDetail>() {
+ @Override
+ public long extractTimestamp(OrderDetail orderDetail, long l) {
+ return orderDetail.getCreate_ts();
+ }
+ }
+ )
+ );
+
+        //TODO 6. Key both streams by the join field ------- order_id
+        KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithWatermarkDS.keyBy(OrderInfo::getId);
+        KeyedStream<OrderDetail, Long> orderDetailKeyedDS = orderDetailWithWatermarkDS.keyBy(OrderDetail::getOrder_id);
+
+        //TODO 7. Join the two streams with intervalJoin
+        SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoKeyedDS
+ .intervalJoin(orderDetailKeyedDS)
+ .between(Time.seconds(-5), Time.seconds(5))
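+                //emit a pair when orderDetail.create_ts lies in [orderInfo.create_ts - 5s, orderInfo.create_ts + 5s]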
+ .process(
+                        new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
+ @Override
+                            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context context, Collector<OrderWide> collector) throws Exception {
+ collector.collect(new OrderWide(orderInfo, orderDetail));
+ }
+ }
+ );
+
+ orderWideDS.print(">>>");
+
+ env.execute();
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java
index 4544f47..e77b441 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java
@@ -8,16 +8,13 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.cep.CEP;
-import org.apache.flink.cep.PatternStream;
+import org.apache.flink.cep.*;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -57,6 +54,17 @@ public class UserJumpDetailApp {
String groupId = "user_jump_detail_app_group";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtils.getKafkaSource(topic, groupId));
+//        DataStream<String> kafkaDS = env
+// .fromElements(
+// "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
+// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
+// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
+// "\"home\"},\"ts\":15000} ",
+// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
+// "\"detail\"},\"ts\":30000} "
+// );
+
+
//TODO 4.map String ->JSONObject
        SingleOutputStreamOperator<JSONObject> jsonDS = kafkaDS.map(JSON::parseObject);
// jsonDS.print(">>>");
@@ -75,6 +83,8 @@ public class UserJumpDetailApp {
)
);
+ jsonObjWithWatermarkDS.print("###");
+
        //TODO 6. Key the stream by mid
        KeyedStream<JSONObject, String> keyedDS = jsonObjWithWatermarkDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
@@ -126,12 +136,65 @@ public class UserJumpDetailApp {
        PatternStream<JSONObject> patternDS = CEP.pattern(keyedDS, pattern);
        //TODO 9. Extract matches from the pattern stream
+        //9.1 Define the side-output tag; Flink CEP routes timed-out partial matches to this side output
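+        //the anonymous subclass ({}) keeps the String type argument available at runtime despite erasure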
+        OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
+        };
-        OutputTag<String> outputTag = new OutputTag<String>("lateData") {};
-        patternDS.process(
-                new MyPatternProcessFunction(outputTag)
+        //9.2 Extract the data
+        //Option 1: process() with a PatternProcessFunction
+//        patternDS.process(
+//                new MyPatternProcessFunction(outputTag)
+//        );
+        //Option 2: select() -- results can only be passed back via the method return value
+//        patternDS.select(
+//                timeoutTag,
+//                //handle timed-out data
+//                new PatternTimeoutFunction<JSONObject, String>() {
+//                    @Override
+//                    public String timeout(Map<String, List<JSONObject>> map, long l) throws Exception {
+//                        return null;
+//                    }
+//                },
+//                //handle fully matched (non-timeout) data
+//                new PatternSelectFunction<JSONObject, String>() {
+//                    @Override
+//                    public String select(Map<String, List<JSONObject>> map) throws Exception {
+//                        return null;
+//                    }
+//                }
+//        );
+        //Option 3: flatSelect() -- results can be emitted through a Collector
+        SingleOutputStreamOperator<String> timeoutDS = patternDS.flatSelect(
+                timeoutTag,
+                //handle timed-out data
+                new PatternFlatTimeoutFunction<JSONObject, String>() {
+                    @Override
+                    public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {
+                        //a timeout is exactly the jump-out case we want to count
+                        List<JSONObject> jsonObjectList = map.get("first");
+ for (JSONObject jsonObject : jsonObjectList) {
+ collector.collect(jsonObject.toJSONString());
+ }
+ }
+ },
+                //handle fully matched data
+                new PatternFlatSelectFunction<JSONObject, String>() {
+                    @Override
+                    public void flatSelect(Map<String, List<JSONObject>> map, Collector<String> collector) throws Exception {
+                        //a full match means the user kept navigating, which is outside the scope of this statistic
+ }
+ }
);
+        //9.3 Get the timed-out (jump-out) data from the side output
+        DataStream<String> jumpDS = timeoutDS.getSideOutput(timeoutTag);
+ jumpDS.print(">>>>");
+
+
+        //TODO 10. Write the jump-out details to the DWM-layer Kafka topic
+        DataStreamSink<String> dwm_user_jump_detailDS = jumpDS.addSink(
+ MyKafkaUtils.getKafkaSink("dwm_user_jump_detail")
+ );
env.execute();
@@ -156,8 +219,11 @@ class MyPatternProcessFunction extends PatternProcessFunction<JSONObject, String>
    public void processTimedOutMatch(Map<String, List<JSONObject>> match, Context ctx) throws Exception {
- JSONObject startEvent = match.get("start").get(0);
- ctx.output(outputTag, startEvent.toJSONString());
+        List<JSONObject> jsonObjectList = match.get("first");
+ for (JSONObject jsonObject : jsonObjectList) {
+ ctx.output(outputTag, jsonObject.toJSONString());
+ }
+
}
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java
index 5397208..89bebb7 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java
@@ -2,6 +2,7 @@ package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
+import com.atguigu.gmall.realtime.utils.DimUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -55,6 +56,13 @@ public class DimSink extends RichSinkFunction<JSONObject> {
            throw new RuntimeException("Failed to insert data into the Phoenix dimension table");
}
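+        //Cache-aside invalidation: evicting after the Phoenix upsert means the next read re-caches fresh data.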
+        //If this dimension record was updated or deleted, evict the cached copy from Redis
+        if ("update".equals(jsonObject.getString("type")) || "delete".equals(jsonObject.getString("type"))) {
+            DimUtils.deleteCached(tableName, data.getString("id"));
+        }
+
}
private String generateSQL(String tableName, JSONObject data) {
@@ -66,8 +74,6 @@ public class DimSink extends RichSinkFunction {
+ "(" + key + ")" +
" values('" + value + "')";
//upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('asdas','12')
-
-
return sql;
}
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
new file mode 100644
index 0000000..9d27442
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java
@@ -0,0 +1,30 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-09 14:17
+ *@Description: TODO Order-detail entity
+ *@Version: 1.0
+ */
+@Data
+public class OrderDetail {
+
+ Long id;
+    Long order_id;
+    Long sku_id;
+    BigDecimal order_price;
+    Long sku_num;
+ String sku_name;
+ String create_time;
+ BigDecimal split_total_amount;
+ BigDecimal split_activity_amount;
+ BigDecimal split_coupon_amount;
+ Long create_ts;
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java
new file mode 100644
index 0000000..972e116
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java
@@ -0,0 +1,34 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-09 14:16
+ *@Description: TODO Order entity
+ *@Version: 1.0
+ */
+@Data
+public class OrderInfo {
+
+ Long id;
+ Long province_id;
+ String order_status;
+ Long user_id;
+ BigDecimal total_amount;
+ BigDecimal activity_reduce_amount;
+ BigDecimal coupon_reduce_amount;
+ BigDecimal original_total_amount;
+ BigDecimal feight_fee;
+ String expire_time;
+ String create_time;
+ String operate_time;
+    String create_date; //derived from the other fields
+ String create_hour;
+ Long create_ts;
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java
new file mode 100644
index 0000000..df9e4e8
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java
@@ -0,0 +1,116 @@
+package com.atguigu.gmall.realtime.beans;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+
+import java.math.BigDecimal;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.beans
+ *@Author: markilue
+ *@CreateTime: 2023-05-09 15:35
+ *@Description: TODO Order wide-table entity
+ *@Version: 1.0
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class OrderWide {
+
+ Long detail_id;
+ Long order_id;
+ Long sku_id;
+ BigDecimal order_price;
+ Long sku_num;
+ String sku_name;
+ Long province_id;
+ String order_status;
+ Long user_id;
+
+ BigDecimal total_amount;
+ BigDecimal activity_reduce_amount;
+ BigDecimal coupon_reduce_amount;
+ BigDecimal original_total_amount;
+ BigDecimal feight_fee;
+ BigDecimal split_feight_fee;
+ BigDecimal split_activity_amount;
+ BigDecimal split_coupon_amount;
+ BigDecimal split_total_amount;
+
+ String expire_time;
+ String create_time;
+ String operate_time;
+    String create_date; //derived from the other fields
+ String create_hour;
+
+    String province_name; //from dimension-table lookups
+ String province_area_code;
+ String province_iso_code;
+ String province_3166_2_code;
+
+ Integer user_age;
+ String user_gender;
+
+    Long spu_id; //dimension data to be joined in
+ Long tm_id;
+ Long category3_id;
+ String spu_name;
+ String tm_name;
+ String category3_name;
+
+ public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
+ mergeOrderInfo(orderInfo);
+ mergeOrderDetail(orderDetail);
+ }
+
+ public void mergeOrderInfo(OrderInfo orderInfo) {
+ if (orderInfo != null) {
+ this.order_id = orderInfo.id;
+ this.order_status = orderInfo.order_status;
+ this.create_time = orderInfo.create_time;
+ this.create_date = orderInfo.create_date;
+ this.activity_reduce_amount = orderInfo.activity_reduce_amount;
+ this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
+ this.original_total_amount = orderInfo.original_total_amount;
+ this.feight_fee = orderInfo.feight_fee;
+ this.total_amount = orderInfo.total_amount;
+ this.province_id = orderInfo.province_id;
+ this.user_id = orderInfo.user_id;
+ }
+ }
+
+ public void mergeOrderDetail(OrderDetail orderDetail) {
+ if (orderDetail != null) {
+ this.detail_id = orderDetail.id;
+ this.sku_id = orderDetail.sku_id;
+ this.sku_name = orderDetail.sku_name;
+ this.order_price = orderDetail.order_price;
+ this.sku_num = orderDetail.sku_num;
+ this.split_activity_amount = orderDetail.split_activity_amount;
+ this.split_coupon_amount = orderDetail.split_coupon_amount;
+ this.split_total_amount = orderDetail.split_total_amount;
+ }
+ }
+
+ public void mergeOtherOrderWide(OrderWide otherOrderWide) {
+ this.order_status = ObjectUtils.firstNonNull(this.order_status, otherOrderWide.order_status);
+ this.create_time = ObjectUtils.firstNonNull(this.create_time, otherOrderWide.create_time);
+ this.create_date = ObjectUtils.firstNonNull(this.create_date, otherOrderWide.create_date);
+ this.coupon_reduce_amount = ObjectUtils.firstNonNull(this.coupon_reduce_amount, otherOrderWide.coupon_reduce_amount);
+ this.activity_reduce_amount = ObjectUtils.firstNonNull(this.activity_reduce_amount, otherOrderWide.activity_reduce_amount);
+ this.original_total_amount = ObjectUtils.firstNonNull(this.original_total_amount, otherOrderWide.original_total_amount);
+ this.feight_fee = ObjectUtils.firstNonNull(this.feight_fee, otherOrderWide.feight_fee);
+ this.total_amount = ObjectUtils.firstNonNull(this.total_amount, otherOrderWide.total_amount);
+ this.user_id = ObjectUtils.firstNonNull(this.user_id, otherOrderWide.user_id);
+ this.sku_id = ObjectUtils.firstNonNull(this.sku_id, otherOrderWide.sku_id);
+ this.sku_name = ObjectUtils.firstNonNull(this.sku_name, otherOrderWide.sku_name);
+ this.order_price = ObjectUtils.firstNonNull(this.order_price, otherOrderWide.order_price);
+ this.sku_num = ObjectUtils.firstNonNull(this.sku_num, otherOrderWide.sku_num);
+        this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount, otherOrderWide.split_activity_amount);
+        this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount, otherOrderWide.split_coupon_amount);
+        this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount, otherOrderWide.split_total_amount);
+ }
+
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java
new file mode 100644
index 0000000..fe21dd7
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java
@@ -0,0 +1,135 @@
+package com.atguigu.gmall.realtime.utils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.flink.api.java.tuple.Tuple2;
+import redis.clients.jedis.Jedis;
+
+import java.util.List;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-09 17:47
+ *@Description: TODO Utility class for querying dimension data
+ *@Version: 1.0
+ */
+public class DimUtils {
+
+ public static JSONObject getDimInfoWithCache(String tableName, String id) {
+
+ return getDimInfoWithCache(tableName, Tuple2.of("id", id));
+ }
+
+
+    //Optimize dimension lookups with a cache-aside (side-cache) pattern
+    //Redis cache entry -- type: string, key: dim:<table_name>:<pk_value1>_<pk_value2>, TTL: 1 day
+    public static JSONObject getDimInfoWithCache(String tableName, Tuple2<String, String>... colNameAndValue) {
+
+        //Build the dimension-query SQL and the Redis key in one pass
+        StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where ");
+        StringBuilder redisKey = new StringBuilder("dim:" + tableName.toLowerCase() + ":");
+ for (int i = 0; i < colNameAndValue.length; i++) {
+ String colName = colNameAndValue[i].f0;
+ String colValue = colNameAndValue[i].f1;
+ selectDimSql.append(colName + "='" + colValue + "'");
+ redisKey.append(colValue);
+ if (i < colNameAndValue.length - 1) {
+ selectDimSql.append(" and ");
+ redisKey.append("_");
+ }
+ }
+        //Look up the cached dimension data in Redis by that key
+        //Redis client
+        Jedis jedis = null;
+        //holds the cached JSON string read from Redis
+        String jsonStr = null;
+        //holds the dimension object to return
+        JSONObject dimInfoJsonObj = null;
+
+ try {
+ jedis = MyRedisUtils.getJedis();
+ jsonStr = jedis.get(redisKey.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
+            System.out.println("Exception while querying dimension data from Redis"); //swallow the exception so the job keeps running
+ }
+
+        //Check whether the dimension data was found in the Redis cache
+        if (jsonStr != null && jsonStr.length() > 0) {
+            //cache hit: convert the cached string into a JSON object
+ dimInfoJsonObj = JSON.parseObject(jsonStr);
+        } else {
+            //cache miss: query the Phoenix database instead
+            System.out.println("Dimension query SQL: " + selectDimSql);
+            //delegates to the shared Phoenix query helper
+            List<JSONObject> resultList = MyPhoenixUtils.queryList(selectDimSql.toString(), JSONObject.class);
+
+            if (resultList != null && resultList.size() > 0) {
+                dimInfoJsonObj = resultList.get(0); //lookup is by primary key, so a single row is expected
+                //put the dimension data fetched from Phoenix into the Redis cache
+                if (jedis != null) {
+                    jedis.setex(redisKey.toString(), 3600 * 24, dimInfoJsonObj.toJSONString()); //with a 24h expiry
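+                    //the TTL also caps staleness in case an update ever slips past the DimSink eviction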
+ }
+ } else {
+                System.out.println("No dimension data found: " + selectDimSql);
+ }
+ }
+
+        //return the connection to the pool
+ if (jedis != null) {
+ jedis.close();
+            System.out.println("------closing the Redis connection-------");
+ }
+
+ return dimInfoJsonObj;
+ }
+
+    //Query dimension data from a Phoenix table, wrapped as JSON, e.g. {"ID":"13","TM_NAME":"adsdf"}
+    //There may be several query conditions (composite key), so a varargs Tuple2 parameter is used
+    public static JSONObject getDimInfoNocache(String tableName, Tuple2<String, String>... colNameAndValue) {
+
+        //build the dimension-query SQL
+ StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where ");
+ for (int i = 0; i < colNameAndValue.length; i++) {
+ String colName = colNameAndValue[i].f0;
+ String colValue = colNameAndValue[i].f1;
+ selectDimSql.append(colName + "='" + colValue + "'");
+ if (i < colNameAndValue.length - 1) {
+ selectDimSql.append(" and ");
+ }
+ }
+        System.out.println("Dimension query SQL: " + selectDimSql);
+        //delegates to the shared Phoenix query helper
+        List<JSONObject> resultList = MyPhoenixUtils.queryList(selectDimSql.toString(), JSONObject.class);
+        JSONObject dimInfoJsonObj = null;
+        if (resultList != null && resultList.size() > 0) {
+            dimInfoJsonObj = resultList.get(0); //lookup is by primary key, so a single row is expected
+        } else {
+            System.out.println("No dimension data found: " + selectDimSql);
+ }
+ return dimInfoJsonObj;
+ }
+
+ public static void main(String[] args) {
+// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13")));
+ System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13"));
+ }
+
+    //Delete the cached dimension entry by its Redis key
+ public static void deleteCached(String tableName, String id) {
+ String redisKey = "dim:" + tableName.toLowerCase() + ":" + id;
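+        //this key format must stay in sync with the one built in getDimInfoWithCache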
+ try {
+ Jedis jedis = MyRedisUtils.getJedis();
+ jedis.del(redisKey);
+ jedis.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+            System.out.println("Exception while deleting the Redis cache entry");
+ }
+
+ }
+}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java
index 7778453..c7bf507 100644
--- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java
@@ -1,26 +1,99 @@
package com.atguigu.gmall.realtime.utils;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.common.GmallConfig;
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-06 20:57
- *@Description: TODO Phoenix utility class for executing SQL
+ *@Description:
+ * TODO Phoenix utility class for executing SQL
+ * and querying data from Phoenix tables
*@Version: 1.0
*/
public class MyPhoenixUtils {
- private static final String URL ="jdbc:phoenix:Ding202,Ding203,Ding204:2181";
+ private static Connection connection;
- public void executeSQL(String sql) throws Exception {
- Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
- Connection connection = DriverManager.getConnection(URL);
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- preparedStatement.execute();
+
+    //Run a query and map each row of the result set to a T instance, collected into a List
+    public static <T> List<T> queryList(String sql, Class<T> clazz) {
+        if (connection == null) {
+            //register the driver and open the connection lazily
+            initConnection();
+ }
+
+        List<T> result = new ArrayList<>();
+        //create the statement
+ PreparedStatement preparedStatement = null;
+ ResultSet resultSet = null;
+ try {
+ preparedStatement = connection.prepareStatement(sql);
+            //execute the SQL
+            resultSet = preparedStatement.executeQuery();
+            //grab the result-set metadata
+            ResultSetMetaData metaData = resultSet.getMetaData();
+            //walk the result set
+            while (resultSet.next()) {
+                //create the object to populate
+ T obj = clazz.newInstance();
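+                //clazz needs a public no-arg constructor; newInstance() is deprecated on JDK 9+ in favor of clazz.getDeclaredConstructor().newInstance()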
+
+                for (int i = 1; i <= metaData.getColumnCount(); i++) { //note: JDBC column indexes start at 1
+                    //get the column name from the metadata
+                    String columnName = metaData.getColumnName(i);
+                    //assign the value to the matching property
+                    BeanUtils.setProperty(obj, columnName, resultSet.getObject(i));
+ }
+
+                //add the populated object to the result list
+ result.add(obj);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+            //release resources
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ if (preparedStatement != null) {
+ try {
+ preparedStatement.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ return result;
+ }
+
+ private static void initConnection() {
+ try {
+ Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+            //open the connection
+            connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
+            connection.setSchema(GmallConfig.HBASE_SCHEMA); //set the schema (namespace) to operate in
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ public static void main(String[] args) {
+        List<JSONObject> jsonObjects = queryList("select * from dim_base_trademark", JSONObject.class);
+ System.out.println(jsonObjects);
}
}
diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java
new file mode 100644
index 0000000..e8728d9
--- /dev/null
+++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java
@@ -0,0 +1,56 @@
+package com.atguigu.gmall.realtime.utils;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+/**
+ *@BelongsProject: rt-gmall-parent
+ *@BelongsPackage: com.atguigu.gmall.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-09 19:51
+ *@Description: TODO Obtain the Jedis client for operating Redis
+ *@Version: 1.0
+ */
+public class MyRedisUtils {
+
+    //the shared JedisPool connection pool
+ private static JedisPool jedisPool;
+
+ public static Jedis getJedis() {
+ if (jedisPool == null) {
+ initJedisPool();
+ }
+        System.out.println("-----acquiring a Redis connection--------");
+ return jedisPool.getResource();
+
+ }
+
+    //initialize the connection pool
+    private static void initJedisPool() {
+        //pool configuration
+        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+        //maximum number of connections
+        jedisPoolConfig.setMaxTotal(100);
+        //validate each connection with a PING when it is borrowed
+        jedisPoolConfig.setTestOnBorrow(true);
+        //block and wait when the pool is exhausted
+        jedisPoolConfig.setBlockWhenExhausted(true);
+        //maximum wait time in ms
+        jedisPoolConfig.setMaxWaitMillis(2000);
+        //minimum number of idle connections
+        jedisPoolConfig.setMinIdle(5);
+        //maximum number of idle connections kept after eviction
+        jedisPoolConfig.setMaxIdle(5);
+ jedisPool = new JedisPool(jedisPoolConfig, "Ding202", 6379, 10000);
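+        //JedisPool(poolConfig, host, port, timeout): 10s connection/socket timeout against Redis on Ding202:6379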
+
+ }
+
+ public static void main(String[] args) {
+ Jedis jedis = getJedis();
+ String pong = jedis.ping();
+ System.out.println(pong);
+ }
+
+
+}