diff --git a/Big_data_example/user_profile/common/src/main/scala/com/atguigu/userprofile/common/utils/ClickhouseUtils.scala b/Big_data_example/user_profile/common/src/main/scala/com/atguigu/userprofile/common/utils/ClickhouseUtils.scala
new file mode 100644
index 0000000..194d0de
--- /dev/null
+++ b/Big_data_example/user_profile/common/src/main/scala/com/atguigu/userprofile/common/utils/ClickhouseUtils.scala
@@ -0,0 +1,22 @@
+package com.atguigu.userprofile.common.utils
+
+import java.sql.{Connection, DriverManager, Statement}
+import java.util.Properties
+
+object ClickhouseUtils {
+
+  private val properties: Properties = MyPropertiesUtil.load("config.properties")
+  val CLICKHOUSE_URL: String = properties.getProperty("clickhouse.url")
+
+  // Execute a single DDL/DML statement against ClickHouse over JDBC.
+  def executeSql(sql: String): Unit = {
+    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
+    val connection: Connection = DriverManager.getConnection(CLICKHOUSE_URL, null, null)
+    val statement: Statement = connection.createStatement()
+    try {
+      statement.execute(sql)
+    } finally {
+      statement.close() // release resources even when execute throws
+      connection.close()
+    }
+  }
+}
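For context, a minimal usage sketch of the helper above. It assumes `MyPropertiesUtil` loads `config.properties` from the classpath (as elsewhere in this module) and that the `ru.yandex.clickhouse` JDBC driver is available; the database, table, and column names are illustrative only:

```scala
// Hypothetical smoke test for ClickhouseUtils; all names are illustrative.
object ClickhouseUtilsDemo {
  def main(args: Array[String]): Unit = {
    ClickhouseUtils.executeSql("drop table if exists user_profile0224.demo_tags")
    ClickhouseUtils.executeSql(
      """create table user_profile0224.demo_tags (uid String, tc_gender String)
        |engine = MergeTree
        |order by uid
        |""".stripMargin)
  }
}
```

Since `executeSql` opens a fresh connection per statement, it suits one-off DDL like the drop/create pair in the export app below; bulk inserts go through the Spark JDBC writer instead.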
diff --git a/Big_data_example/user_profile/export-clickhouse/src/main/resources/config.properties b/Big_data_example/user_profile/export-clickhouse/src/main/resources/config.properties
new file mode 100644
index 0000000..d76795a
--- /dev/null
+++ b/Big_data_example/user_profile/export-clickhouse/src/main/resources/config.properties
@@ -0,0 +1,11 @@
+hdfs-store.path=hdfs://Ding202:8020/user_profile
+data-warehouse.dbname=gmall
+user-profile.dbname=user_profile0224
+
+# mysql
+mysql.url=jdbc:mysql://Ding202:3306/user_profile_manager_0224?characterEncoding=utf-8&useSSL=false
+mysql.username=root
+mysql.password=123456
+
+# clickhouse
+clickhouse.url=jdbc:clickhouse://Ding202:8123/user_profile0224
diff --git a/Big_data_example/user_profile/export-clickhouse/src/main/resources/core-site.xml b/Big_data_example/user_profile/export-clickhouse/src/main/resources/core-site.xml
new file mode 100644
index 0000000..e32dcfe
--- /dev/null
+++ b/Big_data_example/user_profile/export-clickhouse/src/main/resources/core-site.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property>
+        <name>io.compression.codecs</name>
+        <value>
+            org.apache.hadoop.io.compress.GzipCodec,
+            org.apache.hadoop.io.compress.DefaultCodec,
+            org.apache.hadoop.io.compress.BZip2Codec,
+            org.apache.hadoop.io.compress.SnappyCodec,
+            com.hadoop.compression.lzo.LzoCodec,
+            com.hadoop.compression.lzo.LzopCodec
+        </value>
+    </property>
+
+    <property>
+        <name>io.compression.codec.lzo.class</name>
+        <value>com.hadoop.compression.lzo.LzoCodec</value>
+    </property>
+</configuration>
diff --git a/Big_data_example/user_profile/export-clickhouse/src/main/resources/hdfs-site.xml b/Big_data_example/user_profile/export-clickhouse/src/main/resources/hdfs-site.xml
new file mode 100644
index 0000000..671652f
--- /dev/null
+++ b/Big_data_example/user_profile/export-clickhouse/src/main/resources/hdfs-site.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <property>
+        <name>dfs.namenode.http-address</name>
+        <value>Ding202:9870</value>
+    </property>
+
+    <property>
+        <name>dfs.namenode.secondary.http-address</name>
+        <value>Ding204:9868</value>
+    </property>
+
+    <property>
+        <name>dfs.client.use.datanode.hostname</name>
+        <value>true</value>
+    </property>
+
+</configuration>
diff --git a/Big_data_example/user_profile/export-clickhouse/src/main/resources/hive-site.xml b/Big_data_example/user_profile/export-clickhouse/src/main/resources/hive-site.xml
new file mode 100644
index 0000000..32ece85
--- /dev/null
+++ b/Big_data_example/user_profile/export-clickhouse/src/main/resources/hive-site.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property>
+        <name>javax.jdo.option.ConnectionURL</name>
+        <value>jdbc:mysql://Ding202:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=utf-8&amp;useSSL=false</value>
+        <description>JDBC connect string for a JDBC metastore</description>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionDriverName</name>
+        <value>com.mysql.jdbc.Driver</value>
+        <description>Driver class name for a JDBC metastore</description>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionUserName</name>
+        <value>root</value>
+        <description>username to use against metastore database</description>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionPassword</name>
+        <value>123456</value>
+        <description>password to use against metastore database</description>
+    </property>
+
+    <property>
+        <name>hive.cli.print.header</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.cli.print.current.db</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+</configuration>
diff --git a/Big_data_example/user_profile/export-clickhouse/src/main/resources/log4j.properties b/Big_data_example/user_profile/export-clickhouse/src/main/resources/log4j.properties
new file mode 100644
index 0000000..48f5794
--- /dev/null
+++ b/Big_data_example/user_profile/export-clickhouse/src/main/resources/log4j.properties
@@ -0,0 +1,12 @@
+log4j.rootLogger=error, stdout,R
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=../log/agent.log
+log4j.appender.R.MaxFileSize=1024KB
+log4j.appender.R.MaxBackupIndex=1
+
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
diff --git a/Big_data_example/user_profile/export-clickhouse/src/main/scala/com/atguigu/userprofile/app/TaskExportCkApp.scala b/Big_data_example/user_profile/export-clickhouse/src/main/scala/com/atguigu/userprofile/app/TaskExportCkApp.scala
index 6fbd0af..61db13e 100644
--- a/Big_data_example/user_profile/export-clickhouse/src/main/scala/com/atguigu/userprofile/app/TaskExportCkApp.scala
+++ b/Big_data_example/user_profile/export-clickhouse/src/main/scala/com/atguigu/userprofile/app/TaskExportCkApp.scala
@@ -1,10 +1,84 @@
 package com.atguigu.userprofile.app
 
+import java.util.Properties
+
+import com.atguigu.userprofile.common.bean.TagInfo
+import com.atguigu.userprofile.common.dao.TagInfoDAO
+import com.atguigu.userprofile.common.utils.{ClickhouseUtils, MyPropertiesUtil}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+/**
+ * 1. Create the ClickHouse wide table.
+ *
+ * 2. Read the Hive wide table into a DataFrame.
+ *
+ * 3. Write the DataFrame to ClickHouse.
+ */
 object TaskExportCkApp{

   def main(args: Array[String]): Unit = {
+    // TODO 1: read the Hive wide table into a DataFrame
+    val sparkConf: SparkConf = new SparkConf().setAppName("task_export_app").setMaster("local[*]")
+    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
+
+    val taskId = args(0)
+    val taskDate = args(1)
+
+    // TODO 0: create the ClickHouse wide table
+    val tagList: List[TagInfo] = TagInfoDAO.getTagInfoList()
+
+    /*
+      todo: target DDL, e.g.
+      create table user_tag_merge_20210703 (uid String, tagcode1 String, tagcode2 String, ...)
+      engine = MergeTree
+      order by uid
+    */
+
+    // storage location and database names from config.properties
+    val properties: Properties = MyPropertiesUtil.load("config.properties")
+    val hdfsPath = properties.getProperty("hdfs-store.path")
+    val dwDbName = properties.getProperty("data-warehouse.dbname")
+    val upDBName = properties.getProperty("user-profile.dbname")
+
+    val tableName = "user_tag_merge_" + taskDate.replace("-", "")
+    val tagCodeSql = tagList.map(_.tagCode.toLowerCase() + " String").mkString(",")
+
+    // drop first, in case the previous table turns out to be wrong or the task needs a re-run
+    val dropTableSQL = s"drop table if exists $upDBName.$tableName"
+
+    val createTableSql =
+      s"""
+         |create table $upDBName.$tableName (uid String,$tagCodeSql)
+         |engine= MergeTree
+         |order by uid
+         |""".stripMargin
+
+    println(dropTableSQL)
+    println(createTableSql)
+
+    // run the DDL against ClickHouse
+    ClickhouseUtils.executeSql(dropTableSQL)
+    ClickhouseUtils.executeSql(createTableSql)
+
+    // TODO 2: read the Hive wide table of the same name
+    val dataFrame: DataFrame = sparkSession.sql(s"select * from $upDBName.$tableName")
+
+    // TODO 3: write the DataFrame to ClickHouse
+    val clickhouseUrl = properties.getProperty("clickhouse.url")
+
+    // write via the Spark JDBC writer
+    dataFrame.write.mode(SaveMode.Append)
+      .option("batchsize", "100")
+      .option("isolationLevel", "NONE") // ClickHouse has no transactions
+      .option("numPartitions", "4")     // write parallelism
+      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
+      .jdbc(clickhouseUrl, tableName, new Properties())
+  }
 }
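The drop/create DDL above is derived entirely from tag metadata. A self-contained sketch of that derivation, with a simplified stand-in for the `TagInfo` bean and two hypothetical tag codes (real codes come from `TagInfoDAO.getTagInfoList()`):

```scala
// Standalone sketch of the DDL derivation in TaskExportCkApp.
// TagInfo is reduced to the one field used here; the tag codes are made up.
case class TagInfo(tagCode: String)

object DdlSketch {
  def main(args: Array[String]): Unit = {
    val tagList    = List(TagInfo("TC_GENDER"), TagInfo("TC_AGE_GROUP"))
    val taskDate   = "2021-07-03"
    val upDBName   = "user_profile0224"
    val tableName  = "user_tag_merge_" + taskDate.replace("-", "") // user_tag_merge_20210703
    val tagCodeSql = tagList.map(_.tagCode.toLowerCase() + " String").mkString(",")
    // tagCodeSql == "tc_gender String,tc_age_group String"
    println(s"create table $upDBName.$tableName (uid String,$tagCodeSql) engine= MergeTree order by uid")
  }
}
```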
diff --git a/Big_data_example/user_profile/merge/src/main/scala/com/atguigu/userprofile/app/TaskMergeApp.scala b/Big_data_example/user_profile/merge/src/main/scala/com/atguigu/userprofile/app/TaskMergeApp.scala
index 4b1e073..3dd6d3e 100644
--- a/Big_data_example/user_profile/merge/src/main/scala/com/atguigu/userprofile/app/TaskMergeApp.scala
+++ b/Big_data_example/user_profile/merge/src/main/scala/com/atguigu/userprofile/app/TaskMergeApp.scala
@@ -27,7 +27,7 @@
     val taskDate: String = args(1);
 
     // set up the Spark execution environment
-    val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app") //.setMaster("local[*]")
+    val sparkConf: SparkConf = new SparkConf().setAppName("task_merge_app") //.setMaster("local[*]")
     val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
 
     // TODO 1: read the tag collection definitions and extract the tag codes as the wide table's columns
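One way to sanity-check an export run from the app above is to read the ClickHouse table back through the same JDBC path and compare row counts with the Hive wide table. A sketch, assuming the `clickhouse.url` and driver from the config above and a table produced for 2021-07-03:

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

// Read the exported table back from ClickHouse and count rows.
object ExportCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("export_check").master("local[*]").getOrCreate()
    val props = new Properties()
    props.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    val exported = spark.read.jdbc(
      "jdbc:clickhouse://Ding202:8123/user_profile0224", "user_tag_merge_20210703", props)
    println(exported.count()) // expect the Hive wide table's row count
    spark.stop()
  }
}
```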