Real-time data warehouse: ADS task update
This commit is contained in:
parent bb9062f2a5
commit 5619d60ed1

@@ -26,7 +26,7 @@
    <appender-ref ref="console" />
</logger>

-<root level="error" additivity="false">
+<root level="info" additivity="false">
    <appender-ref ref="console" />
</root>
</configuration>

@@ -1,11 +1,29 @@
package com.atguigu.rtgmall.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.rtgmall.beans
-*@Author: markilue
-*@CreateTime: 2023-05-12 21:29
-*@Description: TODO
-*@Version: 1.0
-*/
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.rtgmall.beans
+*@Author: markilue
+*@CreateTime: 2023-05-12 21:29
+*@Description: TODO Province transaction amount statistics entity class
+*@Version: 1.0
+*/
@AllArgsConstructor
@Data
@NoArgsConstructor
public class ProvinceStats {

    private String stt;
    private String edt;
    private String province_id;
    private String province_name;
    private BigDecimal order_amount;
    private String ts;

}

@@ -1,7 +1,10 @@
package com.atguigu.rtgmall.controller;

import com.atguigu.rtgmall.beans.ProductStats;
import com.atguigu.rtgmall.beans.ProvinceStats;
import com.atguigu.rtgmall.service.ProductStatsService;
import com.atguigu.rtgmall.service.ProvinceStatsService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;

@@ -24,6 +27,9 @@ public class SugarController {
    @Autowired
    private ProductStatsService productStatsService;

    @Autowired
    private ProvinceStatsService provinceStatsService;


    @RequestMapping("/gmv")
    public String getGMV(@RequestParam(value = "date", defaultValue = "0") Integer date) {

@@ -39,6 +45,168 @@ public class SugarController {
        return json;
    }

    @RequestMapping("/tm")
    public String getProductStatsByTm(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) {
        if (date == 0) {
            date = now();
        }
        /*
        {
            "status": 0,
            "data": {
                "categories": ["苹果","三星", "华为","oppo","vivo","小米98"],
                "series": [{
                    "name": "商品品牌",
                    "data": [5101,6100,6371,5682,5430,9533]
                }]
            }
        }
        */
        // Call the service to get the transaction amount per brand (TM)
        List<ProductStats> productStatsList = productStatsService.getProductStatsByTm(date, limit);

        // Iterate over the query results and collect the brand names and brand transaction amounts into two separate lists
        List<String> tmList = new ArrayList<>();
        List<BigDecimal> amountList = new ArrayList<>();
        for (ProductStats productStats : productStatsList) {
            tmList.add(productStats.getTm_name());
            amountList.add(productStats.getOrder_amount());
        }
        String json = "{" +
                " \"status\": 0," +
                " \"data\": {" +
                " \"categories\": [\"" + StringUtils.join(tmList, "\",\"") + "\"]," +
                " \"series\": [{" +
                " \"name\": \"商品品牌\"," +
                " \"data\": [" + StringUtils.join(amountList, ",") + "]" +
                " }]" +
                " }" +
                " }";
        return json;
    }

//    @RequestMapping("/category3")
//    public String getProductStatsByCategory3(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) {
//        if (date == 0) {
//            date = now();
//        }
//        /*
//        {"status": 0,"data": [{"name": "PC","value": 97},{"name": "iOS","value": 50}]}
//        */
//        // Call the service to get the transaction amount per category3
//        List<ProductStats> productStatsList = productStatsService.getProductStatsByCategory3(date, limit);
//
//        // Iterate over the query results and build the JSON data array
//        StringBuilder json = new StringBuilder("{\"status\": 0,\"data\": [");
//
//        for (int i = 0; i < productStatsList.size(); i++) {
//            ProductStats productStats = productStatsList.get(i);
//            json.append("{\"name\": \"" + productStats.getCategory3_name() + "\",\"value\": " + productStats.getOrder_amount() + "}");
//            if (i < productStatsList.size() - 1) {
//                json.append(",");
//            }
//        }
//
//        json.append("]}");
//
//        return json.toString();
//    }


    @RequestMapping("/category3")
    public Object getProductStatsByCategory3(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) {
        if (date == 0) {
            date = now();
        }
        /*
        {"status": 0,"data": [{"name": "PC","value": 97},{"name": "iOS","value": 50}]}
        */
        // Call the service to get the transaction amount per category3
        List<ProductStats> productStatsList = productStatsService.getProductStatsByCategory3(date, limit);
        HashMap resMap = new HashMap();
        List dateList = new ArrayList();
        for (ProductStats productStats : productStatsList) {
            HashMap dateMap = new HashMap();
            dateMap.put("name", productStats.getCategory3_name());
            dateMap.put("value", productStats.getOrder_amount());
            dateList.add(dateMap);
        }
        resMap.put("status", 0);
        resMap.put("data", dateList);

        return resMap;
    }


    @RequestMapping("/spu")
    public String getProductStatsBySPU(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) {
        if (date == 0) {
            date = now();
        }

        // Call the service to get the transaction amount and order count per SPU
        List<ProductStats> productStatsList = productStatsService.getProductStatsBySPU(date, limit);

        // Iterate over the query results and build the carousel-table rows (name, amount, order count)
        StringBuilder json = new StringBuilder("{ " +
                " \"status\": 0, " +
                " \"data\": { " +
                " \"columns\": [ " +
                " { " +
                " \"name\": \"商品名称\", " +
                " \"id\": \"name\" " +
                " }, " +
                " { " +
                " \"name\": \"交易额\", " +
                " \"id\": \"amount\" " +
                " }, " +
                " { " +
                " \"name\": \"订单数\", " +
                " \"id\": \"ct\" " +
                " } " +
                " ], " +
                " \"rows\": [");

        for (int i = 0; i < productStatsList.size(); i++) {
            ProductStats productStats = productStatsList.get(i);
            // System.out.println(productStats.getSpu_name()+":"+productStats.getOrder_amount()+":"+productStats.getOrder_ct());
            json.append("{\"name\": \"" + productStats.getSpu_name() + "\",\"amount\": " + productStats.getOrder_amount() + ",\"ct\": " + productStats.getOrder_ct() + "}");
            if (i < productStatsList.size() - 1) {
                json.append(",");
            }
        }

        json.append("]}}");

        return json.toString();
    }


    @RequestMapping("province")
    public String getProvinceStats(@RequestParam(value = "date", defaultValue = "0") Integer date) {
        if (date == 0) {
            date = now();
        }

        List<ProvinceStats> provinceStatsList = provinceStatsService.getProvinceStats(date);

        StringBuilder json = new StringBuilder("{\"status\": 0,\"data\": {\"mapData\": [");
        for (int i = 0; i < provinceStatsList.size(); i++) {
            ProvinceStats provinceStats = provinceStatsList.get(i);
            json.append("{\"name\": \"" + provinceStats.getProvince_name() + "\",\"value\": " + provinceStats.getOrder_amount() + "}");
            if (i < provinceStatsList.size() - 1) {
                json.append(",");
            }
        }

        json.append("],\"valueName\": \"省份交易额\"}}");
        return json.toString();

    }


    // Get the current date as a yyyyMMdd integer
    private Integer now() {
        String yyyyMMdd = DateFormatUtils.format(new Date(), "yyyyMMdd");

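Side note (not part of this commit): a minimal sketch of how the /province payload above could be assembled with Jackson, which spring-boot-starter-web already ships, instead of concatenating JSON by hand. The class and method names below are illustrative only.

import com.atguigu.rtgmall.beans.ProvinceStats;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProvinceStatsJsonSketch {

    // Build the same {"status":0,"data":{"mapData":[...],"valueName":"省份交易额"}} structure
    // as nested Maps and let Jackson serialize it, instead of concatenating JSON by hand.
    public static String toSugarMapJson(List<ProvinceStats> provinceStatsList) throws Exception {
        List<Map<String, Object>> mapData = new ArrayList<>();
        for (ProvinceStats provinceStats : provinceStatsList) {
            Map<String, Object> item = new HashMap<>();
            item.put("name", provinceStats.getProvince_name());
            item.put("value", provinceStats.getOrder_amount());
            mapData.add(item);
        }
        Map<String, Object> data = new HashMap<>();
        data.put("mapData", mapData);
        data.put("valueName", "省份交易额");
        Map<String, Object> res = new HashMap<>();
        res.put("status", 0);
        res.put("data", data);
        return new ObjectMapper().writeValueAsString(res);
    }
}
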
@@ -0,0 +1,83 @@
// Horizontal bar chart
{
    "status": 0,
    "data": {
        "categories": [
            "苹果",
            "小米",
            "华为",
            "Redmi",
            "索芙特"
        ],
        "series": [
            {
                "name": "商品品牌",
                "data": [
                    258910.00,
                    153974.00,
                    127240.00,
                    24681.00,
                    3439.00
                ]
            }
        ]
    }
}

// Pie chart
{
    "status": 0,
    "msg": "",
    "data": [
        {
            "name": "PC",
            "value": 97
        },
        {
            "name": "iOS",
            "value": 50
        }
    ]
}

// Carousel table
{
    "status": 0,
    "data": {
        "columns": [
            {
                "name": "商品名称",
                "id": "name"
            },
            {
                "name": "交易额",
                "id": "amount"
            },
            {
                "name": "订单数",
                "id": "ct"
            }
        ],
        "rows": [
            {
                "name": "北京总部",
                "amount": 1,
                "ct": 1
            },
            {
                "name": "北京总部",
                "amount": 1,
                "ct": 1
            }
        ]
    }
}

// Choropleth (color) map
{
    "status": 0,
    "data": {
        "mapData": [
            {"name": "北京", "value": 8985},
            {"name": "天津", "value": 8616}
        ],
        "valueName": "省份交易额"
    }
}

@@ -26,4 +26,18 @@ public interface ProductStatsMapper {
    List<ProductStats> selectProductStatsByTm(@Param("date") Integer date, @Param("limit") Integer limit);


    // Transaction amount grouped by category3, top N by amount
    @Select("select category3_id,category3_name,sum(order_amount) order_amount from product_stats " +
            "where toYYYYMMDD(stt)=#{date} " +
            "group by category3_id,category3_name " +
            "having order_amount>0 " +
            "order by order_amount desc limit #{limit} ")
    List<ProductStats> selectProductStatsByCategory3(@Param("date") Integer date, @Param("limit") Integer limit);

    // Transaction amount and order count grouped by SPU, top N by amount and count
    @Select("select spu_id, spu_name, sum(order_amount) order_amount, sum(order_ct) order_ct " +
            "from product_stats " +
            "where toYYYYMMDD(stt) = #{date} " +
            "group by spu_id, spu_name " +
            "having order_amount > 0 " +
            "order by order_amount desc, order_ct desc limit #{limit} ")
    List<ProductStats> selectProductStatsBySPU(@Param("date") Integer date, @Param("limit") Integer limit);
}

@@ -1,11 +1,26 @@
package com.atguigu.rtgmall.mapper;

import com.atguigu.rtgmall.beans.ProvinceStats;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.rtgmall.mapper
-*@Author: markilue
-*@CreateTime: 2023-05-12 21:29
-*@Description: TODO
-*@Version: 1.0
-*/
-public class ProvinceStatsMapper {
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.rtgmall.mapper
+*@Author: markilue
+*@CreateTime: 2023-05-12 21:29
+*@Description: TODO Province statistics mapper
+*@Version: 1.0
+*/
+public interface ProvinceStatsMapper {


    // Transaction amount grouped by province
    @Select("select province_id,province_name,sum(order_amount) order_amount " +
            "from province_stats " +
            "where toYYYYMMDD(stt) = #{date} " +
            "group by province_id, province_name")
    List<ProvinceStats> selectProvinceStats(Integer date);


}

@@ -1,9 +1,37 @@

-- Brand transaction amount
-select tm_id,tm_name,sum(order_amount) order_amount from product_stats
-where toYYYYMMDD(stt)=20230511
-group by tm_id,tm_name
-having order_amount>0
-order by order_amount desc limit 5;
+select tm_id, tm_name, sum(order_amount) order_amount
+from product_stats
+where toYYYYMMDD(stt) = 20230511
+group by tm_id, tm_name
+having order_amount > 0
+order by order_amount desc limit 5;

-- Category3 transaction amount share
select category3_id, category3_name, sum(order_amount) order_amount
from product_stats
where toYYYYMMDD(stt) = 20230511
group by category3_id, category3_name
having order_amount > 0
order by order_amount desc limit 5;

-- SPU transaction amount
select spu_id, spu_name, sum(order_amount) order_amount, sum(order_ct) order_ct
from product_stats
where toYYYYMMDD(stt) = 20230511
group by spu_id, spu_name
having order_amount > 0
order by order_amount desc, order_ct desc limit 5;

select spu_id, spu_name, sum(order_amount) order_amount, sum(order_ct) order_ct
from product_stats
where toYYYYMMDD(stt) = 20230511
group by spu_id, spu_name
having order_amount > 0
order by order_amount desc, order_ct desc limit 5

-- Province transaction amount
select province_id, province_name, sum(order_amount) order_amount
from province_stats
where toYYYYMMDD(stt) = 20230511
group by province_id, province_name

@@ -18,5 +18,13 @@ public interface ProductStatsService {
    // Get brand (TM) transaction amounts
    List<ProductStats> getProductStatsByTm(Integer date, Integer limit);

    // Get category3 transaction amounts
    List<ProductStats> getProductStatsByCategory3(Integer date, Integer limit);

    // Get SPU transaction amounts and order counts
    List<ProductStats> getProductStatsBySPU(Integer date, Integer limit);

}

@@ -1,5 +1,9 @@
package com.atguigu.rtgmall.service;

import com.atguigu.rtgmall.beans.ProvinceStats;

import java.util.List;

/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.service
@@ -9,4 +13,7 @@ package com.atguigu.rtgmall.service;
*@Version: 1.0
*/
public interface ProvinceStatsService {

    // Get province transaction amounts
    List<ProvinceStats> getProvinceStats(Integer date);
}

@@ -30,5 +30,15 @@ public class ProductStatsServiceImpl implements ProductStatsService {
        return productStatsMapper.selectProductStatsByTm(date, limit);
    }

    @Override
    public List<ProductStats> getProductStatsByCategory3(Integer date, Integer limit) {
        return productStatsMapper.selectProductStatsByCategory3(date, limit);
    }

    @Override
    public List<ProductStats> getProductStatsBySPU(Integer date, Integer limit) {
        return productStatsMapper.selectProductStatsBySPU(date, limit);
    }

}

@@ -1,11 +1,31 @@
package com.atguigu.rtgmall.service.impl;

import com.atguigu.rtgmall.beans.ProvinceStats;
import com.atguigu.rtgmall.mapper.ProvinceStatsMapper;
import com.atguigu.rtgmall.service.ProvinceStatsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.rtgmall.service.impl
-*@Author: markilue
-*@CreateTime: 2023-05-12 21:32
-*@Description: TODO
-*@Version: 1.0
-*/
-public class ProvinceStatsServiceImpl {
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.rtgmall.service.impl
+*@Author: markilue
+*@CreateTime: 2023-05-12 21:32
+*@Description: TODO Province statistics service implementation
+*@Version: 1.0
+*/
+@Service
+public class ProvinceStatsServiceImpl implements ProvinceStatsService {

    @Autowired
    private ProvinceStatsMapper provinceStatsMapper;

    @Override
    public List<ProvinceStats> getProvinceStats(Integer date) {
        return provinceStatsMapper.selectProvinceStats(date);
    }
}

@@ -1,11 +1,24 @@
package com.atguigu.rtgmall;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.rtgmall
-*@Author: markilue
-*@CreateTime: 2023-05-12 21:07
-*@Description: TODO
-*@Version: 1.0
-*/
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.rtgmall
+*@Author: markilue
+*@CreateTime: 2023-05-12 21:07
+*@Description: TODO
+*@Version: 1.0
+*/
public class test1 {

    public static void main(String[] args) {
        String date = "20230511";
        String limit = "5";

        // Build the SPU query by string concatenation, mirroring ProductStatsMapper.selectProductStatsBySPU
        String sql = "select spu_id,spu_name,sum(order_amount) order_amount,sum(order_ct) order_ct from product_stats " +
                "where toYYYYMMDD(stt)=" + date + " " +
                "group by spu_id,spu_name " +
                "having order_amount > 0 " +
                "order by order_amount desc,order_ct desc limit " + limit + " ";
        System.out.println(sql);
        // Prints: select spu_id,spu_name,sum(order_amount) order_amount,sum(order_ct) order_ct from product_stats where toYYYYMMDD(stt)=20230511 group by spu_id,spu_name having order_amount > 0 order by order_amount desc,order_ct desc limit 5
    }
}

@@ -211,9 +211,9 @@ public class OrderWideApp {
                new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
-                        orderWide.setProvince_3166_2_code(dimJsonObj.getString("NAME"));
-                        orderWide.setProvince_area_code(dimJsonObj.getString("ISO_3166_2"));
-                        orderWide.setProvince_name(dimJsonObj.getString("AREA_CODE"));
+                        orderWide.setProvince_3166_2_code(dimJsonObj.getString("ISO_3166_2"));
+                        orderWide.setProvince_area_code(dimJsonObj.getString("AREA_CODE"));
+                        orderWide.setProvince_name(dimJsonObj.getString("NAME"));
                        orderWide.setProvince_iso_code(dimJsonObj.getString("ISO_CODE"));
                    }

@@ -1,11 +1,103 @@
package com.atguigu.gmall.realtime.app.dws;

import com.atguigu.gmall.realtime.app.func.KeywordUDTF;
import com.atguigu.gmall.realtime.beans.GmallConstant;
import com.atguigu.gmall.realtime.beans.KeywordStats;
import com.atguigu.gmall.realtime.utils.ClickhouseUtils;
import com.atguigu.gmall.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.gmall.realtime.app.dws
-*@Author: markilue
-*@CreateTime: 2023-05-12 14:27
-*@Description: TODO
-*@Version: 1.0
-*/
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.gmall.realtime.app.dws
+*@Author: markilue
+*@CreateTime: 2023-05-12 14:27
+*@Description: TODO Keyword statistics DWS application
+*@Version: 1.0
+*/
public class KeywordStatsApp {

    public static void main(String[] args) throws Exception {

        //TODO 1. Prepare the basic environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setParallelism(4);

        //TODO 2. Checkpoint settings
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(5000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall"));
        System.setProperty("HADOOP_USER_NAME", "dingjiawen");

        //TODO 3. Register the custom UDTF function
        tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class);


        //TODO 4. Read data from Kafka
        String topic = "dwd_page_log";
        String groupId = "keyword_stats_app_group";
        String createSQl = "create table page_view" +
                "(" +
                " common MAP<STRING, STRING>," +
                " page MAP<STRING, STRING>," +
                " ts BIGINT," +
                " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                " WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND" +
                ")" +
                " WITH (" + MyKafkaUtils.getKafkaDDL(topic, groupId) + ")";
        tableEnv.executeSql(createSQl);

        //TODO 5. Filter out the records that represent search behavior from the dynamic table
        String filterSQL = "select " +
                " page['item'] fullword,rowtime " +
                "from" +
                " page_view " +
                "where " +
                " page['page_id'] = 'good_list' and page['item'] is not null ";
        // System.out.println(filterSQL);
        Table fullwordTable = tableEnv.sqlQuery(filterSQL);

        //TODO 6. Split the search phrase into keywords with the custom UDTF
        // Inlining the Table object into the SQL effectively registers fullwordTable
        Table keywordTable = tableEnv.sqlQuery("select rowtime ,keyword from " + fullwordTable + ", LATERAL TABLE(ik_analyze(fullword)) AS T(keyword)");

        //TODO 7. Group by tumbling window and aggregate
        String selectSQL = "select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt," +
                " DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt," +
                " keyword," +
                " count(*) ct," +
                " '" + GmallConstant.KEYWORD_SEARCH + "' source," +
                " UNIX_TIMESTAMP() * 1000 ts " +
                "from " + keywordTable + " " +
                "group by " +
                " TUMBLE(rowtime, INTERVAL '10' SECOND)," +
                " keyword";
        System.out.println(selectSQL);
        Table resTable = tableEnv.sqlQuery(selectSQL);

        //TODO 8. Convert the table into a stream
        DataStream<KeywordStats> keywordStatsDS = tableEnv.toAppendStream(resTable, KeywordStats.class);

        keywordStatsDS.print(">>>>");
        //TODO 9. Write the stream into ClickHouse
        keywordStatsDS.addSink(
                ClickhouseUtils.getJdbcSink("insert into keyword_stats(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)")
        );


        env.execute();
    }
}

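Side note (not part of this commit): MyKafkaUtils.getKafkaDDL(topic, groupId) is called above but its body is not shown in this diff. A plausible sketch, inferred from the WITH(...) options listed in the SQL notes below, might look like the following; treat every detail as an assumption.

// Hypothetical sketch of MyKafkaUtils.getKafkaDDL; the real implementation is not part of this diff.
public static String getKafkaDDL(String topic, String groupId) {
    return "'connector' = 'kafka'," +
            "'topic' = '" + topic + "'," +
            "'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092'," +
            "'properties.group.id' = '" + groupId + "'," +
            "'scan.startup.mode' = 'latest-offset'," +
            "'format' = 'json'";
}
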
@@ -1,3 +1,4 @@
-- Province order wide table
create table order_wide
(
    province_id BIGINT,
@@ -19,7 +20,7 @@ create table order_wide
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json')


-- Windowed aggregation
select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt,
       DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt,
       province_id,
@@ -29,11 +30,84 @@ select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:m
       province_3166_2_code iso_3166_2,
       count(distinct order_id) order_count,
       sum(split_total_amount) order_amount,
-      UNIX_TIMESTAMP() * 1000 ts
+      UNIX_TIMESTAMP() * 1000 ts
from order_wide
group by TUMBLE(rowtime, INTERVAL '10' SECOND),
         province_id,
         province_name,
         province_area_code,
         province_iso_code,
-        province_3166_2_code
+        province_3166_2_code


-- Dynamic table for the keyword statistics theme
create table page_view
(
    common MAP<STRING,
        STRING>,
    page MAP<STRING,
        STRING>,
    ts BIGINT,
    rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'dwm_page_log',
    'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092',
    'properties.group.id' = 'keyword_stats_app_group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json')


-- Filter out the records that represent search behavior from the dynamic table
select page['item'] fullword,
       rowtime
from page_view
where page['page_id'] = 'good_list'
  and page['item'] is not null

-- Split the search keywords with the custom UDTF
/*
Search text: 小米插线板自营包邮

fullword               rowtime
小米插线板自营包邮       20210814

T is the alias of the temporary table produced by the UDTF:
select rowtime ,keyword from fullwordTable,LATERAL TABLE(<udtf_name>(fullword)) AS T(keyword)

After splitting: 小米 插线板 自营 包邮
keyword    rowtime
小米       20210814
插线板     20210814
自营       20210814
包邮       20210814

*
*/
select rowtime, keyword
from fullwordTable,
     LATERAL TABLE(<udtf_name>(fullword)) AS T(keyword)


-- Group by tumbling window and aggregate
select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt,
       DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt,
       keyword,
       count(*) ct,
       'KEYWORD_SEARCH' source,
       UNIX_TIMESTAMP() * 1000 ts
from order_wide
group by TUMBLE(rowtime, INTERVAL '10' SECOND),
         keyword



select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt,
       DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt,
       keyword,
       count(*) ct,
       'SEARCH' source,
       UNIX_TIMESTAMP() * 1000 ts
from UnnamedTable$1 group by TUMBLE(rowtime, INTERVAL '10' SECOND), keyword

@@ -1,11 +1,38 @@
package com.atguigu.gmall.realtime.app.func;

import com.atguigu.gmall.realtime.utils.KeywordUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.List;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.gmall.realtime.app.func
-*@Author: markilue
-*@CreateTime: 2023-05-12 14:19
-*@Description: TODO
-*@Version: 1.0
-*/
-public class KeywordUDTF {
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.gmall.realtime.app.func
+*@Author: markilue
+*@CreateTime: 2023-05-12 14:19
+*@Description: TODO Custom Flink UDTF that splits the search text into multiple keywords
+*@Version: 1.0
+*/
+@FunctionHint(output = @DataTypeHint("ROW<word STRING>")) // Declares the columns of the emitted rows and their types; Row is the result type
+public class KeywordUDTF extends TableFunction<Row> {

    // Flink picks which eval overload to call based on the number and types of the arguments
    public void eval(String text) {
        List<String> keywordList = KeywordUtils.analyze(text);
        for (String keyword : keywordList) {
            collect(Row.of(keyword));
        }
    }

    // public void eval(String str, int a) {
    //     for (String s : str.split(" ")) {
    //         // use collect(...) to emit a row
    //         collect(Row.of(s, s.length()));
    //     }
    // }

}

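Usage note: KeywordStatsApp above registers this UDTF and applies it with a LATERAL TABLE join; the relevant excerpt (tableEnv and fullwordTable come from that class) is:

// From KeywordStatsApp: register the UDTF, then explode each search phrase into keyword rows
tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class);
Table keywordTable = tableEnv.sqlQuery(
        "select rowtime, keyword from " + fullwordTable +
        ", LATERAL TABLE(ik_analyze(fullword)) AS T(keyword)");
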
@@ -1,11 +1,25 @@
package com.atguigu.gmall.realtime.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
-*@BelongsProject: rt-gmall-parent
-*@BelongsPackage: com.atguigu.gmall.realtime.beans
-*@Author: markilue
-*@CreateTime: 2023-05-12 15:24
-*@Description: TODO
-*@Version: 1.0
-*/
+*@BelongsProject: rt-gmall-parent
+*@BelongsPackage: com.atguigu.gmall.realtime.beans
+*@Author: markilue
+*@CreateTime: 2023-05-12 15:24
+*@Description: TODO Keyword statistics entity class
+*@Version: 1.0
+*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeywordStats {
    private String keyword;
    private Long ct;
    private String source;
    private String stt;
    private String edt;
    private Long ts;
}

@@ -44,6 +44,5 @@ public class KeywordUtils {
    public static void main(String[] args) {
        String text = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";
        System.out.println(KeywordUtils.analyze(text));
    }
}