leecode,rt_phm更新更新

This commit is contained in:
markilue 2023-05-22 09:56:48 +08:00
parent dbb3733c1d
commit 3098bc28b3
16 changed files with 614 additions and 29 deletions

View File

@ -1,5 +1,6 @@
package com.cqu.warehouse.realtime.app.base; package com.cqu.warehouse.realtime.app.base;
import com.cqu.warehouse.realtime.common.PHMConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
@ -26,7 +27,7 @@ public abstract class BaseStreamApp {
env.getCheckpointConfig().setCheckpointTimeout(5000L); env.getCheckpointConfig().setCheckpointTimeout(5000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//任务结束检查点是否保存 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//任务结束检查点是否保存
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/phm_warehosue/rt/ck")); env.setStateBackend(new FsStateBackend(PHMConfig.RT_CHECKPOINT_LOCATION));
System.setProperty("HADOOP_USER_NAME", "dingjiawen"); System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//模板 //模板
execute(env); execute(env);

View File

@ -0,0 +1,58 @@
package com.cqu.warehouse.realtime.app.dwd;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DimFunction;
import com.cqu.warehouse.realtime.app.func.DimSink;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dwd
*@Author: markilue
*@CreateTime: 2023-05-20 15:34
*@Description:
* TODO 读取数据库中的配置信息(维度信息):
* 1)读取经过maxwell采集的维度数据
* 2)通过指定的表插入对应的hbase表中
*@Version: 1.0
*/
public class BaseDBApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new BaseDBApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1.从kafka中读取数据
String topic = "ods_base_db_m";
String groupId = "base_db_group";
DataStreamSource<String> kafkaDS = env.addSource(
MyKafkaUtils.getKafkaSource(topic, groupId)
);
//TODO 2.转为JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSONObject::parseObject);
//TODO 3.判断是否需要在Hbase建表,并添加sink_table字段
SingleOutputStreamOperator<JSONObject> sinkDS = jsonObjectDS.map(
new DimFunction()
);
sinkDS.print(">>>");
//TODO 4.写到Hbase,由于可能需要写到不同的表中jdbcSink暂时只能写到一张表中,所以需要自己重写SinkFunction
sinkDS.addSink(
new DimSink()
);
}
}

View File

@ -21,7 +21,7 @@ import org.apache.flink.util.OutputTag;
*@Author: markilue *@Author: markilue
*@CreateTime: 2023-05-17 18:04 *@CreateTime: 2023-05-17 18:04
*@Description: *@Description:
* TODO phm的dwd层: * TODO phm的dwd层:读取上传的测试数据
* 1.简单ETL * 1.简单ETL
* 2.数据分流 * 2.数据分流
*@Version: 1.0 *@Version: 1.0

View File

