User-profile example update

This commit is contained in:
markilue 2022-05-15 20:46:32 +08:00
parent c6828f5758
commit 115f0a9e9f
8 changed files with 199 additions and 1 deletion

View File

@ -0,0 +1,22 @@
package com.atguigu.userprofile.common.utils

import java.sql.{Connection, DriverManager, Statement}
import java.util.Properties

object ClickhouseUtils {

  private val properties: Properties = MyPropertiesUtil.load("config.properties")
  val CLICKHOUSE_URL: String = properties.getProperty("clickhouse.url")

  // Execute a single DDL/DML statement against ClickHouse over JDBC.
  def executeSql(sql: String): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val connection: Connection = DriverManager.getConnection(CLICKHOUSE_URL, null, null)
    val statement: Statement = connection.createStatement()
    statement.execute(sql)
    statement.close()
    connection.close()
  }
}
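
For reference, a minimal usage sketch of the utility above; the table and column names are hypothetical, and it assumes config.properties and the ClickHouse JDBC driver are on the classpath:

import com.atguigu.userprofile.common.utils.ClickhouseUtils

object ClickhouseUtilsDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical table, for illustration only; the configured URL already selects the database.
    ClickhouseUtils.executeSql("drop table if exists demo_tag")
    ClickhouseUtils.executeSql(
      """create table demo_tag (uid String, demo_code String)
        |engine = MergeTree
        |order by uid""".stripMargin)
  }
}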

View File

@ -0,0 +1,11 @@
hdfs-store.path=hdfs://Ding202:8020/user_profile
data-warehouse.dbname=gmall
user-profile.dbname=user_profile0224
# MySQL configuration
mysql.url=jdbc:mysql://Ding202:3306/user_profile_manager_0224?characterEncoding=utf-8&useSSL=false
mysql.username=root
mysql.password=123456
# ClickHouse configuration
clickhouse.url=jdbc:clickhouse://Ding202:8123/user_profile0224
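
These values are read elsewhere through MyPropertiesUtil. As a rough sketch only (not the project's actual implementation), a classpath-based loader could look like this:

import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.util.Properties

// Sketch only: loads a .properties file from the classpath; MyPropertiesUtil may differ.
object PropertiesLoaderSketch {
  def load(fileName: String): Properties = {
    val props = new Properties()
    val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(fileName)
    if (in != null) {
      props.load(new InputStreamReader(in, StandardCharsets.UTF_8))
      in.close()
    }
    props
  }
}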

View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>

View File

@ -0,0 +1,19 @@
<configuration>
<!-- NameNode web UI address -->
<property>
<name>dfs.namenode.http-address</name>
<value>Ding202:9870</value>
</property>
<!-- Secondary NameNode web UI address -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>Ding204:9868</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,41 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://Ding202:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=utf-8&amp;useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>

View File

@ -0,0 +1,12 @@
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
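
With this configuration, error-level messages go to both the console appender and the rolling file ../log/agent.log. A minimal log4j 1.x usage sketch (the object name is illustrative):

import org.apache.log4j.Logger

object LogDemo {
  private val log = Logger.getLogger(LogDemo.getClass)
  def main(args: Array[String]): Unit = {
    // Written to stdout and to ../log/agent.log per the properties above.
    log.error("something went wrong")
  }
}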

View File

@ -1,10 +1,84 @@
package com.atguigu.userprofile.app

import java.util.Properties

import com.atguigu.userprofile.common.bean.TagInfo
import com.atguigu.userprofile.common.dao.TagInfoDAO
import com.atguigu.userprofile.common.utils.{ClickhouseUtils, MyPropertiesUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * 1. Create the wide table in ClickHouse.
 *
 * 2. Read the Hive wide table into a DataFrame.
 *
 * 3. Write the DataFrame to ClickHouse.
 */
object TaskExportCkApp {

  def main(args: Array[String]): Unit = {
    // TODO 1: set up the Spark environment to read the Hive wide table as a DataFrame
    val sparkConf: SparkConf = new SparkConf().setAppName("task_export_app").setMaster("local[*]")
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val taskDate = args(1)
    val taskId = args(0)

    // TODO 0: create the wide table in ClickHouse
    val tagList: List[TagInfo] = TagInfoDAO.getTagInfoList()
    /*
      Target DDL, e.g.:
      create table user_tag_merge_20210703 (uid String, tagcode1 String, tagcode2 String, ...)
      engine = MergeTree
      order by uid
    */
    // load the HDFS location and database names from config.properties
    val properties: Properties = MyPropertiesUtil.load("config.properties")
    val hdfsPath = properties.getProperty("hdfs-store.path")
    val dwDbName = properties.getProperty("data-warehouse.dbname")
    val upDBName = properties.getProperty("user-profile.dbname")
    val tableName = "user_tag_merge_" + taskDate.replace("-", "")
    val tagCodeSql = tagList.map(_.tagCode.toLowerCase() + " String").mkString(",")
    // Drop the table first so the task can be re-run if a previous run produced bad data.
    val dropTableSQL = s"drop table if exists $upDBName.$tableName"
    val createTableSql =
      s"""
         |create table $upDBName.$tableName (uid String,$tagCodeSql)
         |engine= MergeTree
         |order by uid
         |""".stripMargin
    println(dropTableSQL)
    println(createTableSql)
    // execute the DDL against ClickHouse
    ClickhouseUtils.executeSql(dropTableSQL)
    ClickhouseUtils.executeSql(createTableSql)

    // read the Hive wide table into a DataFrame
    val dataFrame: DataFrame = sparkSession.sql(s"select * from $upDBName.$tableName")

    // TODO 3: write the DataFrame to ClickHouse
    val clickhouseUrl = properties.getProperty("clickhouse.url")
    // write to ClickHouse with the JDBC writer
    dataFrame.write.mode(SaveMode.Append)
      .option("batchsize", "100")
      .option("isolationLevel", "NONE") // disable transactions
      .option("numPartitions", "4")     // write parallelism
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .jdbc(clickhouseUrl, tableName, new Properties())
  }
}
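
After the export finishes, a quick row count through the same JDBC driver can confirm the write. A minimal sketch, where the URL mirrors config.properties above and the date-stamped table name is illustrative:

import java.sql.DriverManager

object ExportCountCheck {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    // Assumed URL and table name; adjust to the actual host and task date.
    val conn = DriverManager.getConnection("jdbc:clickhouse://Ding202:8123/user_profile0224", null, null)
    val rs = conn.createStatement().executeQuery("select count(*) from user_tag_merge_20220515")
    while (rs.next()) println(s"exported rows: ${rs.getLong(1)}")
    conn.close()
  }
}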

View File

@ -27,7 +27,7 @@ object TaskMergeApp {
val taskDate:String = args(1);
// set up the Spark execution environment
val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app") //.setMaster("local[*]")
val sparkConf: SparkConf = new SparkConf().setAppName("task_merge_app") //.setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
//TODO 1 Read the tag set definitions and extract the tag codes as the columns of the wide table