用户画像example更新

This commit is contained in:
markilue 2022-05-24 21:54:17 +08:00
parent b3c7303820
commit ce889c6057
2 changed files with 160 additions and 2 deletions

View File

@ -0,0 +1,155 @@
package com.atguigu.userprofile.ml.app
import java.util.Properties
import com.atguigu.userprofile.common.utils.MyPropertiesUtil
import com.atguigu.userprofile.ml.pipline.MyPipeLine
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 第一部分 利用机器学习的模型 进行预测
* 1 按照模型的要求提供特征数据SQL
* 2 把hdfs中的模型加载
* 3 用模型对数据进行预测
* 4 把预测的结果进行转换 原值
*
* 第二部分:
* 1.画像平台定义一个标签
* 2.程序中建立标签表
* 3.insert into select xxx from abc
* 4.打包发布到标签任务中
*/
object BusiGenderApp {
def main(args: Array[String]): Unit = {
//0 创建spark环境
val sparkConf: SparkConf = new SparkConf().setAppName("stud_gender_train_app")//.setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
//获取任务参数
val taskId:String = args(0)
val taskDate:String =args(1)
//1 按照模型的要求提供特征数据SQL
println("按照模型的要求提供特征数据SQL")
val featureSQL=
s"""
|with user_c1
| as
| (
| select user_id,category1_id,during_time
| from dwd_page_log pl join dim_sku_info si
| on pl.page_item = si.id
| where pl.page_id = 'good_detail' and pl.page_item_type = 'sku_id' and pl.dt = '${taskDate}' and si.dt = '${taskDate}'
| ),
| user_label
| as (
| select id,gender from dim_user_info where dt='9999-99-99' and gender is null
| )
|select user_id,c1_rk1,c1_rk2,c1_rk3,male_dur,female_dur
|from
|(
|select user_id,sum(if(rk = 1,category1_id,0)) c1_rk1,sum(if(rk = 2,category1_id,0)) c1_rk2,sum(if(rk = 3,category1_id,0)) c1_rk3,
|sum(if(category1_id in (2,3,6) ,during_time,0)) male_dur,sum(if(category1_id in (11,15,8) ,during_time,0)) female_dur
|from
| (select user_id ,category1_id,ct,during_time,
| row_number() over( partition by user_id order by ct desc) rk
| from
| (
| select user_id ,category1_id,count(*) ct,sum(during_time) during_time
| from user_c1
| group by user_id,category1_id
| order by user_id,category1_id
| ) user_c1_ct
| order by user_id,category1_id) user_rk
|group by user_id
|) user_feature join user_label on user_feature.user_id=user_label.id
|""".stripMargin
val properties: Properties = MyPropertiesUtil.load("config.properties")
val inputDBName: String = properties.getProperty("data-warehouse.dbname")
sparkSession.sql(s"use ${inputDBName}")
val dataFrame: DataFrame = sparkSession.sql(featureSQL)
//2.把hdfs中的模型加载
println("把hdfs中的模型加载")
val saveModelPath: String = properties.getProperty("save-model.path")
val myPipeLine: MyPipeLine = new MyPipeLine().loadModel(saveModelPath)
//3 用模型对数据进行预测
println("用模型对数据进行预测")
val predictedDataFrame: DataFrame = myPipeLine.predict(dataFrame)
//4 把预测的结果进行转换 原值
println(" 把预测的结果进行转换 原值")
val predictedWithOriginDF: DataFrame = myPipeLine.convertOrigin(predictedDataFrame)
predictedWithOriginDF.show(100,false)
//让这个临时的dataframe可以使用 临时视图
predictedWithOriginDF.createTempView("predicted_gender")
// * 第二部分:
// * 1.画像平台定义一个标签
//手动平台建立
// * 2.程序中建立标签表
// 建表语句(表名 tagcode 字段 分区 格式 存储位置) ->每日全量
// create table TG_BUSI_PREDICTION_BUSIGENDER (uid string,tag_value String)
// comment '预测性别'
// partitioned by (dt string)
// ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
// location 'hdfs_path/dbName/tableName'
//location的位置
val hdfsPath = properties.getProperty("hdfs-store.path")
val dwDbName = properties.getProperty("data-warehouse.dbname")
val upDBName = properties.getProperty("user-profile.dbname")
val createTableSQL =
s"""
| create table if not exists $upDBName.tg_busi_prediction_busigender(uid string,tag_value string)
| comment '预测性别'
| partitioned by (dt string)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
| location '${hdfsPath}/${upDBName}/tg_busi_prediction_busigender'
|""".stripMargin
// * 3.insert overwrite table tg_busi_prediction_busigender(dt='$taskDate') //预测男女+手填男女
// select user_id,if(prediction_origin='F','女','男') from predicted_gender
// union all
// select id as uid , if(gender='F','女','男') as query_value from dim_user_info
// where dt='9999-99-99' and gender is not null
val insertSQL =
s"""
|insert overwrite table $upDBName.tg_busi_prediction_busigender partition(dt='$taskDate')
|select user_id,if(prediction_origin='F','女','男') from predicted_gender
| union all
|select id as uid , if(gender='F','女','男') as query_value from dim_user_info
|where dt='9999-99-99' and gender is not null
|""".stripMargin
println(createTableSQL)
sparkSession.sql(createTableSQL)
println(insertSQL)
sparkSession.sql(insertSQL)
// * 4.打包发布到标签任务中
}
}

View File

@ -230,9 +230,12 @@ class MyPipeLine {
//把生成的模型存储到指定的位置 ->hdfs //把生成的模型存储到指定的位置 ->hdfs
def saveModel(path:String) :Unit ={ def saveModel(path:String) :Unit ={
pipelineModel.write.overwrite().save(path) pipelineModel.write.overwrite().save(path)
}
//把已经存储到hdfs模型加载到对象中
def loadModel(path:String) :MyPipeLine={
pipelineModel = PipelineModel.load(path)
this
} }