@ -7,7 +7,7 @@ import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp; import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.app.func.DimAsyncFunction; import com.cqu.warehouse.realtime.app.func.DimAsyncFunction;
import com.cqu.warehouse.realtime.entity.GasTableProcess; import com.cqu.warehouse.realtime.entity.GasCodeProcess;
import com.cqu.warehouse.realtime.app.func.GasWideProcessFunction; import com.cqu.warehouse.realtime.app.func.GasWideProcessFunction;
import com.cqu.warehouse.realtime.app.func.MyDeserializerSchemaFunction; import com.cqu.warehouse.realtime.app.func.MyDeserializerSchemaFunction;
import com.cqu.warehouse.realtime.app.func.TableProcessFunction; import com.cqu.warehouse.realtime.app.func.TableProcessFunction;
@ -87,10 +87,10 @@ public class GasWideApp extends BaseStreamApp {
.build(); .build();
DataStreamSource<String> gasTableProcessDS = env.addSource(gasTableProcessSource); DataStreamSource<String> gasTableProcessDS = env.addSource(gasTableProcessSource);
// gasTableProcessDS.print(); // gasTableProcessDS.print("^^^");
//TODO 4.将配置流广播 //TODO 4.将配置流广播
MapStateDescriptor<String, GasTableProcess> gasTableProcessMapStateDescriptor = new MapStateDescriptor<String, GasTableProcess>("gasTableState", String.class, GasTableProcess.class); MapStateDescriptor<String, GasCodeProcess> gasTableProcessMapStateDescriptor = new MapStateDescriptor<String, GasCodeProcess>("gasTableState", String.class, GasCodeProcess.class);
BroadcastStream<String> gasBroadDS = gasTableProcessDS.broadcast(gasTableProcessMapStateDescriptor); BroadcastStream<String> gasBroadDS = gasTableProcessDS.broadcast(gasTableProcessMapStateDescriptor);
//TODO 5.原流join配置流并进行广播 //TODO 5.原流join配置流并进行广播
@ -133,20 +133,20 @@ public class GasWideApp extends BaseStreamApp {
gasWideDS.print("***"); gasWideDS.print("***");
//TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率额定运行时长等),地区维度(经纬度等) //TODO 10.旁路缓存+异步IO 添加额外信息 ->类型维度(额定功率额定运行时长等),地区维度(经纬度等)
SingleOutputStreamOperator<JSONObject> gasWidedDS = AsyncDataStream.unorderedWait( // SingleOutputStreamOperator<JSONObject> gasWidedDS = AsyncDataStream.unorderedWait(
gasWideDS, // gasWideDS,
new DimAsyncFunction<JSONObject>(), // new DimAsyncFunction<JSONObject>(),
60, // 60,
TimeUnit.SECONDS // TimeUnit.SECONDS
); // );
//
SingleOutputStreamOperator<String> gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString()); // SingleOutputStreamOperator<String> gasWideStrDS = gasWidedDS.map(jsonObject -> jsonObject.toJSONString());
//
gasWideStrDS.print(">>>>"); // gasWideStrDS.print(">>>>");
//
gasWideStrDS.addSink( // gasWideStrDS.addSink(
MyKafkaUtils.getKafkaSink("dwm_gas_wide") // MyKafkaUtils.getKafkaSink("dwm_gas_wide")
); // );
} }

View File

