Real-time data warehouse: task 2 update

This commit is contained in:
markilue 2023-05-09 21:56:06 +08:00
parent 4080e880b3
commit 9eccd2abe2
10 changed files with 703 additions and 23 deletions

View File

@@ -134,6 +134,13 @@
</exclusions>
</dependency>
+<!-- Redis client -->
+<dependency>
+<groupId>redis.clients</groupId>
+<artifactId>jedis</artifactId>
+<version>3.3.0</version>
+</dependency>
</dependencies>

View File

@@ -0,0 +1,157 @@
package com.atguigu.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.realtime.beans.OrderDetail;
import com.atguigu.gmall.realtime.beans.OrderInfo;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.time.Duration;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-09 14:33
*@Description: TODO Order wide-table preparation
*@Version: 1.0
*/
public class OrderWideApp {
public static void main(String[] args) throws Exception {
//TODO 1. Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
//TODO 2. Checkpoint settings
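// Checkpoint every 5s with exactly-once semantics (the default). Note the 3s timeout below
// is tighter than the 5s interval, so slow HDFS checkpoints will be aborted; RETAIN_ON_CANCELLATION
// keeps the externalized checkpoint on HDFS so the job can be restored after a cancel.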
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointTimeout(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//TODO 3. Read data from Kafka
String orderInfoTopic = "dwd_order_info";
String groupId = "order_wide_app_group";
DataStreamSource<String> orderInfoStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(orderInfoTopic, groupId)
);
String orderDetailTopic = "dwd_order_detail";
DataStreamSource<String> orderDetailStrDS = env.addSource(
MyKafkaUtils.getKafkaSource(orderDetailTopic, groupId)
);
//TODO 4. Convert the stream's data type: String -> entity object
SingleOutputStreamOperator<OrderInfo> orderInfoDS = orderInfoStrDS.map(
new RichMapFunction<String, OrderInfo>() {
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
@Override
public OrderInfo map(String s) throws Exception {
OrderInfo orderInfo = JSON.parseObject(s, OrderInfo.class);
long create_ts = sdf.parse(orderInfo.getCreate_time()).getTime();
orderInfo.setCreate_ts(create_ts);
return orderInfo;
}
}
);
SingleOutputStreamOperator<OrderDetail> orderDetailDS = orderDetailStrDS.map(
new RichMapFunction<String, OrderDetail>() {
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
@Override
public OrderDetail map(String s) throws Exception {
OrderDetail orderDetail = JSON.parseObject(s, OrderDetail.class);
long create_ts = sdf.parse(orderDetail.getCreate_time()).getTime();
orderDetail.setCreate_ts(create_ts);
return orderDetail;
}
}
);
// orderInfoDS.print(">>>>");
// orderDetailDS.print("####");
//TODO 5. Assign watermarks and extract the event-time field
//order stream
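// forBoundedOutOfOrderness(3s): the watermark trails the largest observed event time by 3s,
// so records arriving up to 3s out of order are still assigned correctly.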
SingleOutputStreamOperator<OrderInfo> orderInfoWithWatermarkDS = orderInfoDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo orderInfo, long l) {
return orderInfo.getCreate_ts();
}
}
)
);
//order-detail stream
SingleOutputStreamOperator<OrderDetail> orderDetailWithWatermarkDS = orderDetailDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
new SerializableTimestampAssigner<OrderDetail>() {
@Override
public long extractTimestamp(OrderDetail orderDetail, long l) {
return orderDetail.getCreate_ts();
}
}
)
);
//TODO 6. Key both streams by the join field ------- order_id
KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithWatermarkDS.keyBy(OrderInfo::getId);
KeyedStream<OrderDetail, Long> orderDetailKeyedDS = orderDetailWithWatermarkDS.keyBy(OrderDetail::getOrder_id);
//TODO 7. Join the two streams with intervalJoin
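// For each OrderInfo, intervalJoin emits every OrderDetail with the same key whose
// event time lies in [orderInfo.ts - 5s, orderInfo.ts + 5s] (the between() bounds below).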
SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoKeyedDS
.intervalJoin(orderDetailKeyedDS)
.between(Time.seconds(-5), Time.seconds(5))
.process(
new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
@Override
public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>.Context context, Collector<OrderWide> collector) throws Exception {
collector.collect(new OrderWide(orderInfo, orderDetail));
}
}
);
orderWideDS.print(">>>");
env.execute();
}
}
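The joined stream is only printed above. A natural follow-up (not part of this commit) would be to serialize the wide records back to a DWM Kafka topic. A minimal sketch reusing the project's MyKafkaUtils.getKafkaSink helper; the topic name dwm_order_wide is an assumption:

//Hypothetical next step (not in this commit): write the joined records to the DWM layer.
orderWideDS
.map(orderWide -> JSON.toJSONString(orderWide))//bean -> JSON string
.addSink(MyKafkaUtils.getKafkaSink("dwm_order_wide"));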

View File

@@ -8,16 +8,13 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.cep.CEP;
-import org.apache.flink.cep.PatternStream;
+import org.apache.flink.cep.*;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -57,6 +54,17 @@ public class UserJumpDetailApp {
String groupId = "user_jump_detail_app_group";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtils.getKafkaSource(topic, groupId));
+// DataStream<String> kafkaDS = env
+// .fromElements(
+// "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
+// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
+// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
+// "\"home\"},\"ts\":15000} ",
+// "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
+// "\"detail\"},\"ts\":30000} "
+// );
//TODO 4.map String ->JSONObject
SingleOutputStreamOperator<JSONObject> jsonDS = kafkaDS.map(JSON::parseObject);
// jsonDS.print(">>>");
@@ -75,6 +83,8 @@ public class UserJumpDetailApp {
)
);
+jsonObjWithWatermarkDS.print("###");
//TODO 6. Key by mid
KeyedStream<JSONObject, String> keyedDS = jsonObjWithWatermarkDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
@@ -126,12 +136,65 @@ public class UserJumpDetailApp {
PatternStream<JSONObject> patternDS = CEP.pattern(keyedDS, pattern);
//TODO 9. Extract data from the pattern stream
-OutputTag<String> outputTag = new OutputTag<String>("lateData") {};
-patternDS.process(
-new MyPatternProcessFunction(outputTag)
+//9.1 Define the side-output tag; FlinkCEP routes timed-out partial matches to this side output
+OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
+};
+//9.2 Extract the data
+//Option 1
+// patternDS.process(
+// new MyPatternProcessFunction(outputTag)
+// );
+//Option 2 -- results can only be passed back via the method return value
+// patternDS.select(
+// timeoutTag,
+// //handle timed-out data
+// new PatternTimeoutFunction<JSONObject, String>() {
+// @Override
+// public String timeout(Map<String, List<JSONObject>> map, long l) throws Exception {
+// return null;
+// }
+// },
+// //handle non-timed-out (matched) data
+// new PatternSelectFunction<JSONObject, String>() {
+// @Override
+// public String select(Map<String, List<JSONObject>> map) throws Exception {
+// return null;
+// }
+// }
+// );
+//Option 3 -- results can be emitted through a Collector
+SingleOutputStreamOperator<String> timeoutDS = patternDS.flatSelect(
+timeoutTag,
+//handle timed-out data
+new PatternFlatTimeoutFunction<JSONObject, String>() {
+@Override
+public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {
+//the timeout case is exactly the jump-out we want to count
+List<JSONObject> jsonObjectList = map.get("first");
+for (JSONObject jsonObject : jsonObjectList) {
+collector.collect(jsonObject.toJSONString());
+}
+}
+},
+//handle fully matched data
+new PatternFlatSelectFunction<JSONObject, String>() {
+@Override
+public void flatSelect(Map<String, List<JSONObject>> map, Collector<String> collector) throws Exception {
+//fully matched data are in-session jumps, outside the scope of this statistic
+}
+}
);
+//9.3 Get the timed-out (jump-out) records from the side output
+DataStream<String> jumpDS = timeoutDS.getSideOutput(timeoutTag);
+jumpDS.print(">>>>");
+//TODO 10. Write the jump-out details to the Kafka DWM topic
+DataStreamSink<String> dwm_user_jump_detailDS = jumpDS.addSink(
+MyKafkaUtils.getKafkaSink("dwm_user_jump_detail")
+);
env.execute();
@@ -156,8 +219,11 @@ class MyPatternProcessFunction extends PatternProcessFunction<JSONObject, JSONOb
@Override
public void processTimedOutMatch(Map<String, List<JSONObject>> match, Context ctx) throws Exception {
-JSONObject startEvent = match.get("start").get(0);
-ctx.output(outputTag, startEvent.toJSONString());
+List<JSONObject> jsonObjectList = match.get("first");
+for (JSONObject jsonObject : jsonObjectList) {
+ctx.output(outputTag, jsonObject.toJSONString());
+}
}
}

View File

@@ -2,6 +2,7 @@ package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
+import com.atguigu.gmall.realtime.utils.DimUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -55,6 +56,13 @@ public class DimSink extends RichSinkFunction<JSONObject> {
throw new RuntimeException("Failed to insert data into the Phoenix dimension table");
}
+//If this dimension record is an update or a delete, evict the
+//corresponding dimension data cached in Redis so stale values are not served
+if (jsonObject.getString("type").equals("update") || jsonObject.getString("type").equals("delete")) {
+DimUtils.deleteCached(tableName, data.getString("id"));
+}
}
private String generateSQL(String tableName, JSONObject data) {
@@ -66,8 +74,6 @@ public class DimSink extends RichSinkFunction<JSONObject> {
+ "(" + key + ")" +
" values('" + value + "')";
//upsert into GMALL_REALTIME.dim_base_trademark(tm_name,id) values('asdas','12')
return sql;
}
}

View File

@@ -0,0 +1,30 @@
package com.atguigu.gmall.realtime.beans;
import lombok.Data;
import java.math.BigDecimal;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-09 14:17
*@Description: TODO Order-detail entity
*@Version: 1.0
*/
@Data
public class OrderDetail {
Long id;
Long order_id;
Long sku_id;
BigDecimal order_price;
Long sku_num;
String sku_name;
String create_time;
BigDecimal split_total_amount;
BigDecimal split_activity_amount;
BigDecimal split_coupon_amount;
Long create_ts;
}

View File

@@ -0,0 +1,34 @@
package com.atguigu.gmall.realtime.beans;
import lombok.Data;
import java.math.BigDecimal;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-09 14:16
*@Description: TODO Order entity
*@Version: 1.0
*/
@Data
public class OrderInfo {
Long id;
Long province_id;
String order_status;
Long user_id;
BigDecimal total_amount;
BigDecimal activity_reduce_amount;
BigDecimal coupon_reduce_amount;
BigDecimal original_total_amount;
BigDecimal feight_fee;
String expire_time;
String create_time;
String operate_time;
String create_date; // derived from the other fields
String create_hour;
Long create_ts;
}

View File

@@ -0,0 +1,116 @@
package com.atguigu.gmall.realtime.beans;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.ObjectUtils;
import java.math.BigDecimal;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.beans
*@Author: markilue
*@CreateTime: 2023-05-09 15:35
*@Description: TODO Order wide-table entity class
*@Version: 1.0
*/
@Data
@AllArgsConstructor
public class OrderWide {
Long detail_id;
Long order_id;
Long sku_id;
BigDecimal order_price;
Long sku_num;
String sku_name;
Long province_id;
String order_status;
Long user_id;
BigDecimal total_amount;
BigDecimal activity_reduce_amount;
BigDecimal coupon_reduce_amount;
BigDecimal original_total_amount;
BigDecimal feight_fee;
BigDecimal split_feight_fee;
BigDecimal split_activity_amount;
BigDecimal split_coupon_amount;
BigDecimal split_total_amount;
String expire_time;
String create_time;
String operate_time;
String create_date; // derived from the other fields
String create_hour;
String province_name;//obtained by querying the dimension table
String province_area_code;
String province_iso_code;
String province_3166_2_code;
Integer user_age;
String user_gender;
Long spu_id; //dimension data, to be joined in
Long tm_id;
Long category3_id;
String spu_name;
String tm_name;
String category3_name;
public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
mergeOrderInfo(orderInfo);
mergeOrderDetail(orderDetail);
}
public void mergeOrderInfo(OrderInfo orderInfo) {
if (orderInfo != null) {
this.order_id = orderInfo.id;
this.order_status = orderInfo.order_status;
this.create_time = orderInfo.create_time;
this.create_date = orderInfo.create_date;
this.activity_reduce_amount = orderInfo.activity_reduce_amount;
this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
this.original_total_amount = orderInfo.original_total_amount;
this.feight_fee = orderInfo.feight_fee;
this.total_amount = orderInfo.total_amount;
this.province_id = orderInfo.province_id;
this.user_id = orderInfo.user_id;
}
}
public void mergeOrderDetail(OrderDetail orderDetail) {
if (orderDetail != null) {
this.detail_id = orderDetail.id;
this.sku_id = orderDetail.sku_id;
this.sku_name = orderDetail.sku_name;
this.order_price = orderDetail.order_price;
this.sku_num = orderDetail.sku_num;
this.split_activity_amount = orderDetail.split_activity_amount;
this.split_coupon_amount = orderDetail.split_coupon_amount;
this.split_total_amount = orderDetail.split_total_amount;
}
}
public void mergeOtherOrderWide(OrderWide otherOrderWide) {
this.order_status = ObjectUtils.firstNonNull(this.order_status, otherOrderWide.order_status);
this.create_time = ObjectUtils.firstNonNull(this.create_time, otherOrderWide.create_time);
this.create_date = ObjectUtils.firstNonNull(this.create_date, otherOrderWide.create_date);
this.coupon_reduce_amount = ObjectUtils.firstNonNull(this.coupon_reduce_amount, otherOrderWide.coupon_reduce_amount);
this.activity_reduce_amount = ObjectUtils.firstNonNull(this.activity_reduce_amount, otherOrderWide.activity_reduce_amount);
this.original_total_amount = ObjectUtils.firstNonNull(this.original_total_amount, otherOrderWide.original_total_amount);
this.feight_fee = ObjectUtils.firstNonNull(this.feight_fee, otherOrderWide.feight_fee);
this.total_amount = ObjectUtils.firstNonNull(this.total_amount, otherOrderWide.total_amount);
this.user_id = ObjectUtils.<Long>firstNonNull(this.user_id, otherOrderWide.user_id);
this.sku_id = ObjectUtils.firstNonNull(this.sku_id, otherOrderWide.sku_id);
this.sku_name = ObjectUtils.firstNonNull(this.sku_name, otherOrderWide.sku_name);
this.order_price = ObjectUtils.firstNonNull(this.order_price, otherOrderWide.order_price);
this.sku_num = ObjectUtils.firstNonNull(this.sku_num, otherOrderWide.sku_num);
this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount, otherOrderWide.split_activity_amount);
this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount, otherOrderWide.split_coupon_amount);
this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount, otherOrderWide.split_total_amount);
}
}

View File

@@ -0,0 +1,135 @@
package com.atguigu.gmall.realtime.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-09 17:47
*@Description: TODO Utility class for querying dimension data
*@Version: 1.0
*/
public class DimUtils {
public static JSONObject getDimInfoWithCache(String tableName, String id) {
return getDimInfoWithCache(tableName, Tuple2.of("id", id));
}
//Use a cache-aside (side cache) pattern to speed up dimension lookups
//Redis cache: type: string key: dim:<table_name>:<pk1>_<pk2> ttl: 1 day (expiry)
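//e.g. key "dim:dim_base_trademark:13" -> value {"ID":"13","TM_NAME":"adsdf"}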
public static JSONObject getDimInfoWithCache(String tableName, Tuple2<String, String>... colNameAndValue) {
//Build the Redis lookup key
//Build the dimension-query SQL
StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where ");
StringBuilder redisKey = new StringBuilder("dim:" + tableName.toLowerCase() + ":");
for (int i = 0; i < colNameAndValue.length; i++) {
String colName = colNameAndValue[i].f0;
String colValue = colNameAndValue[i].f1;
selectDimSql.append(colName + "='" + colValue + "'");
redisKey.append(colValue);
if (i < colNameAndValue.length - 1) {
selectDimSql.append(" and ");
redisKey.append("_");
}
}
//Look up the cached dimension data in Redis by key
//Redis client handle
Jedis jedis = null;
//Holds the cached data read from Redis
String jsonStr = null;
//Holds the dimension object to return
JSONObject dimInfoJsonObj = null;
try {
jedis = MyRedisUtils.getJedis();
jsonStr = jedis.get(redisKey.toString());
} catch (Exception e) {
e.printStackTrace();
System.out.println("从Redis中查询维度数据发生了异常");//catch让程序继续执行而不至于报错
}
//Check whether cached dimension data was found in Redis
if (jsonStr != null && jsonStr.length() > 0) {
//Cache hit: parse the cached dimension string into a JSON object
dimInfoJsonObj = JSON.parseObject(jsonStr);
} else {
//Cache miss: query the Phoenix database instead
System.out.println("Dimension query SQL: " + selectDimSql);
//Delegates to the shared helper that queries Phoenix tables
List<JSONObject> resultList = MyPhoenixUtils.queryList(selectDimSql.toString(), JSONObject.class);
if (resultList != null && resultList.size() > 0) {
dimInfoJsonObj = resultList.get(0);//queried by the dimension's primary key, so normally only one row
//Cache the dimension data fetched from Phoenix in Redis
if (jedis != null) {
jedis.setex(redisKey.toString(), 3600 * 24, dimInfoJsonObj.toJSONString());//with a one-day TTL
}
} else {
System.out.println("维度数据没有找到" + selectDimSql);
}
}
//Release the connection
if (jedis != null) {
jedis.close();
System.out.println("------closing Redis connection-------");
}
return dimInfoJsonObj;
}
//Query dimension data from a Phoenix table (wrapped as JSON), e.g. {"ID":"13","TM_NAME":"adsdf"}
//There may be multiple query conditions (composite key), so take a varargs of Tuple2
public static JSONObject getDimInfoNocache(String tableName, Tuple2<String, String>... colNameAndValue) {
//Build the dimension-query SQL
StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where ");
for (int i = 0; i < colNameAndValue.length; i++) {
String colName = colNameAndValue[i].f0;
String colValue = colNameAndValue[i].f1;
selectDimSql.append(colName + "='" + colValue + "'");
if (i < colNameAndValue.length - 1) {
selectDimSql.append(" and ");
}
}
System.out.println("查询维度的sql:" + selectDimSql);
//底层还是调用封装的查询phoenix表的数据方法
List<JSONObject> resultList = MyPhoenixUtils.queryList(selectDimSql.toString(), JSONObject.class);
JSONObject dimInfoJsonObj = null;
if (resultList != null && resultList.size() > 0) {
dimInfoJsonObj = resultList.get(0);//queried by the dimension's primary key, so normally only one row
} else {
System.out.println("维度数据没有找到" + selectDimSql);
}
return dimInfoJsonObj;
}
public static void main(String[] args) {
// System.out.println(DimUtils.getDimInfoNocache("dim_base_trademark", Tuple2.of("id", "13")));
System.out.println(DimUtils.getDimInfoWithCache("dim_base_trademark", "13"));
}
//Delete a cached entry by its Redis key
public static void deleteCached(String tableName, String id) {
String redisKey = "dim:" + tableName.toLowerCase() + ":" + id;
try {
Jedis jedis = MyRedisUtils.getJedis();
jedis.del(redisKey);
jedis.close();
} catch (Exception e) {
e.printStackTrace();
System.out.println("删除redis缓存发生了异常");
}
}
}
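A sketch of how this cache might be used downstream (not part of this commit): enriching OrderWide records with a dimension lookup inside a RichMapFunction. The table name dim_user_info and column GENDER are illustrative assumptions:

//Hypothetical enrichment step; dimension table and column names are assumptions.
SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = orderWideDS.map(
new RichMapFunction<OrderWide, OrderWide>() {
@Override
public OrderWide map(OrderWide orderWide) throws Exception {
//cached read-through lookup; falls back to Phoenix on a cache miss
JSONObject userDim = DimUtils.getDimInfoWithCache("dim_user_info", String.valueOf(orderWide.getUser_id()));
if (userDim != null) {
orderWide.setUser_gender(userDim.getString("GENDER"));
}
return orderWide;
}
}
);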

View File

@@ -1,26 +1,99 @@
package com.atguigu.gmall.realtime.utils;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import com.alibaba.fastjson.JSONObject;
+import com.atguigu.gmall.realtime.common.GmallConfig;
+import org.apache.commons.beanutils.BeanUtils;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-06 20:57
-*@Description: TODO Phoenix utility class for executing SQL
+*@Description:
+* TODO Phoenix utility class for executing SQL
+* Queries data from Phoenix tables
*@Version: 1.0
*/
public class MyPhoenixUtils {
-private static final String URL ="jdbc:phoenix:Ding202,Ding203,Ding204:2181";
-public void executeSQL(String sql) throws Exception {
+private static Connection connection;
+//Execute a query and wrap each row of the result set into a T object, collected into a List
+public static <T> List<T> queryList(String sql, Class<T> clazz) {
+if (connection == null) {
+//register the driver and connect
+initConnection();
+}
+List<T> result = new ArrayList<>();
+//create the statement object
+PreparedStatement preparedStatement = null;
+ResultSet resultSet = null;
+try {
+preparedStatement = connection.prepareStatement(sql);
+//execute the SQL
+resultSet = preparedStatement.executeQuery();
+//get the result-set metadata
+ResultSetMetaData metaData = resultSet.getMetaData();
+//process the result set
+while (resultSet.next()) {
+//create the object to populate
+T obj = clazz.newInstance();
+for (int i = 1; i <= metaData.getColumnCount(); i++) {//note: JDBC column indexes start at 1
+//get the column name from the metadata
+String columnName = metaData.getColumnName(i);
+//set the corresponding property on the object
+BeanUtils.setProperty(obj, columnName, resultSet.getObject(i));
+}
+//add the populated object to the list
+result.add(obj);
+}
+} catch (Exception e) {
+e.printStackTrace();
+} finally {
+//release resources
+if (resultSet != null) {
+try {
+resultSet.close();
+} catch (SQLException e) {
+e.printStackTrace();
+}
+}
+if (preparedStatement != null) {
+try {
+preparedStatement.close();
+} catch (SQLException e) {
+e.printStackTrace();
+}
+}
+}
+return result;
+}
+private static void initConnection() {
+try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
-Connection connection = DriverManager.getConnection(URL);
-PreparedStatement preparedStatement = connection.prepareStatement(sql);
-preparedStatement.execute();
+//get the connection
+connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
+connection.setSchema(GmallConfig.HBASE_SCHEMA);//set the schema (namespace) to operate on
+} catch (Exception e) {
+throw new RuntimeException(e);
+}
+}
+public static void main(String[] args) {
+List<JSONObject> jsonObjects = queryList("select * from dim_base_trademark", JSONObject.class);
+System.out.println(jsonObjects);
}
}

View File

@@ -0,0 +1,56 @@
package com.atguigu.gmall.realtime.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.gmall.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-09 19:51
*@Description: TODO Obtain a Jedis client for operating Redis
*@Version: 1.0
*/
public class MyRedisUtils {
//JedisPool connection pool
private static JedisPool jedisPool;
public static Jedis getJedis() {
if (jedisPool == null) {
initJedisPool();
}
System.out.println("-----获取Redis连接--------");
return jedisPool.getResource();
}
//Initialize the connection pool object
private static void initJedisPool() {
//pool configuration object
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//max total connections
jedisPoolConfig.setMaxTotal(100);
//validate (ping) each connection when it is borrowed
jedisPoolConfig.setTestOnBorrow(true);
//block and wait when the pool is exhausted
jedisPoolConfig.setBlockWhenExhausted(true);
//max wait time in ms
jedisPoolConfig.setMaxWaitMillis(2000);
//min idle connections
jedisPoolConfig.setMinIdle(5);
//max idle connections (how many idle ones are kept after the rest are destroyed)
jedisPoolConfig.setMaxIdle(5);
jedisPool = new JedisPool(jedisPoolConfig, "Ding202", 6379, 10000);
}
public static void main(String[] args) {
Jedis jedis = getJedis();
String pong = jedis.ping();
System.out.println(pong);
}
}