Real-time data warehouse: ADS-layer task update

This commit is contained in:
markilue 2023-05-15 20:04:29 +08:00
parent 5619d60ed1
commit 29c7297452
16 changed files with 663 additions and 10 deletions

View File

@@ -1,4 +1,4 @@
-server.port=8081
+server.port=8082
# kafka configuration
spring.kafka.bootstrap-servers = Ding202:9092,Ding203:9092,Ding204:9092
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer

View File

@@ -0,0 +1,27 @@
package com.atguigu.rtgmall.beans;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.beans
*@Author: markilue
*@CreateTime: 2023-05-15 15:00
*@Description: TODO Keyword statistics entity class
*@Version: 1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeywordStats {
private String stt;
private String edt;
private String keyword;
private Long ct;
private String ts;
}

View File

@@ -0,0 +1,72 @@
package com.atguigu.rtgmall.beans;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.math.RoundingMode;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.beans
*@Author: markilue
*@CreateTime: 2023-05-15 13:41
*@Description: TODO Visitor statistics entity class
*
*@Version: 1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VisitorStats {
private String stt;
private String edt;
private String vc;
private String ch;
private String ar;
private String is_new;
private Long uv_ct = 0L;
private Long pv_ct = 0L;
private Long sv_ct = 0L;
private Long uj_ct = 0L;
private Long dur_sum = 0L;
private Long new_uv = 0L;
private Long ts;
private int hr;
//Bounce rate (%) = bounces * 100 / visits
public BigDecimal getUjRate() {
if (sv_ct != 0L) {
return BigDecimal.valueOf(uj_ct)
.multiply(BigDecimal.valueOf(100))
.divide(BigDecimal.valueOf(sv_ct), 2, RoundingMode.HALF_UP);
} else {
return BigDecimal.ZERO;
}
}
//Average dwell time per visit (seconds) = total dwell time for the day (ms) / visits for the day / 1000
public BigDecimal getDurPerSv() {
if (sv_ct != 0L) {
return BigDecimal.valueOf(dur_sum)
.divide(BigDecimal.valueOf(sv_ct), 0, RoundingMode.HALF_UP)
.divide(BigDecimal.valueOf(1000), 1, RoundingMode.HALF_UP);
} else {
return BigDecimal.ZERO;
}
}
//Pages per visit = total page views for the day / visits for the day
public BigDecimal getPvPerSv() {
if (sv_ct != 0L) {
return BigDecimal.valueOf(pv_ct)
.divide(BigDecimal.valueOf(sv_ct), 2, RoundingMode.HALF_UP);
} else {
return BigDecimal.ZERO;
}
}
}
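A quick sanity check of the three derived metrics above, using hypothetical values that are not part of the commit; assuming the class sits next to VisitorStats, Lombok's @Data supplies the setters:

public class VisitorStatsDemo {
    public static void main(String[] args) {
        VisitorStats vs = new VisitorStats();
        vs.setSv_ct(4L);         // 4 visits
        vs.setUj_ct(1L);         // 1 bounce
        vs.setPv_ct(20L);        // 20 page views
        vs.setDur_sum(208800L);  // total dwell time in milliseconds
        System.out.println(vs.getUjRate());   // 25.00 = 1 * 100 / 4
        System.out.println(vs.getDurPerSv()); // 52.2  = 208800 / 4 / 1000
        System.out.println(vs.getPvPerSv());  // 5.00  = 20 / 4
    }
}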

View File

@@ -1,10 +1,14 @@
package com.atguigu.rtgmall.controller;
import com.atguigu.rtgmall.beans.KeywordStats;
import com.atguigu.rtgmall.beans.ProductStats;
import com.atguigu.rtgmall.beans.ProvinceStats;
import com.atguigu.rtgmall.beans.VisitorStats;
import com.atguigu.rtgmall.service.KeywordStatsService;
import com.atguigu.rtgmall.service.ProductStatsService;
import com.atguigu.rtgmall.service.ProvinceStatsService;
import com.atguigu.rtgmall.service.VisitorStatsService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,6 +34,12 @@ public class SugarController {
@Autowired
private ProvinceStatsService provinceStatsService;
@Autowired
private VisitorStatsService visitorStatsService;
@Autowired
private KeywordStatsService keywordStatsService;
@RequestMapping("/gmv")
public String getGMV(@RequestParam(value = "date", defaultValue = "0") Integer date) {
@@ -206,6 +216,159 @@ public class SugarController {
}
@RequestMapping("visitor")
public String getVisitorStats(@RequestParam(value = "date", defaultValue = "0") Integer date) {
if (date == 0) {
date = now();
}
List<VisitorStats> visitorStatsList = visitorStatsService.getVisitorStats(date);
//split the rows into new vs. returning visitors
VisitorStats newVisitor = new VisitorStats();
VisitorStats oldVisitor = new VisitorStats();
for (VisitorStats visitorStats : visitorStatsList) {
if ("1".equals(visitorStats.getIs_new())) {
//new visitor
newVisitor = visitorStats;
} else {
//returning visitor
oldVisitor = visitorStats;
}
}
String json = "{ " +
" \"status\": 0, " +
" \"data\": { " +
" \"columns\": [" +
" { " +
" \"name\": \"类别\", " +
" \"id\": \"type\" " +
" }, " +
" { " +
" \"name\": \"新用户\", " +
" \"id\": \"new\" " +
" }, " +
" { " +
" \"name\": \"老用户\", " +
" \"id\": \"old\" " +
" }" +
" ], " +
" \"rows\": [ " +
" { " +
" \"type\": \"用户数(人)\", " +
" \"new\": " + newVisitor.getUv_ct() + ", " +
" \"old\": " + oldVisitor.getUv_ct() +
" }, " +
" { " +
" \"type\": \"总访问页面(次)\", " +
" \"new\": " + newVisitor.getPv_ct() + ", " +
" \"old\": " + oldVisitor.getPv_ct() +
" }, " +
" { " +
" \"type\": \"跳出率(%)\", " +
" \"new\": " + newVisitor.getUjRate() + ", " +
" \"old\": " + oldVisitor.getUjRate() +
" }, " +
" { " +
" \"type\": \"平均在线时长(秒)\", " +
" \"new\": " + newVisitor.getDurPerSv() + ", " +
" \"old\": " + oldVisitor.getDurPerSv() +
" }, " +
" { " +
" \"type\": \"平均访问页面数(次)\", " +
" \"new\": " + newVisitor.getPvPerSv() + ", " +
" \"old\": " + oldVisitor.getPvPerSv() +
" }" +
" ]}}";
return json;
}
@RequestMapping("hr")
public String getVisitorStatsByHr(@RequestParam(value = "date", defaultValue = "0") Integer date) {
if (date == 0) {
date = now();
}
//fetch the per-hour visit stats from the service
List<VisitorStats> visitorStatsList = visitorStatsService.getVisitorStatsByHr(date);
//arrays holding the 24 hourly buckets (see the sketch after this listing)
Long[] PVArr = new Long[24];
Long[] UVArr = new Long[24];
Long[] newUVArr = new Long[24];
Arrays.fill(PVArr, 0L);
Arrays.fill(UVArr, 0L);
Arrays.fill(newUVArr, 0L);
String[] category = {"00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23"};
//copy each result row into the array slot for its hour
for (VisitorStats visitorStats : visitorStatsList) {
//index by hour (0-23); hours with no traffic keep the 0 default
PVArr[visitorStats.getHr()] = visitorStats.getPv_ct();
UVArr[visitorStats.getHr()] = visitorStats.getUv_ct();
newUVArr[visitorStats.getHr()] = visitorStats.getNew_uv();
}
String pvString = StringUtils.join(PVArr, ",");
String uvString = StringUtils.join(UVArr, ",");
String newUvString = StringUtils.join(newUVArr, ",");
String categoryString = "\"" + StringUtils.join(category, "\",\"") + "\"";
String json = "{ " +
" \"status\": 0, " +
" \"data\": { " +
" \"categories\": [" + categoryString + "], " +
" \"series\": [ " +
" { " +
" \"name\": \"UV\", " +
" \"data\": [" + uvString + "] " +
" }, " +
" { " +
" \"name\": \"PV\", " +
" \"data\": [" + pvString + "] " +
" }, " +
" { " +
" \"name\": \"newUV\", " +
" \"data\": [" + newUvString + "] " +
" } " +
" ] " +
" } " +
"}";
return json;
}
@RequestMapping("keyword")
public String getKeywordStats(@RequestParam(value = "date", defaultValue = "0") Integer date, @RequestParam(value = "limit", defaultValue = "20") Integer limit) {
if (date == 0) {
date = now();
}
List<KeywordStats> keywordStatsList = keywordStatsService.getKeywordStats(date, limit);
StringBuilder json = new StringBuilder("{ " +
" \"status\": 0, " +
" \"data\": [");
for (int i = 0; i < keywordStatsList.size(); i++) {
KeywordStats keywordStats = keywordStatsList.get(i);
json.append("{ " +
" \"name\": \"" + keywordStats.getKeyword() + "\", " +
" \"value\": " + keywordStats.getCt() + " " +
" }");
if (i < keywordStatsList.size() - 1) {
json.append(",");
}
}
json.append("]}");
return json.toString();
}
//get the current date
private Integer now() {
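A minimal, self-contained sketch of the hour-bucketing used in getVisitorStatsByHr, with hypothetical values that are not part of the commit. Pre-filling the arrays with zeros is why hours with no traffic still render as 0 in the chart:

import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;

public class HourBucketDemo {
    public static void main(String[] args) {
        Long[] pvArr = new Long[24];
        Arrays.fill(pvArr, 0L);   // default every hour to 0
        pvArr[0] = 6289L;         // hypothetical row: hr=0, pv_ct=6289
        pvArr[23] = 5030L;        // hypothetical row: hr=23, pv_ct=5030
        // the same join call the controller uses to build the series data
        System.out.println(StringUtils.join(pvArr, ","));
        // -> 6289,0,0,...,0,5030 (24 comma-separated values)
    }
}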

View File

@@ -74,10 +74,173 @@
}
// map (province GMV)
{
"status": 0,
"data": {
"mapData": [
{
"name": "北京",
"value": 8985
},
{
"name": "天津",
"value": 8616
}
],
"valueName": "省份交易额"
}
}
// visitor table (new vs. returning), response template
{
"status": 0,
"data": {
"total": 13,
"columns": [
{
"name": "类别",
"id": "type"
},
{
"name": "新用户",
"id": "new"
},
{
"name": "老用户",
"id": "old"
}
],
"rows": [
{
"type": "用户数(人)",
"new": 200,
"old": 500
},
{
"type": "总访问页面(次)",
"new": 200,
"old": 500
},
{
"type": "跳出率(%)",
"new": 200,
"old": 500
},
{
"type": "平均在线时长(秒)",
"new": 200,
"old": 500
},
{
"type": "平均访问页面数(次)",
"new": 200,
"old": 500
}
]
}
}
{
"status": 0,
"data": {
"columns": [
{
"name": "类别",
"id": "type"
},
{
"name": "新用户",
"id": "new"
},
{
"name": "老用户",
"id": "old"
}
],
"rows": [
{
"type": "用户数(人)",
"new": 8,
"old": 6
},
{
"type": "总访问页面(次)",
"new": 80,
"old": 69
},
{
"type": "跳出率(%)",
"new": 0.00,
"old": 0.00
},
{
"type": "平均在线时长(秒)",
"new": 52.2,
"old": 55.2
},
{
"type": "平均访问页面数(次)",
"new": 5.33,
"old": 4.93
}
]
}
}
// hourly line chart
{
"status": 0,
"data": {
"categories": [
"00",
"01",
"02",
...,
"23"
],
"series": [
{
"name": "UV",
"data": [
2891,
4166,
...,
2625
]
},
{
"name": "PV",
"data": [
6289,
8964,
...,
5030
]
},
{
"name": "newUV",
"data": [
6289,
8964,
...,
5030
]
}
]
}
}
// 3D word cloud (keyword)
{
"status": 0,
"msg": "",
"data": [
{
"name": "海门",
"value": 1
},
{
"name": "鄂尔多斯",
"value": 1
}
]}

View File

@@ -0,0 +1,33 @@
package com.atguigu.rtgmall.mapper;
import com.atguigu.rtgmall.beans.KeywordStats;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.mapper
*@Author: markilue
*@CreateTime: 2023-05-15 15:01
*@Description: TODO Keyword statistics Mapper interface
*@Version: 1.0
*/
public interface KeywordStatsMapper {
@Select("select keyword, " +
" sum(keyword_stats.ct * " +
" multiIf( " +
" source = 'SEARCH', 10, " +
" source = 'ORDER', 5, " +
" source = 'CART', 2, " +
" source = 'CLICK', 1, 0 " +
" )) ct " +
"from keyword_stats " +
"where toYYYYMMDD(stt) = #{date} " +
"group by keyword " +
"order by sum (keyword_stats.ct) desc " +
"limit #{limit}")
List<KeywordStats> selectKeywordStats(@Param("date") Integer date,@Param("limit") Integer limit);
}
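A worked example of the multiIf() weighting in selectKeywordStats, with hypothetical counts that are not from the commit: a keyword seen 3 times via search and 7 times via click scores 3*10 + 7*1 = 37:

import java.util.Map;

public class KeywordWeightDemo {
    // same weights as the multiIf() branch in selectKeywordStats
    private static final Map<String, Integer> WEIGHTS =
            Map.of("SEARCH", 10, "ORDER", 5, "CART", 2, "CLICK", 1);

    public static void main(String[] args) {
        long weighted = 3L * WEIGHTS.get("SEARCH")  // 3 search hits -> 30
                      + 7L * WEIGHTS.get("CLICK");  // 7 click hits  -> 7
        System.out.println(weighted);               // 37
    }
}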

View File

@@ -0,0 +1,33 @@
package com.atguigu.rtgmall.mapper;
import com.atguigu.rtgmall.beans.VisitorStats;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.mapper
*@Author: markilue
*@CreateTime: 2023-05-15 13:40
*@Description: TODO Visitor statistics mapper
*@Version: 1.0
*/
public interface VisitorStatsMapper {
@Select("select " +
" is_new,sum(uv_ct) uv_ct,sum(pv_ct) pv_ct,sum(sv_ct) sv_ct,sum(uj_ct) uj_ct,sum(dur_sum) dur_sum " +
"from " +
" visitor_stats " +
"where toYYYYMMDD(stt)=#{date} " +
"group by " +
" is_new")
List<VisitorStats> selectVisitorStats(@Param("date") Integer date);
@Select("select toHour(stt) hr,sum(uv_ct) uv_ct,sum(pv_ct) pv_ct,sum(if(is_new='1',visitor_stats.uv_ct,0)) new_uv " +
"from visitor_stats " +
"where toYYYYMMDD(stt)=#{date} " + // the date parameter was previously unused; without this filter the query scans every day
"group by hr")
List<VisitorStats> selectVisitorStatsByHr(@Param("date") Integer date);
}

View File

@@ -35,3 +35,38 @@ select province_id,province_name,sum(order_amount) order_amount
from province_stats
where toYYYYMMDD(stt) = 20230511
group by province_id, province_name
-- visitor stats: new vs. returning
select is_new,
sum(uv_ct) uv_ct,
sum(pv_ct) pv_ct,
sum(sv_ct) sv_ct,
sum(uj_ct) uj_ct,
sum(dur_sum) dur_sum
from visitor_stats
where toYYYYMMDD(stt) = 20230511
group by is_new;
-- hourly visitor stats
-- note: qualify the column as visitor_stats.uv_ct, otherwise the reference resolves to the uv_ct alias from the select list
select toHour(stt) hr,
sum(uv_ct) uv_ct,
sum(pv_ct) pv_ct,
sum(if(is_new = '1', visitor_stats.uv_ct, 0)) new_uv
from visitor_stats
where toYYYYMMDD(stt) = 20230511
group by hr;
-- keyword stats: counts weighted by source (search/order/cart/click)
select keyword,
sum(keyword_stats.ct *
multiIf(
source = 'SEARCH', 10,
source = 'ORDER', 5,
source = 'CART', 2,
source = 'CLICK', 1, 0
)) ct
from keyword_stats
where toYYYYMMDD(stt) = 20230512
group by keyword
order by sum(keyword_stats.ct) desc
limit 5;

View File

@@ -0,0 +1,18 @@
package com.atguigu.rtgmall.service;
import com.atguigu.rtgmall.beans.KeywordStats;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.service
*@Author: markilue
*@CreateTime: 2023-05-15 15:09
*@Description: TODO Keyword statistics service
*@Version: 1.0
*/
public interface KeywordStatsService {
List<KeywordStats> getKeywordStats(Integer date, Integer limit);
}

View File

@@ -0,0 +1,19 @@
package com.atguigu.rtgmall.service;
import com.atguigu.rtgmall.beans.VisitorStats;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.service
*@Author: markilue
*@CreateTime: 2023-05-15 13:47
*@Description: TODO Visitor statistics service
*@Version: 1.0
*/
public interface VisitorStatsService {
List<VisitorStats> getVisitorStats(Integer date);
List<VisitorStats> getVisitorStatsByHr(Integer date);
}

View File

@@ -0,0 +1,28 @@
package com.atguigu.rtgmall.service.impl;
import com.atguigu.rtgmall.beans.KeywordStats;
import com.atguigu.rtgmall.mapper.KeywordStatsMapper;
import com.atguigu.rtgmall.service.KeywordStatsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.service.impl
*@Author: markilue
*@CreateTime: 2023-05-15 15:10
*@Description: TODO Keyword statistics service implementation
*@Version: 1.0
*/
@Service
public class KeywordStatsServiceImpl implements KeywordStatsService {
@Autowired
private KeywordStatsMapper keywordStatsMapper;
@Override
public List<KeywordStats> getKeywordStats(Integer date, Integer limit) {
return keywordStatsMapper.selectKeywordStats(date, limit);
}
}

View File

@@ -0,0 +1,34 @@
package com.atguigu.rtgmall.service.impl;
import com.atguigu.rtgmall.beans.VisitorStats;
import com.atguigu.rtgmall.mapper.VisitorStatsMapper;
import com.atguigu.rtgmall.service.VisitorStatsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
*@BelongsProject: rt-gmall-parent
*@BelongsPackage: com.atguigu.rtgmall.service.impl
*@Author: markilue
*@CreateTime: 2023-05-15 13:48
*@Description: TODO Visitor statistics service implementation
*@Version: 1.0
*/
@Service
public class VisitorStatsServiceImpl implements VisitorStatsService {
@Autowired
private VisitorStatsMapper visitorStatsMapper;
@Override
public List<VisitorStats> getVisitorStats(Integer date) {
return visitorStatsMapper.selectVisitorStats(date);
}
@Override
public List<VisitorStats> getVisitorStatsByHr(Integer date) {
return visitorStatsMapper.selectVisitorStatsByHr(date);
}
}

View File

@@ -25,12 +25,14 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -43,18 +45,21 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -67,24 +72,28 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink uses slf4j as its logging facade; log4j provides the concrete implementation here -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<!-- Lombok plugin dependency -->
@@ -169,12 +178,14 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- IK analyzer (Chinese word segmentation) -->

View File

@@ -60,7 +60,7 @@ public class BaseLogApp {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 set the parallelism
-env.setParallelism(4); //must match the partition count of the source Kafka topic
+env.setParallelism(2); //must match the partition count of the source Kafka topic
//TODO 2. checkpoint settings
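Since the comment ties the parallelism to the Kafka partition count, here is a hedged sketch, not part of the commit, for checking that count with the Kafka AdminClient; the topic name "ods_base_log" is an assumption about this app's source:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "Ding202:9092,Ding203:9092,Ding204:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // "ods_base_log" is an assumed topic name for BaseLogApp's source
            TopicDescription desc = admin.describeTopics(Collections.singleton("ods_base_log"))
                    .values().get("ods_base_log").get();
            // env.setParallelism() should equal this number
            System.out.println(desc.partitions().size());
        }
    }
}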

View File

@@ -31,7 +31,7 @@ public class KeywordStatsApp {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
-env.setParallelism(4);
+env.setParallelism(2);
//TODO 2. checkpoint settings
env.enableCheckpointing(5000L);
@@ -57,6 +57,7 @@ public class KeywordStatsApp {
" WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND" +
")" +
" WITH (" + MyKafkaUtils.getKafkaDDL(topic, groupId) + ")";
System.out.println(createSQl);
tableEnv.executeSql(createSQl);
//TODO 4. filter the search-behavior records out of the dynamic table

View File

@@ -111,3 +111,19 @@ select DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:m
'SEARCH' source,
UNIX_TIMESTAMP() * 1000 ts
from UnnamedTable$1 group by TUMBLE(rowtime, INTERVAL '10' SECOND), keyword
create table page_view
(
common MAP<STRING,STRING>,
page MAP<STRING,STRING>,
ts BIGINT,
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'dwd_page_log',
'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092',
'properties.group.id' = 'keyword_stats_app_group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json')
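For reference, a minimal sketch of registering the page_view DDL above from a Flink job, mirroring the pattern KeywordStatsApp uses in this commit; the class name and the inlined DDL string are illustrative only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PageViewDdlDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // createSql holds the page_view DDL shown above, verbatim
        String createSql = "create table page_view ( common MAP<STRING,STRING>, page MAP<STRING,STRING>, ts BIGINT, "
                + "rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')), "
                + "WATERMARK FOR rowtime as rowtime - INTERVAL '3' SECOND ) WITH ( 'connector' = 'kafka', "
                + "'topic' = 'dwd_page_log', 'properties.bootstrap.servers' = 'Ding202:9092,Ding203:9092,Ding204:9092', "
                + "'properties.group.id' = 'keyword_stats_app_group', 'scan.startup.mode' = 'latest-offset', 'format' = 'json')";
        tableEnv.executeSql(createSql);
    }
}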