User profile example update

parent fda72667cb
commit 43e222b76b

pom.xml
@@ -13,6 +13,7 @@
        <module>sql</module>
        <module>merge</module>
        <module>export-clickhouse</module>
        <module>toBitmap</module>
    </modules>

    <properties>
@@ -128,4 +129,7 @@
    </dependencyManagement>

</project>

toBitmap/pom.xml
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>user_profile</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>toBitmap</artifactId>

    <dependencies>
        <dependency>
            <artifactId>common</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala sources into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

config.properties
@@ -0,0 +1,11 @@
hdfs-store.path=hdfs://Ding202:8020/user_profile
data-warehouse.dbname=gmall
user-profile.dbname=user_profile0224

# mysql configuration
mysql.url=jdbc:mysql://Ding202:3306/user_profile_manager_0224?characterEncoding=utf-8&useSSL=false
mysql.username=root
mysql.password=123456

# clickhouse configuration
clickhouse.url=jdbc:clickhouse://Ding202:8123/user_profile0224
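As an aside: nothing in this commit shows how these properties are consumed. A minimal sketch, assuming the file ships on the classpath as config.properties and is read with plain java.util.Properties (the resource name and the absence of a project-specific config helper are assumptions):

// Hypothetical reader for the properties above -- not part of this commit.
import java.util.Properties

object ConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // "config.properties" is an assumed resource name
    val in = Thread.currentThread().getContextClassLoader
      .getResourceAsStream("config.properties")
    props.load(in)
    in.close()
    println(props.getProperty("clickhouse.url"))
  }
}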

core-site.xml
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>
    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
</configuration>

hdfs-site.xml
@@ -0,0 +1,19 @@
<configuration>

    <!-- NameNode web UI address -->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>Ding202:9870</value>
    </property>
    <!-- SecondaryNameNode web UI address -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>Ding204:9868</value>
    </property>
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
    </property>

</configuration>

hive-site.xml
@@ -0,0 +1,41 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://Ding202:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=utf-8&amp;useSSL=false</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
        <description>password to use against metastore database</description>
    </property>
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

</configuration>

log4j.properties
@@ -0,0 +1,12 @@
log4j.rootLogger=error, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

ToBitmapApp.scala
@@ -0,0 +1,119 @@
package com.atguigu.userfile.app

import com.atguigu.userprofile.common.bean.TagInfo
import com.atguigu.userprofile.common.constants.ConstCode
import com.atguigu.userprofile.common.dao.TagInfoDAO
import com.atguigu.userprofile.common.utils.ClickhouseUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

/**
 * 1. Create the four value tables (string, decimal, long, date) -- done once, by hand.
 * 2. Query the tag list from MySQL and split it into four lists by tag value type;
 *    each list becomes its own insert-select statement, four inserts in total.
 * 3. Run the insert into ... select statements.
 */
object ToBitmapApp {

  def main(args: Array[String]): Unit = {

    // set up the Spark environment
    val sparkConf: SparkConf = new SparkConf().setAppName("bitmap_app") //.setMaster("local[*]")
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    val taskId: String = args(0)
    val taskDate: String = args(1)

    // TODO 1: create the four tables (string, decimal, long, date) -- created by hand, once, in advance
    // (these tables live in ClickHouse)

    // TODO 2: query the tag list from MySQL,
    //   split it into four lists by tag value type,
    //   and build an independent insert-select statement per list (four inserts in total)
    val tagList: List[TagInfo] = TagInfoDAO.getTagInfoList()
    val tagInfoStringList: ListBuffer[TagInfo] = new ListBuffer[TagInfo]
    val tagInfoDecimalList: ListBuffer[TagInfo] = new ListBuffer[TagInfo]
    val tagInfoLongList: ListBuffer[TagInfo] = new ListBuffer[TagInfo]
    val tagInfoDateList: ListBuffer[TagInfo] = new ListBuffer[TagInfo]

    tagList.foreach(
      tagInfo => {
        if (tagInfo.tagValueType == ConstCode.TAG_VALUE_TYPE_STRING) {
          tagInfoStringList.append(tagInfo)
        }
        if (tagInfo.tagValueType == ConstCode.TAG_VALUE_TYPE_DECIMAL) {
          tagInfoDecimalList.append(tagInfo)
        }
        if (tagInfo.tagValueType == ConstCode.TAG_VALUE_TYPE_LONG) {
          tagInfoLongList.append(tagInfo)
        }
        if (tagInfo.tagValueType == ConstCode.TAG_VALUE_TYPE_DATE) {
          tagInfoDateList.append(tagInfo)
        }
      }
    )

    // TODO 3: insert each group of tags into its bitmap table (insert into ... select)
    insertTag(tagInfoStringList, "user_tag_value_string", taskDate.replace("-", ""))
    insertTag(tagInfoDecimalList, "user_tag_value_decimal", taskDate.replace("-", ""))
    insertTag(tagInfoLongList, "user_tag_value_long", taskDate.replace("-", ""))
    insertTag(tagInfoDateList, "user_tag_value_date", taskDate.replace("-", ""))
  }

  /**
   * insert into $tableName
   * select tv.1, tv.2, groupBitmapState(uid), '$taskDate' as dt
   * from (
   *   select uid, arrayJoin([('gender',gender),('agegroup',agegroup),('favor',favor)]) tv from user_tag_merge
   * ) user_tag
   * group by tv.1, tv.2;
   */
  def insertTag(tagList: ListBuffer[TagInfo], bitmapTableName: String, taskDate: String): Unit = {

    // Idempotence: clear the target partition before every insert
    val clearSQL = s"alter table ${bitmapTableName} delete where dt='$taskDate'"
    println(clearSQL)
    ClickhouseUtils.executeSql(clearSQL)

    if (tagList.nonEmpty) {
      // e.g. ('gender',gender),('agegroup',agegroup),('favor',favor)
      val tagCodeSQL: String = tagList.map(
        tagInfo => s"('${tagInfo.tagCode}',${tagInfo.tagCode.toLowerCase()})"
      ).mkString(",")

      val insertIntoSQL =
        s"""
           |insert into ${bitmapTableName}
           |select tv.1,tv.2,groupBitmapState(cast(uid as UInt64)),'${taskDate}' as dt
           |from(
           |select uid,arrayJoin([$tagCodeSQL]) tv from user_tag_merge_${taskDate}
           |) user_tag
           |where tv.2<>''
           |group by tv.1,tv.2
           |""".stripMargin

      println(insertIntoSQL)
      ClickhouseUtils.executeSql(insertIntoSQL)
    }
  }
}
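The four target tables referenced by TODO 1 are created by hand outside this commit, so their DDL is not shown. A minimal sketch of what the String-valued table might look like, inferred from the insert statement above (tag code, tag value, a groupBitmap aggregate of UInt64 uids, and a dt partition); the other three tables would differ only in the tag_value column type:

// Hypothetical one-time DDL, inferred from the insert -- not taken from this commit.
val createStringTableSQL =
  """
    |create table if not exists user_tag_value_string
    |(
    |    tag_code  String,
    |    tag_value String,
    |    us        AggregateFunction(groupBitmap, UInt64),
    |    dt        String
    |) engine = AggregatingMergeTree()
    |partition by dt
    |order by (tag_code, tag_value)
    |""".stripMargin
ClickhouseUtils.executeSql(createStringTableSQL)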