User profile example update

This commit is contained in:
markilue 2022-05-20 21:33:10 +08:00
parent 43e222b76b
commit ecbc11a572
9 changed files with 429 additions and 0 deletions

View File

@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>user_profile</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>machine-learning</artifactId>
<dependencies>
<dependency>
<artifactId>common</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- This plugin compiles Scala code into class files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<!-- Bind to Maven's compile phase -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,11 @@
hdfs-store.path=hdfs://Ding202:8020/user_profile
data-warehouse.dbname=gmall
user-profile.dbname=user_profile0224
# MySQL configuration
mysql.url=jdbc:mysql://Ding202:3306/user_profile_manager_0224?characterEncoding=utf-8&useSSL=false
mysql.username=root
mysql.password=123456
# ClickHouse configuration
clickhouse.url=jdbc:clickhouse://Ding202:8123/user_profile0224
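
A minimal sketch (not part of this commit) of reading these properties from the classpath in Scala; the file name "config.properties" and the PropertiesUtil object are assumptions, since the actual file name is not shown in this diff:

import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.util.Properties

object PropertiesUtil {
  // Assumption: the properties file above sits on the classpath as "config.properties"
  def load(fileName: String): Properties = {
    val props = new Properties()
    val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(fileName)
    // Read as UTF-8 so non-ASCII comments and values survive (Properties.load on a raw stream assumes ISO-8859-1)
    props.load(new InputStreamReader(in, StandardCharsets.UTF_8))
    props
  }
}

Usage would then be along the lines of PropertiesUtil.load("config.properties").getProperty("mysql.url").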

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>
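
Once this core-site.xml is on the application classpath, Hadoop's Configuration (and thus Spark) picks it up automatically; a quick sanity-check sketch, not part of this commit:

import org.apache.spark.sql.SparkSession

object CodecCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("codec_check").master("local[*]").getOrCreate()
    // Prints the codec list declared above if core-site.xml was picked up
    println(spark.sparkContext.hadoopConfiguration.get("io.compression.codecs"))
    spark.stop()
  }
}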

View File

@@ -0,0 +1,19 @@
<configuration>
<!-- NameNode web UI address -->
<property>
<name>dfs.namenode.http-address</name>
<value>Ding202:9870</value>
</property>
<!-- Secondary NameNode web UI address -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>Ding204:9868</value>
</property>
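<!-- Resolve DataNodes by hostname instead of their internal IPs, which helps clients running outside the cluster network -->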
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,41 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://Ding202:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=utf-8&amp;useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>

View File

@@ -0,0 +1,12 @@
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

View File

@@ -0,0 +1,183 @@
package com.atguigu.userprofile.ml.pipline
import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}
import org.apache.spark.sql.DataFrame
/**
* Wraps an ML Pipeline: builds the stages, trains the model, and runs prediction.
*/
class MyPipeLine {
// The pipeline object
var pipeline: Pipeline = null
// Model produced by fitting the pipeline
private var pipelineModel:PipelineModel=null
// Initialize the pipeline with its four stages
def init(): MyPipeLine ={
pipeline=new Pipeline().setStages(
Array(
createLabelIndexer(),
createFeatureAssemble(),
createFeatureIndexer(),
createClassifier()
)
)
this
}
var labelColName:String = null
var featureColNames:Array[String]=null
//// Parameters ////////////////////
// Max distinct values for a feature to be treated as categorical (more than this -> continuous)
private var maxCategories=5
// Max number of bins
private var maxBins=5
// Max tree depth
private var maxDepth=5
// Min number of instances per child node
private var minInstancesPerNode=1
// Min information gain required for a split
private var minInfoGain=0.0
def setLabelColName(labelColName: String): MyPipeLine = {
this.labelColName = labelColName
this
}
def setFeatureColNames(featureColNames: Array[String]): MyPipeLine = {
this.featureColNames = featureColNames
this
}
def setMaxCategories(maxCategories:Int): MyPipeLine ={
this.maxCategories=maxCategories
this
}
def setMaxBins(maxBins:Int): MyPipeLine ={
this.maxBins=maxBins
this
}
def setMaxDepth(maxDepth:Int): MyPipeLine ={
this.maxDepth=maxDepth
this
}
def setMinInstancesPerNode(minInstancesPerNode:Int): MyPipeLine ={
this.minInstancesPerNode=minInstancesPerNode
this
}
def setMinInfoGain(minInfoGain:Double): MyPipeLine ={
this.minInfoGain=minInfoGain
this
}
//1 Create the label indexer
// Converts string label values into numeric indices,
// e.g. ('男','女') -> (0, 1, ...); indices are assigned by frequency: the more frequent a label, the smaller its index
//InputCol: the ground-truth label column
//OutputCol: custom name for the indexed label column
def createLabelIndexer(): StringIndexer = {
val indexer = new StringIndexer()
//set the label indexer's input and output columns
indexer.setInputCol(labelColName).setOutputCol("label_index")
indexer
}
//2 Assemble the feature columns into a single vector column
//InputCols: the feature column names
//OutputCol: custom name for the assembled feature vector column
def createFeatureAssemble(): VectorAssembler = {
val assembler = new VectorAssembler()
assembler.setInputCols(featureColNames).setOutputCol("feature_assemble")
assembler
}
//3 Create the feature indexer
// Re-encodes categorical values in the feature vector as indices, assigned by frequency (more frequent -> smaller index)
// Decides which features are continuous and which are categorical by distinct-value count: more than maxCategories distinct values -> treated as continuous
def createFeatureIndexer():VectorIndexer ={
val indexer = new VectorIndexer()
//set the categorical threshold (maxCategories)
indexer.setInputCol("feature_assemble").setOutputCol("feature_index").setMaxCategories(maxCategories)
indexer
}
//4 Create the classifier: a decision tree classifier here
def createClassifier():DecisionTreeClassifier ={
val classifier = new DecisionTreeClassifier()
classifier.setLabelCol("label_index")
.setFeaturesCol("feature_index")
.setPredictionCol("prediction_col")
.setImpurity("gini") //impurity measure: "gini" or "entropy"
.setMaxBins(maxBins) //apply the configured hyperparameters
.setMaxDepth(maxDepth)
.setMinInstancesPerNode(minInstancesPerNode)
.setMinInfoGain(minInfoGain)
classifier
}
//Train: fit the pipeline on the given DataFrame
def train(dataFrame: DataFrame):Unit ={
pipelineModel = pipeline.fit(dataFrame)
}
//Predict: apply the fitted model to the given DataFrame
def predict(dataFrame: DataFrame):DataFrame ={
val predictedDataFrame = pipelineModel.transform(dataFrame)
predictedDataFrame
}
//Print the full decision tree
def printDecisionTree():Unit ={
val transformer: Transformer = pipelineModel.stages(3) //the classifier model is the 4th stage (index 3)
val classificationModel: DecisionTreeClassificationModel = transformer.asInstanceOf[DecisionTreeClassificationModel]
println(classificationModel.toDebugString)
}
//Print the importance weight of each feature
def printFeatureWeights():Unit ={
val transformer: Transformer = pipelineModel.stages(3) //the classifier model is the 4th stage (index 3)
val classificationModel: DecisionTreeClassificationModel = transformer.asInstanceOf[DecisionTreeClassificationModel]
println(classificationModel.featureImportances)
}
}
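
The fitted model above only lives in memory for the duration of the job. A hedged sketch of save/load methods that could be added to MyPipeLine using MLlib's built-in persistence; the method names and path handling are assumptions, not part of this commit:

// Persist the fitted model so prediction can later run without retraining
def saveModel(path: String): Unit = {
  pipelineModel.write.overwrite().save(path)
}

// Restore a previously trained model into this MyPipeLine
def loadModel(path: String): Unit = {
  pipelineModel = org.apache.spark.ml.PipelineModel.load(path)
}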

View File

@@ -0,0 +1,70 @@
package com.atguigu.userprofile.ml.train
import com.atguigu.userprofile.ml.pipline.MyPipeLine
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object StudGenderTrain {
def main(args: Array[String]): Unit = {
//0 Create the Spark environment
val sparkConf: SparkConf = new SparkConf().setAppName("stud_gender_train_app").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
//1 Query the data: the SQL maps string categories to numeric codes
println("Querying data...")
val sql =
s"""
|select uid,
|case hair when '长发' then 101
| when '短发' then 102
| when '板寸' then 103
| end as hair,
| height,
|case skirt when '是' then 11
| when '否' then 12
| end as skirt,
|case age when '00' then 100
| when '90' then 90
| when '80' then 80
| end as age,
| gender
|from test.student
|""".stripMargin
println(sql)
val dataFrame = sparkSession.sql(sql)
dataFrame.show() //preview the queried rows
//2 Split the data into a training set and a test set
println("Splitting data...")
val Array(trainDF,testDF) = dataFrame.randomSplit(Array(0.8, 0.2))
//3 Create and initialize the MyPipeLine
println("Building the pipeline...")
val myPipeLine = new MyPipeLine().setLabelColName("gender")
.setFeatureColNames(Array("hair","height","skirt","age"))
.setMaxCategories(5)
.init()
//4 Train
println("Training...")
myPipeLine.train(trainDF)
//Print the decision tree and the feature weights
myPipeLine.printDecisionTree()
myPipeLine.printFeatureWeights()
//5 Predict
println("Predicting...")
val predictedDataFrame = myPipeLine.predict(testDF)
//6 Print the prediction results
//truncate = false: do not truncate long column values
predictedDataFrame.show(100,false)
}
}
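
The driver prints the predictions but never scores them. A minimal follow-up sketch (not in this commit) that measures accuracy on the test split with MLlib's evaluator, using the column names MyPipeLine assigns ("label_index" and "prediction_col"):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label_index")         // indexed ground-truth label produced by the pipeline
  .setPredictionCol("prediction_col") // prediction column set in createClassifier
  .setMetricName("accuracy")
println(s"accuracy = ${evaluator.evaluate(predictedDataFrame)}")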

View File

@@ -14,6 +14,7 @@
<module>merge</module>
<module>export-clickhouse</module>
<module>toBitmap</module>
<module>machine-learning</module>
</modules>
<properties>