diff --git a/Big_data_example/rt-gmall-parent/gmall-logger/src/main/resources/logback.xml b/Big_data_example/rt-gmall-parent/gmall-logger/src/main/resources/logback.xml index e997464..395fd4d 100644 --- a/Big_data_example/rt-gmall-parent/gmall-logger/src/main/resources/logback.xml +++ b/Big_data_example/rt-gmall-parent/gmall-logger/src/main/resources/logback.xml @@ -26,7 +26,7 @@ - + diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/beans/ProvinceStats.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/beans/ProvinceStats.java index 306d4ea..d4b3276 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/beans/ProvinceStats.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/beans/ProvinceStats.java @@ -1,11 +1,29 @@ package com.atguigu.rtgmall.beans; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.math.BigDecimal; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.rtgmall.beans -*@Author: markilue -*@CreateTime: 2023-05-12 21:29 -*@Description: TODO -*@Version: 1.0 -*/ + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.rtgmall.beans + *@Author: markilue + *@CreateTime: 2023-05-12 21:29 + *@Description: TODO 地区交易额统计实体类 + *@Version: 1.0 + */ +@AllArgsConstructor +@Data +@NoArgsConstructor public class ProvinceStats { + + private String stt; + private String edt; + private String province_id; + private String province_name; + private BigDecimal order_amount; + private String ts; + } diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/SugarController.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/SugarController.java index d2a53a2..bf7afd4 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/SugarController.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/SugarController.java @@ -1,7 +1,10 @@ package com.atguigu.rtgmall.controller; +import com.atguigu.rtgmall.beans.ProductStats; +import com.atguigu.rtgmall.beans.ProvinceStats; import com.atguigu.rtgmall.service.ProductStatsService; +import com.atguigu.rtgmall.service.ProvinceStatsService; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -24,6 +27,9 @@ public class SugarController { @Autowired private ProductStatsService productStatsService; + @Autowired + private ProvinceStatsService provinceStatsService; + @RequestMapping("/gmv") public String getGMV(@RequestParam(value = "date", defaultValue = "0") Integer date) { @@ -39,6 +45,168 @@ public class SugarController { return json; } + @RequestMapping("/tm") + public String getProductStatsByTm(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) { + if (date == 0) { + date = now(); + } + /* + { + "status": 0, + "data": { + "categories": ["苹果","三星", "华为","oppo","vivo","小米98"], + "series": [{ + "name": "商品品牌", + "data": [5101,6100,6371,5682,5430,9533] + }] + } + } + */ + //调用service获取品牌交易额 + List productStatsList = productStatsService.getProductStatsByTm(date, limit); + + //对查询结果进行遍历,获取品牌以及品牌交易额,将品牌以及品牌交易额分别封装到独立的list集合中 + List tmList = new ArrayList<>(); + List amountList = new ArrayList<>(); + for (ProductStats productStats : productStatsList) { + tmList.add(productStats.getTm_name()); + amountList.add(productStats.getOrder_amount()); + } + String json = "{" + + " \"status\": 0," + + " \"data\": {" + + " \"categories\": [\"" + StringUtils.join(tmList, "\",\"") + "\"]," + + " \"series\": [{" + + " \"name\": \"商品品牌\"," + + " \"data\": [" + StringUtils.join(amountList, ",") + "]" + + " }]" + + " }" + + " }"; + return json; + } + +// @RequestMapping("/category3") +// public String getProductStatsByCategory3(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) { +// if (date == 0) { +// date = now(); +// } +// /* +// {"status": 0,"data": [{"name": "PC","value": 97},{"name": "iOS","value": 50,}]} +// */ +// //调用service获取品牌交易额 +// List productStatsList = productStatsService.getProductStatsByCategory3(date, limit); +// +// //对查询结果进行遍历,获取品牌以及品牌交易额,将品牌以及品牌交易额分别封装到独立的list集合中 +// StringBuilder json = new StringBuilder("{\"status\": 0,\"data\": ["); +// +// for (int i = 0; i < productStatsList.size(); i++) { +// ProductStats productStats = productStatsList.get(i); +// json.append("{\"name\": \"" + productStats.getCategory3_name() + "\",\"value\": " + productStats.getOrder_amount() + "}"); +// if (i < productStatsList.size() - 1) { +// json.append(","); +// } +// } +// +// json.append("]}"); +// +// return json.toString(); +// } + + + @RequestMapping("/category3") + public Object getProductStatsByCategory3(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) { + if (date == 0) { + date = now(); + } + /* + {"status": 0,"data": [{"name": "PC","value": 97},{"name": "iOS","value": 50,}]} + */ + //调用service获取品牌交易额 + List productStatsList = productStatsService.getProductStatsByCategory3(date, limit); + HashMap resMap = new HashMap(); + List dateList = new ArrayList(); + for (ProductStats productStats : productStatsList) { + HashMap dateMap = new HashMap(); + dateMap.put("name", productStats.getCategory3_name()); + dateMap.put("value", productStats.getOrder_amount()); + dateList.add(dateMap); + } + resMap.put("status", 0); + resMap.put("data", dateList); + + + return resMap; + } + + + @RequestMapping("/spu") + public String getProductStatsBySPU(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) { + if (date == 0) { + date = now(); + } + + //调用service获取品牌交易额 + List productStatsList = productStatsService.getProductStatsBySPU(date, limit); + + + //对查询结果进行遍历,获取品牌以及品牌交易额,将品牌以及品牌交易额分别封装到独立的list集合中 + StringBuilder json = new StringBuilder("{ " + + " \"status\": 0, " + + " \"data\": { " + + " \"columns\": [ " + + " { " + + " \"name\": \"商品名称\", " + + " \"id\": \"name\" " + + " }, " + + " { " + + " \"name\": \"交易额\", " + + " \"id\": \"amount\" " + + " }, " + + " { " + + " \"name\": \"订单数\", " + + " \"id\": \"ct\" " + + " } " + + " ], " + + " \"rows\": ["); + + for (int i = 0; i < productStatsList.size(); i++) { + ProductStats productStats = productStatsList.get(i); +// System.out.println(productStats.getSpu_name()+":"+productStats.getOrder_amount()+":"+productStats.getOrder_ct()); + json.append("{\"name\": \"" + productStats.getSpu_name() + "\",\"amount\": " + productStats.getOrder_amount() + ",\"ct\": " + productStats.getOrder_ct() + "}"); + if (i < productStatsList.size() - 1) { + json.append(","); + } + } + + json.append("]}}"); + + return json.toString(); + } + + + @RequestMapping("province") + public String getProvinceStats(@RequestParam(value = "date", defaultValue = "0") Integer date) { + if (date == 0) { + date = now(); + } + + List provinceStatsList = provinceStatsService.getProvinceStats(date); + + StringBuilder json = new StringBuilder("{\"status\": 0,\"data\": {\"mapData\": ["); + for (int i = 0; i < provinceStatsList.size(); i++) { + ProvinceStats provinceStats = provinceStatsList.get(i); + json.append("{\"name\": \"" + provinceStats.getProvince_name() + "\",\"value\": " + provinceStats.getOrder_amount() + "}"); + if (i < provinceStatsList.size() - 1) { + json.append(","); + } + } + + json.append("],\"valueName\": \"省份交易额\"}}"); + return json.toString(); + + } + + //获取当前日期 private Integer now() { String yyyyMMdd = DateFormatUtils.format(new Date(), "yyyyMMdd"); diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/format.json b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/format.json index e69de29..5d28907 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/format.json +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/controller/format.json @@ -0,0 +1,83 @@ +//横向柱状图 +{ + "status": 0, + "data": { + "categories": [ + "苹果", + "小米", + "华为", + "Redmi", + "索芙特" + ], + "series": [ + { + "name": "商品品牌", + "data": [ + 258910.00, + 153974.00, + 127240.00, + 24681.00, + 3439.00 + ] + } + ] + } +} + +//饼状图 +{ + "status": 0, + "msg": "", + "data": [ + { + "name": "PC", + "value": 97 + }, + { + "name": "iOS", + "value": 50 + } + ] +} + +//轮播表格 +{ + "status": 0, + "data": { + "columns": [ + { + "name": "商品名称", + "id": "name" + }, + { + "name": "交易额", + "id": "amount" + }, + { + "name": "订单数", + "id": "ct" + } + ], + "rows": [ + { + "name": "北京总部", + "amount": 1, + "cr": 1 + }, + { + "name": "北京总部", + "amount": 1, + "cr": 1 + } + ] + } +} + +//色彩地图 +{"status": 0,"data": {"mapData": [ + {"name": "北京","value": 8985,}, + { + "name": "天津", + "value": 8616, + }, + ],"valueName": "省份交易额",}} \ No newline at end of file diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProductStatsMapper.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProductStatsMapper.java index 0c47cea..96edfa0 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProductStatsMapper.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProductStatsMapper.java @@ -26,4 +26,18 @@ public interface ProductStatsMapper { List selectProductStatsByTm(@Param("date") Integer date, @Param("limit") Integer limit); + @Select("select category3_id,category3_name,sum(order_amount) order_amount from product_stats " + + "where toYYYYMMDD(stt)=#{date} " + + "group by category3_id,category3_name " + + "having order_amount>0 " + + "order by order_amount desc limit #{limit} ") + List selectProductStatsByCategory3(@Param("date") Integer date, @Param("limit") Integer limit); + + @Select("select spu_id, spu_name, sum(order_amount) order_amount, sum(order_ct) order_ct " + + "from product_stats " + + "where toYYYYMMDD(stt) = #{date} " + + "group by spu_id, spu_name " + + "having order_amount > 0 " + + "order by order_amount desc, order_ct desc limit #{limit} ") + List selectProductStatsBySPU(@Param("date") Integer date, @Param("limit") Integer limit); } \ No newline at end of file diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProvinceStatsMapper.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProvinceStatsMapper.java index 7550dc3..89016f0 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProvinceStatsMapper.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/ProvinceStatsMapper.java @@ -1,11 +1,26 @@ package com.atguigu.rtgmall.mapper; + +import com.atguigu.rtgmall.beans.ProvinceStats; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.rtgmall.mapper -*@Author: markilue -*@CreateTime: 2023-05-12 21:29 -*@Description: TODO -*@Version: 1.0 -*/ -public class ProvinceStatsMapper { + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.rtgmall.mapper + *@Author: markilue + *@CreateTime: 2023-05-12 21:29 + *@Description: TODO 地区统计Mapper + *@Version: 1.0 + */ +public interface ProvinceStatsMapper { + + + @Select("select province_id,province_name,sum(order_amount) order_amount " + + "from province_stats " + + "where toYYYYMMDD(stt) = #{date} " + + "group by province_id, province_name") + List selectProvinceStats(Integer date); + + } diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/selectSQL.sql b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/selectSQL.sql index 38e24b0..200c364 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/selectSQL.sql +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/mapper/selectSQL.sql @@ -1,9 +1,37 @@ - - -- 品牌销售类 -select tm_id,tm_name,sum(order_amount) order_amount from product_stats -where toYYYYMMDD(stt)=20230511 -group by tm_id,tm_name -having order_amount>0 -order by order_amount desc limit 5; \ No newline at end of file +select tm_id, tm_name, sum(order_amount) order_amount +from product_stats +where toYYYYMMDD(stt) = 20230511 +group by tm_id, tm_name +having order_amount > 0 +order by order_amount desc limit 5; + +-- 品类销售额占比 +select category3_id, category3_name, sum(order_amount) order_amount +from product_stats +where toYYYYMMDD(stt) = 20230511 +group by category3_id, category3_name +having order_amount > 0 +order by order_amount desc limit 5; + +-- spu销售额 +select spu_id, spu_name, sum(order_amount) order_amount, sum(order_ct) order_ct +from product_stats +where toYYYYMMDD(stt) = 20230511 +group by spu_id, spu_name +having order_amount > 0 +order by order_amount desc, order_ct desc limit 5; + +select spu_id, spu_name, sum(order_amount) order_amount, sum(order_ct) order_ct +from product_stats +where toYYYYMMDD(stt) = 20230511 +group by spu_id, spu_name +having order_amount > 0 +order by order_amount desc, order_ct desc limit 5 + +-- 省份交易额 +select province_id,province_name,sum(order_amount) order_amount +from province_stats +where toYYYYMMDD(stt) = 20230511 +group by province_id, province_name \ No newline at end of file diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProductStatsService.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProductStatsService.java index d4cf97f..c2deb16 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProductStatsService.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProductStatsService.java @@ -18,5 +18,13 @@ public interface ProductStatsService { //获取品牌成交额 List getProductStatsByTm(Integer date, Integer limit); + List getProductStatsByCategory3(Integer date, Integer limit); + + //获取spu成交额 + List getProductStatsBySPU(Integer date, Integer limit); + + + + } diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProvinceStatsService.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProvinceStatsService.java index e27f9bb..ef8d377 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProvinceStatsService.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/ProvinceStatsService.java @@ -1,5 +1,9 @@ package com.atguigu.rtgmall.service; +import com.atguigu.rtgmall.beans.ProvinceStats; + +import java.util.List; + /** *@BelongsProject: rt-gmall-parent *@BelongsPackage: com.atguigu.rtgmall.service @@ -9,4 +13,7 @@ package com.atguigu.rtgmall.service; *@Version: 1.0 */ public interface ProvinceStatsService { + + //地区交易额 + List getProvinceStats(Integer date); } diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProductStatsServiceImpl.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProductStatsServiceImpl.java index c47a379..3d68cf1 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProductStatsServiceImpl.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProductStatsServiceImpl.java @@ -30,5 +30,15 @@ public class ProductStatsServiceImpl implements ProductStatsService { return productStatsMapper.selectProductStatsByTm(date, limit); } + @Override + public List getProductStatsByCategory3(Integer date, Integer limit) { + return productStatsMapper.selectProductStatsByCategory3(date,limit); + } + + @Override + public List getProductStatsBySPU(Integer date, Integer limit) { + return productStatsMapper.selectProductStatsBySPU(date, limit); + } + } diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProvinceStatsServiceImpl.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProvinceStatsServiceImpl.java index 52e896c..385f8eb 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProvinceStatsServiceImpl.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/main/java/com/atguigu/rtgmall/service/impl/ProvinceStatsServiceImpl.java @@ -1,11 +1,31 @@ package com.atguigu.rtgmall.service.impl; + +import com.atguigu.rtgmall.beans.ProvinceStats; +import com.atguigu.rtgmall.mapper.ProvinceStatsMapper; +import com.atguigu.rtgmall.service.ProvinceStatsService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.rtgmall.service.impl -*@Author: markilue -*@CreateTime: 2023-05-12 21:32 -*@Description: TODO -*@Version: 1.0 -*/ -public class ProvinceStatsServiceImpl { + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.rtgmall.service.impl + *@Author: markilue + *@CreateTime: 2023-05-12 21:32 + *@Description: TODO 地区统计接口实现类 + *@Version: 1.0 + */ +@Service +public class ProvinceStatsServiceImpl implements ProvinceStatsService { + + + @Autowired + private ProvinceStatsMapper provinceStatsMapper; + + + @Override + public List getProvinceStats(Integer date) { + return provinceStatsMapper.selectProvinceStats(date); + } } diff --git a/Big_data_example/rt-gmall-parent/gmall-publisher/src/test/java/com/atguigu/rtgmall/test1.java b/Big_data_example/rt-gmall-parent/gmall-publisher/src/test/java/com/atguigu/rtgmall/test1.java index 1a633f6..409948d 100644 --- a/Big_data_example/rt-gmall-parent/gmall-publisher/src/test/java/com/atguigu/rtgmall/test1.java +++ b/Big_data_example/rt-gmall-parent/gmall-publisher/src/test/java/com/atguigu/rtgmall/test1.java @@ -1,11 +1,24 @@ package com.atguigu.rtgmall; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.rtgmall -*@Author: markilue -*@CreateTime: 2023-05-12 21:07 -*@Description: TODO -*@Version: 1.0 -*/ + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.rtgmall + *@Author: markilue + *@CreateTime: 2023-05-12 21:07 + *@Description: TODO + *@Version: 1.0 + */ public class test1 { + + public static void main(String[] args) { + String date ="20230511"; + String limit ="5"; + + String sql = "select spu_id,spu_name,sum(order_amount) order_amount,sum(order_ct) order_ct from product_stats " + + "where toYYYYMMDD(stt)="+date+" " + + "group by spu_id,spu_name " + + "having order_amount > 0 " + + "order by order_amount desc,order_ct desc limit "+limit+" "; + System.out.println(sql); + } } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java index f2ff01d..ee68325 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dwm/OrderWideApp.java @@ -211,9 +211,9 @@ public class OrderWideApp { new DimAsyncFunction("DIM_BASE_PROVINCE") { @Override public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception { - orderWide.setProvince_3166_2_code(dimJsonObj.getString("NAME")); - orderWide.setProvince_area_code(dimJsonObj.getString("ISO_3166_2")); - orderWide.setProvince_name(dimJsonObj.getString("AREA_CODE")); + orderWide.setProvince_3166_2_code(dimJsonObj.getString("ISO_3166_2")); + orderWide.setProvince_area_code(dimJsonObj.getString("AREA_CODE")); + orderWide.setProvince_name(dimJsonObj.getString("NAME")); orderWide.setProvince_iso_code(dimJsonObj.getString("ISO_CODE")); } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/KeywordStatsApp.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/KeywordStatsApp.java index 093bf9f..2312bed 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/KeywordStatsApp.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/KeywordStatsApp.java @@ -1,11 +1,103 @@ package com.atguigu.gmall.realtime.app.dws; + +import com.atguigu.gmall.realtime.app.func.KeywordUDTF; +import com.atguigu.gmall.realtime.beans.GmallConstant; +import com.atguigu.gmall.realtime.beans.KeywordStats; +import com.atguigu.gmall.realtime.utils.ClickhouseUtils; +import com.atguigu.gmall.realtime.utils.MyKafkaUtils; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.gmall.realtime.app.dws -*@Author: markilue -*@CreateTime: 2023-05-12 14:27 -*@Description: TODO -*@Version: 1.0 -*/ + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.app.dws + *@Author: markilue + *@CreateTime: 2023-05-12 14:27 + *@Description: TODO 关键词统计DWS + *@Version: 1.0 + */ public class KeywordStatsApp { + + public static void main(String[] args) throws Exception { + + //TODO 1.基本环境准备 + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + env.setParallelism(4); + + //TODO 2.检查点设置 + env.enableCheckpointing(5000L); + env.getCheckpointConfig().setCheckpointTimeout(5000L); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L)); + env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/rt_gmall/gmall")); + System.setProperty("HADOOP_USER_NAME", "dingjiawen"); + + //TODO 2.k注册自定义的UDTF函数 + tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class); + + + //TODO 3.从kafka中读取数据 + String topic = "dwd_page_log"; + String groupId = "keyword_stats_app_group"; + String createSQl = "create table page_view" + + "(" + + " common MAP," + + " page MAP," + + " ts BIGINT," + + " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss'))," + + " WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND" + + ")" + + " WITH (" + MyKafkaUtils.getKafkaDDL(topic, groupId) + ")"; + tableEnv.executeSql(createSQl); + + //TODO 4.将动态表中表示搜索行为的记录过滤出来 + String filterSQL = "select " + + " page['item'] fullword,rowtime " + + "from" + + " page_view " + + "where " + + " page['page_id'] = 'good_list' and page['item'] is not null "; +// System.out.println(filterSQL); + Table fullwordTable = tableEnv.sqlQuery(filterSQL); + + //TODO 5.使用自定义的UDTF函数 对关键词进行拆分 + //相当于将fullwordTable表进行了注册 + Table keywordTable = tableEnv.sqlQuery("select rowtime ,keyword from " + fullwordTable + ", LATERAL TABLE(ik_analyze(fullword)) AS T(keyword)"); + + //TODO 6.分组开窗 聚合计算 + String selectSQL ="select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt," + + " DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt," + + " keyword," + + " count(*) ct," + + " '" + GmallConstant.KEYWORD_SEARCH + "' source," + + " UNIX_TIMESTAMP() * 1000 ts " + + "from " + keywordTable + " " + + "group by " + + " TUMBLE(rowtime, INTERVAL '10' SECOND)," + + " keyword"; + System.out.println(selectSQL); + Table resTable = tableEnv.sqlQuery(selectSQL); + + //TODO 7.将表转换为流 + + DataStream keywordStatsDS = tableEnv.toAppendStream(resTable, KeywordStats.class); + + keywordStatsDS.print(">>>>"); + //TODO 8.将流中数据写入CK + keywordStatsDS.addSink( + ClickhouseUtils.getJdbcSink("insert into keyword_stats(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)") + ); + + + env.execute(); + } } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql index b73ed11..853bdd8 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/dws/tablesql.sql @@ -1,3 +1,4 @@ +-- 省份订单表 create table order_wide ( province_id BIGINT, @@ -19,7 +20,7 @@ create table order_wide 'scan.startup.mode' = 'latest-offset', 'format' = 'json') - +-- 开窗聚合计算 select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt, DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt, province_id, @@ -29,11 +30,84 @@ select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:m province_3166_2_code iso_3166_2, count(distinct order_id) order_count, sum(split_total_amount) order_amount, - UNIX_TIMESTAMP() * 1000 ts + UNIX_TIMESTAMP() * 1000 ts from order_wide group by TUMBLE(rowtime, INTERVAL '10' SECOND), province_id, province_name, province_area_code, province_iso_code, - province_3166_2_code \ No newline at end of file + province_3166_2_code + + +-- 关键词主题统计动态表 +create table page_view +( + common MAP, + page MAP, + ts BIGINT, + rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')), + WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND +) + WITH ( + 'connector' = 'kafka', + 'topic' = 'dwm_page_log', + 'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092', + 'properties.group.id' = 'keyword_stats_app_group', + 'scan.startup.mode' = 'latest-offset', + 'format' = 'json') + + +-- 将动态表中表示搜索行为的记录过滤出来 +select page['item'] fullword, + rowtime +from page_view +where page['page_id'] = 'good_list' + and page['item'] is not null + +--使用自定义UDTF函数 对搜索关键词进行拆分 +/* + 搜索的内容: 小米插线板自营包邮 + + fullword rowtime + 小米插线板自营包邮 20210814 + + T表示一个临时表的名字 + select rowtime ,keyword from fullwordTable,LATERAL TABLE(函数名(fullword)) AS T(keyword) + + 拆分后的效果: 小米 插线板 自营 包邮 + keyword rowtime + 小米 20210814 + 插线板 20210814 + 自营 20210814 + 包邮 20210814 + +* + */ +select rowtime, keyword +from fullwordTable, + LATERAL TABLE(函数名(fullword)) AS T(keyword) + + +-- 分组开窗聚合计算 +select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt, + DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt, + keyword, + count(*) ct, + 'KEYWORD_SEARCH' source, + UNIX_TIMESTAMP() * 1000 ts +from order_wide +group by TUMBLE(rowtime, INTERVAL '10' SECOND), + keyword + + + +select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as stt, + DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') as edt, + keyword, + count(*) ct, + 'SEARCH' source, + UNIX_TIMESTAMP() * 1000 ts +from UnnamedTable$1group by TUMBLE(rowtime, INTERVAL '10' SECOND), keyword \ No newline at end of file diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/KeywordUDTF.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/KeywordUDTF.java index 7b91772..13e3a97 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/KeywordUDTF.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/app/func/KeywordUDTF.java @@ -1,11 +1,38 @@ package com.atguigu.gmall.realtime.app.func; + +import com.atguigu.gmall.realtime.utils.KeywordUtils; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.util.List; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.gmall.realtime.app.func -*@Author: markilue -*@CreateTime: 2023-05-12 14:19 -*@Description: TODO -*@Version: 1.0 -*/ -public class KeywordUDTF { + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.app.func + *@Author: markilue + *@CreateTime: 2023-05-12 14:19 + *@Description: TODO 自定义flinkUDTF函数 把keyword拆分成多个冠检测 + *@Version: 1.0 + */ +@FunctionHint(output = @DataTypeHint("ROW")) //返回值有几列,列的类型是什么;Row表示返回值的结果 +public class KeywordUDTF extends TableFunction { + + //从参数的个数和类型,判断调用哪一个eval函数 + public void eval(String text) { + List keywordList = KeywordUtils.analyze(text); + for (String keyword : keywordList) { + collect(Row.of(keyword)); + } + } + +// public void eval(String str,int a) { +// for (String s : str.split(" ")) { +// // use collect(...) to emit a row +// collect(Row.of(s, s.length())); +// } +// } + + } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/KeywordStats.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/KeywordStats.java index cbed627..bd529b2 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/KeywordStats.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/beans/KeywordStats.java @@ -1,11 +1,25 @@ package com.atguigu.gmall.realtime.beans; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + /** -*@BelongsProject: rt-gmall-parent -*@BelongsPackage: com.atguigu.gmall.realtime.beans -*@Author: markilue -*@CreateTime: 2023-05-12 15:24 -*@Description: TODO -*@Version: 1.0 -*/ + *@BelongsProject: rt-gmall-parent + *@BelongsPackage: com.atguigu.gmall.realtime.beans + *@Author: markilue + *@CreateTime: 2023-05-12 15:24 + *@Description: TODO 关键词实体类 + *@Version: 1.0 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor public class KeywordStats { + private String keyword; + private Long ct; + private String source; + private String stt; + private String edt; + private Long ts; } diff --git a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java index fa2521b..2c5c77c 100644 --- a/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java +++ b/Big_data_example/rt-gmall-parent/gmall-realtime/src/main/java/com/atguigu/gmall/realtime/utils/KeywordUtils.java @@ -44,6 +44,5 @@ public class KeywordUtils { public static void main(String[] args) { String text = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待"; System.out.println(KeywordUtils.analyze(text)); - } }