diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java index 539ce13..91c7ab1 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java @@ -1,5 +1,6 @@ package com.cqu.warehouse.realtime.app.base; +import com.cqu.warehouse.realtime.common.PHMConfig; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; @@ -26,7 +27,7 @@ public abstract class BaseStreamApp { env.getCheckpointConfig().setCheckpointTimeout(5000L); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//任务结束检查点是否保存 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L)); - env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/phm_warehosue/rt/ck")); + env.setStateBackend(new FsStateBackend(PHMConfig.RT_CHECKPOINT_LOCATION)); System.setProperty("HADOOP_USER_NAME", "dingjiawen"); //模板 execute(env); diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java new file mode 100644 index 0000000..9e66a9a --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDBApp.java @@ -0,0 +1,58 @@ +package com.cqu.warehouse.realtime.app.dwd; + +import com.alibaba.fastjson.JSONObject; +import com.cqu.warehouse.realtime.app.base.BaseStreamApp; +import com.cqu.warehouse.realtime.app.func.DimFunction; +import com.cqu.warehouse.realtime.app.func.DimSink; +import com.cqu.warehouse.realtime.utils.MyKafkaUtils; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.app.dwd + *@Author: markilue + *@CreateTime: 2023-05-20 15:34 + *@Description: + * TODO 读取数据库中的配置信息(维度信息): + * 1)读取经过maxwell采集的维度数据 + * 2)通过指定的表插入对应的hbase表中 + *@Version: 1.0 + */ +public class BaseDBApp extends BaseStreamApp { + + public static void main(String[] args) throws Exception { + new BaseDBApp().entry(); + } + + @Override + public void execute(StreamExecutionEnvironment env) { + + //TODO 1.从kafka中读取数据 + String topic = "ods_base_db_m"; + String groupId = "base_db_group"; + DataStreamSource kafkaDS = env.addSource( + MyKafkaUtils.getKafkaSource(topic, groupId) + ); + + //TODO 2.转为JSONObject + SingleOutputStreamOperator jsonObjectDS = kafkaDS.map(JSONObject::parseObject); + + //TODO 3.判断是否需要在Hbase建表,并添加sink_table字段 + SingleOutputStreamOperator sinkDS = jsonObjectDS.map( + new DimFunction() + ); + + sinkDS.print(">>>"); + //TODO 4.写到Hbase,由于可能需要写到不同的表中,jdbcSink暂时只能写到一张表中,所以需要自己重写SinkFunction + sinkDS.addSink( + new DimSink() + ); + + + + } +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java index dc053da..cf28e22 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java @@ -21,7 +21,7 @@ import org.apache.flink.util.OutputTag; *@Author: markilue *@CreateTime: 2023-05-17 18:04 *@Description: - * TODO phm的dwd层: + * TODO phm的dwd层:读取上传的测试数据 * 1.简单ETL * 2.数据分流 *@Version: 1.0 diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java index 5b9fa55..afa90f5 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java @@ -7,7 +7,7 @@ import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.cqu.warehouse.realtime.app.base.BaseStreamApp; import com.cqu.warehouse.realtime.app.func.DimAsyncFunction; -import com.cqu.warehouse.realtime.entity.GasTableProcess; +import com.cqu.warehouse.realtime.entity.GasCodeProcess; import com.cqu.warehouse.realtime.app.func.GasWideProcessFunction; import com.cqu.warehouse.realtime.app.func.MyDeserializerSchemaFunction; import com.cqu.warehouse.realtime.app.func.TableProcessFunction; @@ -87,10 +87,10 @@ public class GasWideApp extends BaseStreamApp { .build(); DataStreamSource gasTableProcessDS = env.addSource(gasTableProcessSource); -// gasTableProcessDS.print(); +// gasTableProcessDS.print("^^^"); //TODO 4.将配置流广播 - MapStateDescriptor gasTableProcessMapStateDescriptor = new MapStateDescriptor("gasTableState", String.class, GasTableProcess.class); + MapStateDescriptor gasTableProcessMapStateDescriptor = new MapStateDescriptor("gasTableState", String.class, GasCodeProcess.class); BroadcastStream gasBroadDS = gasTableProcessDS.broadcast(gasTableProcessMapStateDescriptor); //TODO 5.原流join配置流,并进行广播 @@ -133,20 +133,20 @@ public class GasWideApp extends BaseStreamApp { gasWideDS.print("***"); //TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率,额定运行时长等),地区维度(经纬度等)等 - SingleOutputStreamOperator gasWidedDS = AsyncDataStream.unorderedWait( - gasWideDS, - new DimAsyncFunction(), - 60, - TimeUnit.SECONDS - ); - - SingleOutputStreamOperator gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString()); - - gasWideStrDS.print(">>>>"); - - gasWideStrDS.addSink( - MyKafkaUtils.getKafkaSink("dwm_gas_wide") - ); +// SingleOutputStreamOperator gasWidedDS = AsyncDataStream.unorderedWait( +// gasWideDS, +// new DimAsyncFunction(), +// 60, +// TimeUnit.SECONDS +// ); +// +// SingleOutputStreamOperator gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString()); +// +// gasWideStrDS.print(">>>>"); +// +// gasWideStrDS.addSink( +// MyKafkaUtils.getKafkaSink("dwm_gas_wide") +// ); } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java index d26649f..471eab8 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimAsyncFunction.java @@ -15,7 +15,6 @@ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; */ public class DimAsyncFunction extends RichAsyncFunction { - //异步调用主方法 @Override public void asyncInvoke(T t, ResultFuture resultFuture) throws Exception { diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimFunction.java new file mode 100644 index 0000000..994c5cd --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimFunction.java @@ -0,0 +1,95 @@ +package com.cqu.warehouse.realtime.app.func; + + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.cqu.warehouse.realtime.common.PHMConfig; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Set; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.app.func + *@Author: markilue + *@CreateTime: 2023-05-20 18:18 + *@Description: TODO 燃机维度信息添加 + *@Version: 1.0 + */ +public class DimFunction extends RichMapFunction { + + private Connection connection; + + @Override + public void open(Configuration parameters) throws Exception { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER); + connection.setSchema(PHMConfig.HBASE_SCHEMA); + } + + @Override + public JSONObject map(JSONObject jsonObject) throws Exception { + String dimTable = "dim_" + jsonObject.getString("table"); + String type = jsonObject.getString("type"); + //注意:maxwell可以对历史数据进行处理,这时候type为bootstrap-insert,这时候需要修复 + if (type.equals("bootstrap-insert")) { + type = "insert"; + jsonObject.put("type", type); + } + + JSONObject data = jsonObject.getJSONObject("data"); + Set colName = data.keySet(); + //如果说 读取到的配置信息是维度数据的话,那么提前在Hbase中创建维度表 + if ("insert".equals(type)) { + //如果是维度数据且是insert;update就没必要在创建 + existTable(dimTable, colName); + } + + jsonObject.put("sinkTable", dimTable); + return jsonObject; + } + + private void existTable(String dimTable, Set colName) { + StringBuilder sql = new StringBuilder("create table if not exists " + dimTable + "("); + int i = 0; + for (String col : colName) { + if ("id".equals(col)) { + sql.append(col + " varchar primary key"); + } else { + sql.append(col + " varchar"); + } + if (i++ < colName.size() - 1) { + sql.append(","); + } + } + sql.append(")"); + PreparedStatement preparedStatement = null; + String executeSQL = sql.toString(); + System.out.println(executeSQL); + try { + preparedStatement = connection.prepareStatement(executeSQL); + preparedStatement.execute(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + if (preparedStatement == null) { + try { + preparedStatement.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + + } + +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimSink.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimSink.java new file mode 100644 index 0000000..9de8bb1 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/DimSink.java @@ -0,0 +1,83 @@ +package com.cqu.warehouse.realtime.app.func; + + +import com.alibaba.fastjson.JSONObject; +import com.cqu.warehouse.realtime.common.PHMConfig; +import com.cqu.warehouse.realtime.utils.DimUtils; +import com.cqu.warehouse.realtime.utils.RedisUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import redis.clients.jedis.Jedis; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.app.func + *@Author: markilue + *@CreateTime: 2023-05-20 19:09 + *@Description: TODO 维度数据表写回Hbase + *@Version: 1.0 + */ +public class DimSink extends RichSinkFunction { + + private Connection connection; + + @Override + public void open(Configuration parameters) throws Exception { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER); + connection.setSchema(PHMConfig.HBASE_SCHEMA); + } + + @Override + public void invoke(JSONObject jsonObject, Context context) { + String sinkTable = jsonObject.getString("sinkTable"); + JSONObject data = jsonObject.getJSONObject("data"); + String sql = generateSQL(sinkTable, data); + String id = data.getString("id"); + + try (PreparedStatement preparedStatement = connection.prepareStatement(sql);){ + + preparedStatement.executeUpdate(); + //TODO 注意:提交事务(MQSQL默认自动提交事务,Phoenix默认是手动提交事务) + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + throw new RuntimeException("向phoenix中插入维度数据失败"); + } + + + String type = jsonObject.getString("sinkTable"); + + if("update".equals(type)||"delete".equals(type)){ + DimUtils.deleteCache(sinkTable,id); + } + } + + public String generateSQL(String sinkTable, JSONObject data) { + + + List col = new ArrayList<>(); + List value = new ArrayList<>(); + + for (Map.Entry entry : data.entrySet()) { + col.add(entry.getKey()); + value.add(entry.getValue()); + } + + String sql = "upsert into " + sinkTable + "(" + StringUtils.join(col, ",") + ") values ('" + StringUtils.join(value, "','") + "')"; + System.out.println(sql); + + + return sql; + } +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java index ead1e98..8f80db7 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/TableProcessFunction.java @@ -2,7 +2,7 @@ package com.cqu.warehouse.realtime.app.func; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.cqu.warehouse.realtime.entity.GasTableProcess; +import com.cqu.warehouse.realtime.entity.GasCodeProcess; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; @@ -23,7 +23,7 @@ import java.text.SimpleDateFormat; */ public class TableProcessFunction extends BroadcastProcessFunction { - private MapStateDescriptor mapStateDescriptor; + private MapStateDescriptor mapStateDescriptor; private OutputTag outTag; @@ -34,7 +34,7 @@ public class TableProcessFunction extends BroadcastProcessFunction outTag, MapStateDescriptor mapStateDescriptor) { + public TableProcessFunction(OutputTag outTag, MapStateDescriptor mapStateDescriptor) { this.mapStateDescriptor = mapStateDescriptor; this.outTag = outTag; } @@ -42,17 +42,17 @@ public class TableProcessFunction extends BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception { - ReadOnlyBroadcastState gasTableState = readOnlyContext.getBroadcastState(mapStateDescriptor); + ReadOnlyBroadcastState gasTableState = readOnlyContext.getBroadcastState(mapStateDescriptor); String area = jsonObject.getString("area"); String tagName = jsonObject.getString("tagName"); - GasTableProcess truth1 = gasTableState.get(area); + GasCodeProcess truth1 = gasTableState.get(area); if (truth1 != null) { jsonObject.put("area", truth1.getTag_desc()); } - GasTableProcess truth = gasTableState.get(tagName); + GasCodeProcess truth = gasTableState.get(tagName); if (truth != null) { // jsonObject.put("originaltagName", truth.getDecode_value()); jsonObject.put("tagName", truth.getDecode_value()); @@ -73,12 +73,12 @@ public class TableProcessFunction extends BroadcastProcessFunction.Context context, Collector collector) throws Exception { - BroadcastState gasTableProcessState = context.getBroadcastState(mapStateDescriptor); + BroadcastState gasTableProcessState = context.getBroadcastState(mapStateDescriptor); JSONObject jsonObject = JSON.parseObject(s); JSONObject data = jsonObject.getJSONObject("data"); - GasTableProcess gasTableProcess = JSON.parseObject(jsonObject.getString("data"), GasTableProcess.class); + GasCodeProcess gasTableProcess = JSON.parseObject(jsonObject.getString("data"), GasCodeProcess.class); if (data != null) { String encode_value = data.getString("encode_value"); if (encode_value != null) { diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/table.sql b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/table.sql new file mode 100644 index 0000000..b09f109 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/func/table.sql @@ -0,0 +1,46 @@ +create table if not exists dim_gas_type_info +( + rated_duration + varchar, + rated_load + varchar, + rated_power + varchar, + rated_speed + varchar, + rated_flow_rate + varchar, + type_id + varchar, + rated_press + varchar, + rated_temp + varchar, + id + varchar + primary + key +) + upsert into dim_gas_type_info +( + rated_duration, + rated_load, + rated_power, + rated_speed, + rated_flow_rate, + type_id, + rated_press, + rated_temp, + id +) values +( + rated_duration, + rated_load, + rated_power, + rated_speed, + rated_flow_rate, + type_id, + rated_press, + rated_temp, + id +) \ No newline at end of file diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java index 7e50c15..9c5f937 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java @@ -13,4 +13,5 @@ public class PHMConfig { public static final String HBASE_SCHEMA = "PHM_REALTIME";//Hbase库名 public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix连接 public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//Phoenix连接 + public static final String RT_CHECKPOINT_LOCATION = "hdfs://Ding202:8020/phm_warehouse/rt/ck";//检查点保存位置 } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTableProcess.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasCodeProcess.java similarity index 94% rename from phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTableProcess.java rename to phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasCodeProcess.java index bc4d2a4..314bb27 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasTableProcess.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/entity/GasCodeProcess.java @@ -11,7 +11,7 @@ import lombok.Data; *@Version: 1.0 */ @Data -public class GasTableProcess { +public class GasCodeProcess { String encode_value;//加密数据 String decode_value;//解密数据 diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/DimUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/DimUtils.java new file mode 100644 index 0000000..54af614 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/DimUtils.java @@ -0,0 +1,127 @@ +package com.cqu.warehouse.realtime.utils; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.flink.api.java.tuple.Tuple2; +import redis.clients.jedis.Jedis; + +import java.util.List; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-20 14:08 + *@Description: + * TODO 维度关联工具类: + * 方便维度关联查询 + *@Version: 1.0 + */ +public class DimUtils { + + public static JSONObject getDimInfoWithCache(String tableName, String id) { + return getDimInfoWithCache(tableName, Tuple2.of("id", id)); + } + + + public static JSONObject getDimInfoWithCache(String tableName, Tuple2... colNameAndValue) { + JSONObject jsonObject = null; + + //TODO 查询缓存中是否存在这个对象 + StringBuilder redisKey = new StringBuilder("dim:" + tableName + ":"); + StringBuilder sql = new StringBuilder("select * from " + tableName + " where "); + for (int i = 0; i < colNameAndValue.length; i++) { + Tuple2 cv = colNameAndValue[i]; + String colName = cv.f0; + String value = cv.f1; + sql.append("" + colName + " = '" + value + "'"); + redisKey.append(value); + if (i < colNameAndValue.length - 1) { + sql.append(" and "); + redisKey.append("_"); + } + } + Jedis jedis = null; + String cacheStr = null; + try { + jedis = RedisUtils.getJedisPool(); + cacheStr = jedis.get(redisKey.toString()); + } catch (Exception e) { + System.out.println("从redis查询出现异常"); + } + + if (cacheStr != null && cacheStr.length() > 0) { + //有缓存查缓存 + jsonObject = JSON.parseObject(cacheStr); + } else { + //没有缓存查数据库 + System.out.println(sql); + List result = PhoenixUtils.selectData(sql.toString(), JSONObject.class); + if (result != null && result.size() > 0) { + jsonObject = result.get(0); + if (jedis != null) { + jedis.setex(redisKey.toString(), 1000 * 60 * 24, jsonObject.toJSONString()); + } + } else { + System.out.println("维度数据没有找到" + sql); + } + } + + //释放连接 + if (jedis != null) { + jedis.close(); + System.out.println("------关闭redis连接-------"); + } + + + return jsonObject; + } + + //从phoenix表中查询维度数据(封装成json) {"ID":"13","TM_NAME":"adsdf"} + //查询条件可能有多个(联合主键),用可变长的tuple来操作 + public static JSONObject getDimInfoNocache(String tableName, Tuple2... colNameAndValue) { + + StringBuilder sql = new StringBuilder("select * from " + tableName + " where "); + for (int i = 0; i < colNameAndValue.length; i++) { + Tuple2 cv = colNameAndValue[i]; + String colName = cv.f0; + String value = cv.f1; + sql.append("" + colName + " = '" + value + "'"); + if (i < colNameAndValue.length - 1) { + sql.append(" and "); + } + } + System.out.println(sql); + List result = PhoenixUtils.selectData(sql.toString(), JSONObject.class); + if (result != null && result.size() > 0) { + return result.get(0); + } else { + System.out.println("维度数据没有找到" + sql); + return null; + } + + } + + + + public static void deleteCache(String sinkTable, String id) { + //删除redis中的数据 + String redisKey = "dim:" + sinkTable.toLowerCase() + ":" + id; + Jedis jedis = null; + try { + jedis = RedisUtils.getJedisPool(); + jedis.del(redisKey); + jedis.close();//关闭jedis连接 + } catch (Exception e) { + System.out.println("删除内存中的数据失败"); + e.printStackTrace(); + } + } + + public static void main(String[] args) { + while (true) { + JSONObject object = getDimInfoWithCache("DIM_SKU_INFO", "10"); + System.out.println(object); + } + } +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java index 227b7cb..b2b1a5b 100644 --- a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/PhoenixUtils.java @@ -1,5 +1,13 @@ package com.cqu.warehouse.realtime.utils; +import com.alibaba.fastjson.JSONObject; +import com.cqu.warehouse.realtime.common.PHMConfig; +import org.apache.commons.beanutils.BeanUtils; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + /** *@BelongsProject: phm_parent *@BelongsPackage: com.cqu.warehouse.realtime.utils @@ -10,8 +18,76 @@ package com.cqu.warehouse.realtime.utils; */ public class PhoenixUtils { + private static Connection connection; + + public static List selectData(String sql, Class clazz) { + + //JDBC五部曲: + //注册驱动 + if (connection == null) { + initConnection(); + } + //获取连接 + //查询结果集 + PreparedStatement preparedStatement = null; + ResultSet resultSet = null; + List result = new ArrayList<>(); + try { + preparedStatement = connection.prepareStatement(sql); + resultSet = preparedStatement.executeQuery(); + ResultSetMetaData metaData = resultSet.getMetaData(); + + //处理结果集 + while (resultSet.next()) { + T obj = clazz.newInstance(); + for (int i = 1; i < metaData.getColumnCount(); i++) {//从1开始 + String columnName = metaData.getColumnName(i); + BeanUtils.setProperty(obj, columnName, resultSet.getObject(i)); + } + result.add(obj); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + //关闭连接 + if(resultSet!=null){ + try { + resultSet.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + if (preparedStatement != null) { + try { + preparedStatement.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + return result; + } + public static void initConnection() { + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER); + connection.setSchema(PHMConfig.HBASE_SCHEMA);//设置操作的表空间 + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) { + List jsonObjects = selectData("select * from DIM_SKU_INFO", JSONObject.class); + System.out.println(jsonObjects); + // 如果报错java.sql.SQLException: ERROR 726 (43M10): Inconsistent namespace mapping properites.. Cannot initiate connection as SYSTEM:CATALOG is found but client does not have phoenix.schema.isNamespaceMappingEnabled + // 1)查看虚拟机,服务器中hbase-site.xml的配置,包括hbase的和phoenix的 + // 2)查看resource下面是否有hbase-site.xml,检查配置 + } } diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/RedisUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/RedisUtils.java new file mode 100644 index 0000000..5bc00d1 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/RedisUtils.java @@ -0,0 +1,54 @@ +package com.cqu.warehouse.realtime.utils; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +/** + *@BelongsProject: phm_parent + *@BelongsPackage: com.cqu.warehouse.realtime.utils + *@Author: markilue + *@CreateTime: 2023-05-20 14:25 + *@Description: TODO redis类:获取Jedis连接池 + *@Version: 1.0 + */ +public class RedisUtils { + + private static JedisPool jedisPool; + + + public static Jedis getJedisPool() { + if (jedisPool == null) { + initJedisPool(); + } + System.out.println("获取redis连接"); + return jedisPool.getResource(); + } + + private static void initJedisPool() { + //连接池配置对象 + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + //最大连接数 + jedisPoolConfig.setMaxTotal(100); + //每次在连接的时候是否进行ping pong测试 + jedisPoolConfig.setTestOnBorrow(true); + //连接耗尽是否等待 + jedisPoolConfig.setBlockWhenExhausted(true); + //最大等待时间 + jedisPoolConfig.setMaxWaitMillis(2000); + //最小空闲连接数 + jedisPoolConfig.setMinIdle(5); + //最大空闲连接数(多了之后,空闲了销毁之后最少还剩多少个) + jedisPoolConfig.setMaxIdle(5); + jedisPool = new JedisPool(jedisPoolConfig, "Ding202", 6379, 10000); + } + + public static void main(String[] args) { + Jedis jedis = getJedisPool(); + String pong = jedis.ping(); + System.out.println(pong); + System.out.println(jedis.get("k1000")); + } + + +} diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/resources/hbase-site.xml b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/resources/hbase-site.xml new file mode 100644 index 0000000..1b4ce1a --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/resources/hbase-site.xml @@ -0,0 +1,40 @@ + + + + + + hbase.rootdir + hdfs://Ding202:8020/hbase + + + + hbase.cluster.distributed + true + + + + hbase.zookeeper.quorum + Ding202,Ding203,Ding204 + + + + hbase.unsafe.stream.capability.enforce + false + + + + hbase.wal.provider + filesystem + + + + phoenix.schema.isNamespaceMappingEnabled + true + + + + phoenix.schema.mapSystemTablesToNamespace + true + + + diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/resources/log4j.properties b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/resources/log4j.properties new file mode 100644 index 0000000..963ca91 --- /dev/null +++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=error,stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n