@ -15,7 +15,6 @@ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
*/ */
public class DimAsyncFunction <T> extends RichAsyncFunction<T,T> { public class DimAsyncFunction <T> extends RichAsyncFunction<T,T> {
//异步调用主方法 //异步调用主方法
@Override @Override
public void asyncInvoke(T t, ResultFuture<T> resultFuture) throws Exception { public void asyncInvoke(T t, ResultFuture<T> resultFuture) throws Exception {

View File

@ -0,0 +1,95 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.common.PHMConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-20 18:18
*@Description: TODO 燃机维度信息添加
*@Version: 1.0
*/
public class DimFunction extends RichMapFunction<JSONObject, JSONObject> {
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER);
connection.setSchema(PHMConfig.HBASE_SCHEMA);
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
String dimTable = "dim_" + jsonObject.getString("table");
String type = jsonObject.getString("type");
//注意:maxwell可以对历史数据进行处理,这时候type为bootstrap-insert,这时候需要修复
if (type.equals("bootstrap-insert")) {
type = "insert";
jsonObject.put("type", type);
}
JSONObject data = jsonObject.getJSONObject("data");
Set<String> colName = data.keySet();
//如果说 读取到的配置信息是维度数据的话那么提前在Hbase中创建维度表
if ("insert".equals(type)) {
//如果是维度数据且是insert;update就没必要在创建
existTable(dimTable, colName);
}
jsonObject.put("sinkTable", dimTable);
return jsonObject;
}
private void existTable(String dimTable, Set<String> colName) {
StringBuilder sql = new StringBuilder("create table if not exists " + dimTable + "(");
int i = 0;
for (String col : colName) {
if ("id".equals(col)) {
sql.append(col + " varchar primary key");
} else {
sql.append(col + " varchar");
}
if (i++ < colName.size() - 1) {
sql.append(",");
}
}
sql.append(")");
PreparedStatement preparedStatement = null;
String executeSQL = sql.toString();
System.out.println(executeSQL);
try {
preparedStatement = connection.prepareStatement(executeSQL);
preparedStatement.execute();
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (preparedStatement == null) {
try {
preparedStatement.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
}
}

View File

@ -0,0 +1,83 @@
package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.common.PHMConfig;
import com.cqu.warehouse.realtime.utils.DimUtils;
import com.cqu.warehouse.realtime.utils.RedisUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.func
*@Author: markilue
*@CreateTime: 2023-05-20 19:09
*@Description: TODO 维度数据表写回Hbase
*@Version: 1.0
*/
public class DimSink extends RichSinkFunction<JSONObject> {
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER);
connection.setSchema(PHMConfig.HBASE_SCHEMA);
}
@Override
public void invoke(JSONObject jsonObject, Context context) {
String sinkTable = jsonObject.getString("sinkTable");
JSONObject data = jsonObject.getJSONObject("data");
String sql = generateSQL(sinkTable, data);
String id = data.getString("id");
try (PreparedStatement preparedStatement = connection.prepareStatement(sql);){
preparedStatement.executeUpdate();
//TODO 注意:提交事务(MQSQL默认自动提交事务,Phoenix默认是手动提交事务)
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("向phoenix中插入维度数据失败");
}
String type = jsonObject.getString("sinkTable");
if("update".equals(type)||"delete".equals(type)){
DimUtils.deleteCache(sinkTable,id);
}
}
public String generateSQL(String sinkTable, JSONObject data) {
List<String> col = new ArrayList<>();
List<Object> value = new ArrayList<>();
for (Map.Entry<String, Object> entry : data.entrySet()) {
col.add(entry.getKey());
value.add(entry.getValue());
}
String sql = "upsert into " + sinkTable + "(" + StringUtils.join(col, ",") + ") values ('" + StringUtils.join(value, "','") + "')";
System.out.println(sql);
return sql;
}
}

View File

@ -2,7 +2,7 @@ package com.cqu.warehouse.realtime.app.func;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.entity.GasTableProcess; import com.cqu.warehouse.realtime.entity.GasCodeProcess;
import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
@ -23,7 +23,7 @@ import java.text.SimpleDateFormat;
*/ */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> { public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private MapStateDescriptor<String, GasTableProcess> mapStateDescriptor; private MapStateDescriptor<String, GasCodeProcess> mapStateDescriptor;
private OutputTag<String> outTag; private OutputTag<String> outTag;
@ -34,7 +34,7 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
} }
public TableProcessFunction(OutputTag<String> outTag, MapStateDescriptor<String, GasTableProcess> mapStateDescriptor) { public TableProcessFunction(OutputTag<String> outTag, MapStateDescriptor<String, GasCodeProcess> mapStateDescriptor) {
this.mapStateDescriptor = mapStateDescriptor; this.mapStateDescriptor = mapStateDescriptor;
this.outTag = outTag; this.outTag = outTag;
} }
@ -42,17 +42,17 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
@Override @Override
public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception { public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {
ReadOnlyBroadcastState<String, GasTableProcess> gasTableState = readOnlyContext.getBroadcastState(mapStateDescriptor); ReadOnlyBroadcastState<String, GasCodeProcess> gasTableState = readOnlyContext.getBroadcastState(mapStateDescriptor);
String area = jsonObject.getString("area"); String area = jsonObject.getString("area");
String tagName = jsonObject.getString("tagName"); String tagName = jsonObject.getString("tagName");
GasTableProcess truth1 = gasTableState.get(area); GasCodeProcess truth1 = gasTableState.get(area);
if (truth1 != null) { if (truth1 != null) {
jsonObject.put("area", truth1.getTag_desc()); jsonObject.put("area", truth1.getTag_desc());
} }
GasTableProcess truth = gasTableState.get(tagName); GasCodeProcess truth = gasTableState.get(tagName);
if (truth != null) { if (truth != null) {
// jsonObject.put("originaltagName", truth.getDecode_value()); // jsonObject.put("originaltagName", truth.getDecode_value());
jsonObject.put("tagName", truth.getDecode_value()); jsonObject.put("tagName", truth.getDecode_value());
@ -73,12 +73,12 @@ public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, S
@Override @Override
public void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception { public void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {
BroadcastState<String, GasTableProcess> gasTableProcessState = context.getBroadcastState(mapStateDescriptor); BroadcastState<String, GasCodeProcess> gasTableProcessState = context.getBroadcastState(mapStateDescriptor);
JSONObject jsonObject = JSON.parseObject(s); JSONObject jsonObject = JSON.parseObject(s);
JSONObject data = jsonObject.getJSONObject("data"); JSONObject data = jsonObject.getJSONObject("data");
GasTableProcess gasTableProcess = JSON.parseObject(jsonObject.getString("data"), GasTableProcess.class); GasCodeProcess gasTableProcess = JSON.parseObject(jsonObject.getString("data"), GasCodeProcess.class);
if (data != null) { if (data != null) {
String encode_value = data.getString("encode_value"); String encode_value = data.getString("encode_value");
if (encode_value != null) { if (encode_value != null) {

View File

@ -0,0 +1,46 @@
create table if not exists dim_gas_type_info
(
rated_duration
varchar,
rated_load
varchar,
rated_power
varchar,
rated_speed
varchar,
rated_flow_rate
varchar,
type_id
varchar,
rated_press
varchar,
rated_temp
varchar,
id
varchar
primary
key
)
upsert into dim_gas_type_info
(
rated_duration,
rated_load,
rated_power,
rated_speed,
rated_flow_rate,
type_id,
rated_press,
rated_temp,
id
) values
(
rated_duration,
rated_load,
rated_power,
rated_speed,
rated_flow_rate,
type_id,
rated_press,
rated_temp,
id
)

View File

@ -13,4 +13,5 @@ public class PHMConfig {
public static final String HBASE_SCHEMA = "PHM_REALTIME";//Hbase库名 public static final String HBASE_SCHEMA = "PHM_REALTIME";//Hbase库名
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix连接 public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix连接
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//Phoenix连接 public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//Phoenix连接
public static final String RT_CHECKPOINT_LOCATION = "hdfs://Ding202:8020/phm_warehouse/rt/ck";//检查点保存位置
} }

View File

@ -11,7 +11,7 @@ import lombok.Data;
*@Version: 1.0 *@Version: 1.0
*/ */
@Data @Data
public class GasTableProcess { public class GasCodeProcess {
String encode_value;//加密数据 String encode_value;//加密数据
String decode_value;//解密数据 String decode_value;//解密数据

View File

@ -0,0 +1,127 @@
package com.cqu.warehouse.realtime.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-20 14:08
*@Description:
* TODO 维度关联工具类:
* 方便维度关联查询
*@Version: 1.0
*/
public class DimUtils {
public static JSONObject getDimInfoWithCache(String tableName, String id) {
return getDimInfoWithCache(tableName, Tuple2.of("id", id));
}
public static JSONObject getDimInfoWithCache(String tableName, Tuple2<String, String>... colNameAndValue) {
JSONObject jsonObject = null;
//TODO 查询缓存中是否存在这个对象
StringBuilder redisKey = new StringBuilder("dim:" + tableName + ":");
StringBuilder sql = new StringBuilder("select * from " + tableName + " where ");
for (int i = 0; i < colNameAndValue.length; i++) {
Tuple2<String, String> cv = colNameAndValue[i];
String colName = cv.f0;
String value = cv.f1;
sql.append("" + colName + " = '" + value + "'");
redisKey.append(value);
if (i < colNameAndValue.length - 1) {
sql.append(" and ");
redisKey.append("_");
}
}
Jedis jedis = null;
String cacheStr = null;
try {
jedis = RedisUtils.getJedisPool();
cacheStr = jedis.get(redisKey.toString());
} catch (Exception e) {
System.out.println("从redis查询出现异常");
}
if (cacheStr != null && cacheStr.length() > 0) {
//有缓存查缓存
jsonObject = JSON.parseObject(cacheStr);
} else {
//没有缓存查数据库
System.out.println(sql);
List<JSONObject> result = PhoenixUtils.selectData(sql.toString(), JSONObject.class);
if (result != null && result.size() > 0) {
jsonObject = result.get(0);
if (jedis != null) {
jedis.setex(redisKey.toString(), 1000 * 60 * 24, jsonObject.toJSONString());
}
} else {
System.out.println("维度数据没有找到" + sql);
}
}
//释放连接
if (jedis != null) {
jedis.close();
System.out.println("------关闭redis连接-------");
}
return jsonObject;
}
//从phoenix表中查询维度数据(封装成json) {"ID":"13","TM_NAME":"adsdf"}
//查询条件可能有多个(联合主键),用可变长的tuple来操作
public static JSONObject getDimInfoNocache(String tableName, Tuple2<String, String>... colNameAndValue) {
StringBuilder sql = new StringBuilder("select * from " + tableName + " where ");
for (int i = 0; i < colNameAndValue.length; i++) {
Tuple2<String, String> cv = colNameAndValue[i];
String colName = cv.f0;
String value = cv.f1;
sql.append("" + colName + " = '" + value + "'");
if (i < colNameAndValue.length - 1) {
sql.append(" and ");
}
}
System.out.println(sql);
List<JSONObject> result = PhoenixUtils.selectData(sql.toString(), JSONObject.class);
if (result != null && result.size() > 0) {
return result.get(0);
} else {
System.out.println("维度数据没有找到" + sql);
return null;
}
}
public static void deleteCache(String sinkTable, String id) {
//删除redis中的数据
String redisKey = "dim:" + sinkTable.toLowerCase() + ":" + id;
Jedis jedis = null;
try {
jedis = RedisUtils.getJedisPool();
jedis.del(redisKey);
jedis.close();//关闭jedis连接
} catch (Exception e) {
System.out.println("删除内存中的数据失败");
e.printStackTrace();
}
}
public static void main(String[] args) {
while (true) {
JSONObject object = getDimInfoWithCache("DIM_SKU_INFO", "10");
System.out.println(object);
}
}
}

View File

@ -1,5 +1,13 @@
package com.cqu.warehouse.realtime.utils; package com.cqu.warehouse.realtime.utils;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.common.PHMConfig;
import org.apache.commons.beanutils.BeanUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/** /**
*@BelongsProject: phm_parent *@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.utils *@BelongsPackage: com.cqu.warehouse.realtime.utils
@ -10,8 +18,76 @@ package com.cqu.warehouse.realtime.utils;
*/ */
public class PhoenixUtils { public class PhoenixUtils {
private static Connection connection;
public static <T> List<T> selectData(String sql, Class<T> clazz) {
//JDBC五部曲:
//注册驱动
if (connection == null) {
initConnection();
}
//获取连接
//查询结果集
PreparedStatement preparedStatement = null;
ResultSet resultSet = null;
List<T> result = new ArrayList<>();
try {
preparedStatement = connection.prepareStatement(sql);
resultSet = preparedStatement.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
//处理结果集
while (resultSet.next()) {
T obj = clazz.newInstance();
for (int i = 1; i < metaData.getColumnCount(); i++) {//从1开始
String columnName = metaData.getColumnName(i);
BeanUtils.setProperty(obj, columnName, resultSet.getObject(i));
}
result.add(obj);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//关闭连接
if(resultSet!=null){
try {
resultSet.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
return result;
}
public static void initConnection() {
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER);
connection.setSchema(PHMConfig.HBASE_SCHEMA);//设置操作的表空间
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
List<JSONObject> jsonObjects = selectData("select * from DIM_SKU_INFO", JSONObject.class);
System.out.println(jsonObjects);
// 如果报错java.sql.SQLException: ERROR 726 (43M10): Inconsistent namespace mapping properites.. Cannot initiate connection as SYSTEM:CATALOG is found but client does not have phoenix.schema.isNamespaceMappingEnabled
// 1)查看虚拟机,服务器中hbase-site.xml的配置包括hbase的和phoenix的
// 2)查看resource下面是否有hbase-site.xml检查配置
}
} }

View File

@ -0,0 +1,54 @@
package com.cqu.warehouse.realtime.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-20 14:25
*@Description: TODO redis类:获取Jedis连接池
*@Version: 1.0
*/
public class RedisUtils {
private static JedisPool jedisPool;
public static Jedis getJedisPool() {
if (jedisPool == null) {
initJedisPool();
}
System.out.println("获取redis连接");
return jedisPool.getResource();
}
private static void initJedisPool() {
//连接池配置对象
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//最大连接数
jedisPoolConfig.setMaxTotal(100);
//每次在连接的时候是否进行ping pong测试
jedisPoolConfig.setTestOnBorrow(true);
//连接耗尽是否等待
jedisPoolConfig.setBlockWhenExhausted(true);
//最大等待时间
jedisPoolConfig.setMaxWaitMillis(2000);
//最小空闲连接数
jedisPoolConfig.setMinIdle(5);
//最大空闲连接数(多了之后空闲了销毁之后最少还剩多少个)
jedisPoolConfig.setMaxIdle(5);
jedisPool = new JedisPool(jedisPoolConfig, "Ding202", 6379, 10000);
}
public static void main(String[] args) {
Jedis jedis = getJedisPool();
String pong = jedis.ping();
System.out.println(pong);
System.out.println(jedis.get("k1000"));
}
}

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://Ding202:8020/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>Ding202,Ding203,Ding204</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.wal.provider</name>
<value>filesystem</value>
</property>
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,5 @@
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n