From 9eccd2abe23230ca826d1363f6a952e52cc35fdf Mon Sep 17 00:00:00 2001 From: markilue <745518019@qq.com> Date: Tue, 9 May 2023 21:56:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E4=BB=93=E4=BB=BB?= =?UTF-8?q?=E5=8A=A12=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rt-gmall-parent/gmall-realtime/pom.xml | 7 + .../gmall/realtime/app/dwm/OrderWideApp.java | 157 ++++++++++++++++++ .../realtime/app/dwm/UserJumpDetailApp.java | 86 ++++++++-- .../gmall/realtime/app/func/DimSink.java | 10 +- .../gmall/realtime/beans/OrderDetail.java | 30 ++++ .../gmall/realtime/beans/OrderInfo.java | 34 ++++ .../gmall/realtime/beans/OrderWide.java | 116 +++++++++++++ .../gmall/realtime/utils/DimUtils.java | 135 +++++++++++++++ .../gmall/realtime/utils/MyPhoenixUtils.java | 95 +++++++++-- .../gmall/realtime/utils/MyRedisUtils.java | 56 +++++++ 10 files changed, 703 insertions(+), 23 deletions(-) create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java create mode 100644 Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml index 151e239..f93ab7e 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/pom.xml @@ -134,6 +134,13 @@ + + + redis.clients + jedis + 3.3.0 + + diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java new file mode 100644 index 0000000..2635d5c --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java @@ -0,0 +1,157 @@ +package com.atguigu.gmall.realtime.app.dwm; + +import com.alibaba.fastjson.JSON; +import com.atguigu.gmall.realtime.beans.OrderDetail; +import com.atguigu.gmall.realtime.beans.OrderInfo; +import com.atguigu.gmall.realtime.beans.OrderWide; +import com.atguigu.gmall.realtime.utils.MyKafkaUtils; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +import java.text.SimpleDateFormat; +import java.time.Duration; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.app.dwm + *@Author: markilue + *@CreateTime: 2023-05-09 14:33 + *@Description: TODO 订单宽表准备 + *@Version: 1.0 + */ +public class OrderWideApp { + + public static void main(String[] args) throws Exception { + //TODO 1.流环境建立 + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + //TODO 2.检查点设置 + env.enableCheckpointing(5000L); + env.getCheckpointConfig().setCheckpointTimeout(3000L); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); + env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall")); + System.setProperty("HADOOP_USER_NAME", "dingjiawen"); + + //TODO 3.从kafka中读取数据 + String orderInfoTopic = "dwd_order_info"; + String groupId = "order_wide_app_group"; + DataStreamSource orderInfoStrDS = env.addSource( + MyKafkaUtils.getKafkaSource(orderInfoTopic, groupId) + ); + + String orderDetailTopic = "dwd_order_detail"; + DataStreamSource orderDetailStrDS = env.addSource( + MyKafkaUtils.getKafkaSource(orderDetailTopic, groupId) + ); + + //TODO 4.对流中数据类型进行转换 string ->实体对象 + SingleOutputStreamOperator orderInfoDS = orderInfoStrDS.map( + new RichMapFunction() { + private SimpleDateFormat sdf; + + @Override + public void open(Configuration parameters) throws Exception { + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + } + + @Override + public OrderInfo map(String s) throws Exception { + OrderInfo orderInfo = JSON.parseObject(s, OrderInfo.class); + long create_ts = sdf.parse(orderInfo.getCreate_time()).getTime(); + orderInfo.setCreate_ts(create_ts); + return orderInfo; + } + } + ); + + SingleOutputStreamOperator orderDetailDS = orderDetailStrDS.map( + new RichMapFunction() { + private SimpleDateFormat sdf; + + @Override + public void open(Configuration parameters) throws Exception { + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + } + + @Override + public OrderDetail map(String s) throws Exception { + OrderDetail orderDetail = JSON.parseObject(s, OrderDetail.class); + long create_ts = sdf.parse(orderDetail.getCreate_time()).getTime(); + orderDetail.setCreate_ts(create_ts); + return orderDetail; + } + } + ); + +// orderInfoDS.print(">>>>"); +// orderDetailDS.print("####"); + + //TODO 5.指定watermark并提取事件时间字段 + //订单 + SingleOutputStreamOperator orderInfoWithWatermarkDS = orderInfoDS.assignTimestampsAndWatermarks( + WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner( + new SerializableTimestampAssigner() { + @Override + public long extractTimestamp(OrderInfo orderInfo, long l) { + return orderInfo.getCreate_ts(); + } + } + ) + ); + + //订单明细 + SingleOutputStreamOperator orderDetailWithWatermarkDS = orderDetailDS.assignTimestampsAndWatermarks( + WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) + .withTimestampAssigner( + new SerializableTimestampAssigner() { + @Override + public long extractTimestamp(OrderDetail orderDetail, long l) { + return orderDetail.getCreate_ts(); + } + } + ) + ); + + //TODO 6.指定两个流的关联字段 ------- order_id + KeyedStream orderInfoKeyedDS = orderInfoWithWatermarkDS.keyBy(OrderInfo::getId); + KeyedStream orderDetailKeyedDS = orderDetailWithWatermarkDS.keyBy(OrderDetail::getOrder_id); + + //TODO 7.双流join 使用intervalJoin + SingleOutputStreamOperator orderWideDS = orderInfoKeyedDS + .intervalJoin(orderDetailKeyedDS) + .between(Time.seconds(-5), Time.seconds(5)) + .process( + new ProcessJoinFunction() { + @Override + public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, ProcessJoinFunction.Context context, Collector collector) throws Exception { + collector.collect(new OrderWide(orderInfo, orderDetail)); + } + } + ); + + orderWideDS.print(">>>"); + + + + env.execute(); + + + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java index 4544f47..e77b441 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/UserJumpDetailApp.java @@ -8,16 +8,13 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.cep.CEP; -import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.*; import org.apache.flink.cep.functions.PatternProcessFunction; import org.apache.flink.cep.functions.TimedOutPartialMatchHandler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; @@ -57,6 +54,17 @@ public class UserJumpDetailApp { String groupId = "user_jump_detail_app_group"; DataStreamSource kafkaDS = env.addSource(MyKafkaUtils.getKafkaSource(topic, groupId)); +// DataStream kafkaDS = env +// .fromElements( +// "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ", +// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}", +// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" + +// "\"home\"},\"ts\":15000} ", +// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" + +// "\"detail\"},\"ts\":30000} " +// ); + + //TODO 4.map String ->JSONObject SingleOutputStreamOperator jsonDS = kafkaDS.map(JSON::parseObject); // jsonDS.print(">>>"); @@ -75,6 +83,8 @@ public class UserJumpDetailApp { ) ); + jsonObjWithWatermarkDS.print("###"); + //TODO 6.按照mid分组 KeyedStream keyedDS = jsonObjWithWatermarkDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid")); @@ -126,12 +136,65 @@ public class UserJumpDetailApp { PatternStream patternDS = CEP.pattern(keyedDS, pattern); //TODO 9.从流中提取数据 + //9.1 定义侧输出流标记 FlinkCEP会将超时数据匹配放到侧输出流中 + OutputTag timeoutTag = new OutputTag("timeout") { + }; - OutputTag outputTag = new OutputTag("lateData") {}; - patternDS.process( - new MyPatternProcessFunction(outputTag) + //9.2 数据提取 + //第一种方式 + // patternDS.process( +// new MyPatternProcessFunction(outputTag) +// ); + //第二种方式 --只能通过方法返回值传递 +// patternDS.select( +// timeoutTag, +// //处理超时数据 +// new PatternTimeoutFunction() { +// @Override +// public String timeout(Map> map, long l) throws Exception { +// return null; +// } +// }, +// //处理非超时数据 +// new PatternSelectFunction() { +// @Override +// public String select(Map> map) throws Exception { +// return null; +// } +// } +// ); + //第三种方式 --可以通过collector进行收集 + SingleOutputStreamOperator timeoutDS = patternDS.flatSelect( + timeoutTag, + //处理超时数据 + new PatternFlatTimeoutFunction() { + @Override + public void timeout(Map> map, long l, Collector collector) throws Exception { + //超时情况 就是我们要统计的跳出 + List jsonObjectList = map.get("first"); + for (JSONObject jsonObject : jsonObjectList) { + collector.collect(jsonObject.toJSONString()); + } + } + }, + //处理完全匹配数据 + new PatternFlatSelectFunction() { + @Override + public void flatSelect(Map> map, Collector collector) throws Exception { + //完全匹配的数据 是跳转情况,不在我们统计范围之内 + } + } ); + //9.3 从侧输出流中获取超时数据(跳出) + DataStream jumpDS = timeoutDS.getSideOutput(timeoutTag); + jumpDS.print(">>>>"); + + + //TODO 10.将跳出明细写到kafka的dwm层主题 + DataStreamSink dwm_user_jump_detailDS = jumpDS.addSink( + MyKafkaUtils.getKafkaSink("dwm_user_jump_detail") + ); env.execute(); @@ -156,8 +219,11 @@ class MyPatternProcessFunction extends PatternProcessFunction> match, Context ctx) throws Exception { - JSONObject startEvent = match.get("start").get(0); - ctx.output(outputTag, startEvent.toJSONString()); + List jsonObjectList = match.get("first"); + for (JSONObject jsonObject : jsonObjectList) { + ctx.output(outputTag, jsonObject.toJSONString()); + } + } } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java index 5397208..89bebb7 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/DimSink.java @@ -2,6 +2,7 @@ package com.atguigu.gmall.realtime.app.func; import com.alibaba.fastjson.JSONObject; import com.atguigu.gmall.realtime.common.GmallConfig; +import com.atguigu.gmall.realtime.utils.DimUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -55,6 +56,13 @@ public class DimSink extends RichSinkFunction { throw new RuntimeException("向phoenix维度表中插入数据失败"); } + //如果当前维度数据做的是删除或者修改数据,那么清空redis中缓存的维度数据 + if (jsonObject.getString("type").equals("update") || jsonObject.getString("type").equals("delete")) { + //那么清空Redis中缓存的维度数据 + DimUtils.deleteCached(tableName, data.getString("id")); + } + + } private String generateSQL(String tableName, JSONObject data) { @@ -66,8 +74,6 @@ public class DimSink extends RichSinkFunction { + "(" + key + ")" + " values('" + value + "')"; //upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('asdas','12') - - return sql; } } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java new file mode 100644 index 0000000..9d27442 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderDetail.java @@ -0,0 +1,30 @@ +package com.atguigu.gmall.realtime.beans; + +import lombok.Data; + +import java.math.BigDecimal; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.beans + *@Author: markilue + *@CreateTime: 2023-05-09 14:17 + *@Description: TODO 订单明细对象 + *@Version: 1.0 + */ +@Data +public class OrderDetail { + + Long id; + Long order_id ; + Long sku_id; + BigDecimal order_price ; + Long sku_num ; + String sku_name; + String create_time; + BigDecimal split_total_amount; + BigDecimal split_activity_amount; + BigDecimal split_coupon_amount; + Long create_ts; + +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java new file mode 100644 index 0000000..972e116 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderInfo.java @@ -0,0 +1,34 @@ +package com.atguigu.gmall.realtime.beans; + +import lombok.Data; + +import java.math.BigDecimal; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.beans + *@Author: markilue + *@CreateTime: 2023-05-09 14:16 + *@Description: TODO 订单对象 + *@Version: 1.0 + */ +@Data +public class OrderInfo { + + Long id; + Long province_id; + String order_status; + Long user_id; + BigDecimal total_amount; + BigDecimal activity_reduce_amount; + BigDecimal coupon_reduce_amount; + BigDecimal original_total_amount; + BigDecimal feight_fee; + String expire_time; + String create_time; + String operate_time; + String create_date; // 把其他字段处理得到 + String create_hour; + Long create_ts; + +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java new file mode 100644 index 0000000..df9e4e8 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/OrderWide.java @@ -0,0 +1,116 @@ +package com.atguigu.gmall.realtime.beans; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.ObjectUtils; + +import java.math.BigDecimal; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.beans + *@Author: markilue + *@CreateTime: 2023-05-09 15:35 + *@Description: TODO 订单宽表实体类 + *@Version: 1.0 + */ +@Data +@AllArgsConstructor +public class OrderWide { + + Long detail_id; + Long order_id; + Long sku_id; + BigDecimal order_price; + Long sku_num; + String sku_name; + Long province_id; + String order_status; + Long user_id; + + BigDecimal total_amount; + BigDecimal activity_reduce_amount; + BigDecimal coupon_reduce_amount; + BigDecimal original_total_amount; + BigDecimal feight_fee; + BigDecimal split_feight_fee; + BigDecimal split_activity_amount; + BigDecimal split_coupon_amount; + BigDecimal split_total_amount; + + String expire_time; + String create_time; + String operate_time; + String create_date; // 把其他字段处理得到 + String create_hour; + + String province_name;//查询维表得到 + String province_area_code; + String province_iso_code; + String province_3166_2_code; + + Integer user_age; + String user_gender; + + Long spu_id; //作为维度数据 要关联进来 + Long tm_id; + Long category3_id; + String spu_name; + String tm_name; + String category3_name; + + public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) { + mergeOrderInfo(orderInfo); + mergeOrderDetail(orderDetail); + } + + public void mergeOrderInfo(OrderInfo orderInfo) { + if (orderInfo != null) { + this.order_id = orderInfo.id; + this.order_status = orderInfo.order_status; + this.create_time = orderInfo.create_time; + this.create_date = orderInfo.create_date; + this.activity_reduce_amount = orderInfo.activity_reduce_amount; + this.coupon_reduce_amount = orderInfo.coupon_reduce_amount; + this.original_total_amount = orderInfo.original_total_amount; + this.feight_fee = orderInfo.feight_fee; + this.total_amount = orderInfo.total_amount; + this.province_id = orderInfo.province_id; + this.user_id = orderInfo.user_id; + } + } + + public void mergeOrderDetail(OrderDetail orderDetail) { + if (orderDetail != null) { + this.detail_id = orderDetail.id; + this.sku_id = orderDetail.sku_id; + this.sku_name = orderDetail.sku_name; + this.order_price = orderDetail.order_price; + this.sku_num = orderDetail.sku_num; + this.split_activity_amount = orderDetail.split_activity_amount; + this.split_coupon_amount = orderDetail.split_coupon_amount; + this.split_total_amount = orderDetail.split_total_amount; + } + } + + public void mergeOtherOrderWide(OrderWide otherOrderWide) { + this.order_status = ObjectUtils.firstNonNull(this.order_status, otherOrderWide.order_status); + this.create_time = ObjectUtils.firstNonNull(this.create_time, otherOrderWide.create_time); + this.create_date = ObjectUtils.firstNonNull(this.create_date, otherOrderWide.create_date); + this.coupon_reduce_amount = ObjectUtils.firstNonNull(this.coupon_reduce_amount, otherOrderWide.coupon_reduce_amount); + this.activity_reduce_amount = ObjectUtils.firstNonNull(this.activity_reduce_amount, otherOrderWide.activity_reduce_amount); + this.original_total_amount = ObjectUtils.firstNonNull(this.original_total_amount, otherOrderWide.original_total_amount); + this.feight_fee = ObjectUtils.firstNonNull(this.feight_fee, otherOrderWide.feight_fee); + this.total_amount = ObjectUtils.firstNonNull(this.total_amount, otherOrderWide.total_amount); + this.user_id = ObjectUtils.firstNonNull(this.user_id, otherOrderWide.user_id); + this.sku_id = ObjectUtils.firstNonNull(this.sku_id, otherOrderWide.sku_id); + this.sku_name = ObjectUtils.firstNonNull(this.sku_name, otherOrderWide.sku_name); + this.order_price = ObjectUtils.firstNonNull(this.order_price, otherOrderWide.order_price); + this.sku_num = ObjectUtils.firstNonNull(this.sku_num, otherOrderWide.sku_num); + this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount); + this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount); + this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount); + } + +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java new file mode 100644 index 0000000..fe21dd7 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/DimUtils.java @@ -0,0 +1,135 @@ +package com.atguigu.gmall.realtime.utils; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.flink.api.java.tuple.Tuple2; +import redis.clients.jedis.Jedis; + +import java.util.List; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-09 17:47 + *@Description: TODO 查询维度数据工具类 + *@Version: 1.0 + */ +public class DimUtils { + + public static JSONObject getDimInfoWithCache(String tableName, String id) { + + return getDimInfoWithCache(tableName, Tuple2.of("id", id)); + } + + + //使用旁路缓存 对维度查询进行优化 + //redis缓存: type:string key: dim:表名:主键值1_主键值2 ttl:1天(失效时间) + public static JSONObject getDimInfoWithCache(String tableName, Tuple2... colNameAndValue) { + + //拼接查询Redis的key + + //拼接查询维度sql + StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where "); + StringBuilder redisKey = new StringBuilder("dim:" + tableName.toLowerCase() + ":"); + for (int i = 0; i < colNameAndValue.length; i++) { + String colName = colNameAndValue[i].f0; + String colValue = colNameAndValue[i].f1; + selectDimSql.append(colName + "='" + colValue + "'"); + redisKey.append(colValue); + if (i < colNameAndValue.length - 1) { + selectDimSql.append(" and "); + redisKey.append("_"); + } + } + //根据key到redis中查询redis缓存数据 + //声明操作Redis客户端 + Jedis jedis = null; + //声明变量 用于接受从Redis中查询出的缓存数据 + String jsonStr = null; + //声明变量 用于处理返回的维度对象 + JSONObject dimInfoJsonObj = null; + + try { + jedis = MyRedisUtils.getJedis(); + jsonStr = jedis.get(redisKey.toString()); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("从Redis中查询维度数据发生了异常");//catch让程序继续执行而不至于报错 + } + + //判断是否从Reids中获取到了维度缓存数据 + if (jsonStr != null && jsonStr.length() > 0) { + //从redis中查到了维度的缓存数据,将缓存的维度字符串转换为json对象 + dimInfoJsonObj = JSON.parseObject(jsonStr); + } else { + //缓存中没有查询到数据,到phoenix数据库中查询 + System.out.println("查询维度的sql:" + selectDimSql); + //底层还是调用封装的查询phoenix表的数据方法 + List resultList = MyPhoenixUtils.queryList(selectDimSql.toString(), JSONObject.class); + + if (resultList != null && resultList.size() > 0) { + dimInfoJsonObj = resultList.get(0);//因为是根据维度数据的主键查询,一般维度数据只有一条 + //将从Phoenix中查询到的维度数据放入redis缓存中 + if (jedis != null) { + jedis.setex(redisKey.toString(), 3600 * 24, dimInfoJsonObj.toJSONString());//设置超时时间 + } + } else { + System.out.println("维度数据没有找到" + selectDimSql); + } + } + + //释放连接 + if (jedis != null) { + jedis.close(); + System.out.println("------关闭redis连接-------"); + } + + return dimInfoJsonObj; + } + + //从phoenix表中查询维度数据(封装成json) {"ID":"13","TM_NAME":"adsdf"} + //查询条件可能有多个(联合主键),用可变长的tuple来操作 + public static JSONObject getDimInfoNocache(String tableName, Tuple2... colNameAndValue) { + + //拼接查询维度sql + StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where "); + for (int i = 0; i < colNameAndValue.length; i++) { + String colName = colNameAndValue[i].f0; + String colValue = colNameAndValue[i].f1; + selectDimSql.append(colName + "='" + colValue + "'"); + if (i < colNameAndValue.length - 1) { + selectDimSql.append(" and "); + } + } + System.out.println("查询维度的sql:" + selectDimSql); + //底层还是调用封装的查询phoenix表的数据方法 + List resultList = MyPhoenixUtils.queryList(selectDimSql.toString(), JSONObject.class); + JSONObject dimInfoJsonObj = null; + if (resultList != null && resultList.size() > 0) { + dimInfoJsonObj = resultList.get(0);//因为是根据维度数据的主键查询,一般维度数据只有一条 + } else { + System.out.println("维度数据没有找到" + selectDimSql); + } + return dimInfoJsonObj; + } + + public static void main(String[] args) { +// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13"))); + System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13")); + } + + //根据redis的key,删除缓存 + public static void deleteCached(String tableName, String id) { + String redisKey = "dim:" + tableName.toLowerCase() + ":" + id; + try { + Jedis jedis = MyRedisUtils.getJedis(); + jedis.del(redisKey); + jedis.close(); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("删除redis缓存发生了异常"); + } + + } +} diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java index 7778453..c7bf507 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyPhoenixUtils.java @@ -1,26 +1,99 @@ package com.atguigu.gmall.realtime.utils; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; +import com.alibaba.fastjson.JSONObject; +import com.atguigu.gmall.realtime.common.GmallConfig; +import org.apache.commons.beanutils.BeanUtils; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; /** *@BelongsProject: rt-gmall-parent *@BelongsPackage: com.atguigu.gmall.realtime.utils *@Author: markilue *@CreateTime: 2023-05-06 20:57 - *@Description: TODO 编写Phoenix工具类,执行sql + *@Description: + * TODO 编写Phoenix工具类,执行sql + * 从Phoenix表中查询数据 *@Version: 1.0 */ public class MyPhoenixUtils { - private static final String URL ="jdbc:phoenix:Ding202,Ding203,Ding204:2181"; + private static Connection connection; - public void executeSQL(String sql) throws Exception { - Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); - Connection connection = DriverManager.getConnection(URL); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - preparedStatement.execute(); + + //执行查询SQL,将查询结果集封装T类型对象,放到List中 + public static List queryList(String sql, Class clazz) { + if (connection == null) { + //注册驱动 + initConnection(); + } + + List result = new ArrayList<>(); + //创建数据库操作对象 + PreparedStatement preparedStatement = null; + ResultSet resultSet = null; + try { + preparedStatement = connection.prepareStatement(sql); + //执行sql语句 + resultSet = preparedStatement.executeQuery(); + //获取查询结果集的元数据信息 + ResultSetMetaData metaData = resultSet.getMetaData(); + //处理结果集 + while (resultSet.next()) { + //创建要封装的对象 + T obj = clazz.newInstance(); + + for (int i = 1; i <= metaData.getColumnCount(); i++) {//注意:JDBC列的索引从1开始 + //根据元数据获取列名 + String columnName = metaData.getColumnName(i); + //给对象的属性赋值 + BeanUtils.setProperty(obj, columnName, resultSet.getObject(i)); + } + + //将封装的对象放到List集合中 + result.add(obj); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + //释放资源 + if (resultSet != null) { + try { + resultSet.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + if (preparedStatement != null) { + try { + preparedStatement.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + + return result; + } + + private static void initConnection() { + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + //获取连接 + connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER); + connection.setSchema(GmallConfig.HBASE_SCHEMA);//设置操作的表空间 + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + public static void main(String[] args) { + List jsonObjects = queryList("select * from dim_base_trademark", JSONObject.class); + System.out.println(jsonObjects); } } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java new file mode 100644 index 0000000..e8728d9 --- /dev/null +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/MyRedisUtils.java @@ -0,0 +1,56 @@ +package com.atguigu.gmall.realtime.utils; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +/** + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-09 19:51 + *@Description: TODO 获取操作Redis的Java客户端Jedis + *@Version: 1.0 + */ +public class MyRedisUtils { + + //声明JedisPool连接池 + private static JedisPool jedisPool; + + public static Jedis getJedis() { + if (jedisPool == null) { + initJedisPool(); + } + System.out.println("-----获取Redis连接--------"); + return jedisPool.getResource(); + + } + + //初始化连接池对象 + private static void initJedisPool() { + //连接池配置对象 + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + //最大连接数 + jedisPoolConfig.setMaxTotal(100); + //每次在连接的时候是否进行ping pong测试 + jedisPoolConfig.setTestOnBorrow(true); + //连接耗尽是否等待 + jedisPoolConfig.setBlockWhenExhausted(true); + //最大等待时间 + jedisPoolConfig.setMaxWaitMillis(2000); + //最小空闲连接数 + jedisPoolConfig.setMinIdle(5); + //最大空闲连接数(多了之后,空闲了销毁之后最少还剩多少个) + jedisPoolConfig.setMaxIdle(5); + jedisPool = new JedisPool(jedisPoolConfig, "Ding202", 6379, 10000); + + } + + public static void main(String[] args) { + Jedis jedis = getJedis(); + String pong = jedis.ping(); + System.out.println(pong); + } + + +}