diff --git a/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/app/BusiGenderApp.scala b/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/app/BusiGenderApp.scala
new file mode 100644
index 0000000..1a19143
--- /dev/null
+++ b/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/app/BusiGenderApp.scala
@@ -0,0 +1,152 @@
+package com.atguigu.userprofile.ml.app
+
+import java.util.Properties
+
+import com.atguigu.userprofile.common.utils.MyPropertiesUtil
+import com.atguigu.userprofile.ml.pipline.MyPipeLine
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Predicts a gender for users whose profile has none and publishes it as a tag.
+ *
+ * Part 1 - predict with the trained model:
+ *   1. build the feature data with SQL, in the shape the model requires
+ *   2. load the model stored on HDFS
+ *   3. run the model on the feature data
+ *   4. convert the predictions back to the original label values
+ *
+ * Part 2 - publish the tag:
+ *   1. define the tag on the profile platform (done by hand)
+ *   2. create the tag table from this program
+ *   3. insert overwrite the tag partition from a select
+ *   4. package the job and publish it as a tag task
+ */
+object BusiGenderApp {
+
+  /**
+   * Job entry point.
+   *
+   * @param args args(0) is the task id (not used by this job); args(1) is the task
+   *             date, used as the `dt` partition value when reading and writing
+   */
+  def main(args: Array[String]): Unit = {
+    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
+    require(args.length >= 2, "usage: BusiGenderApp <taskId> <taskDate>")
+    val taskDate: String = args(1)
+
+    // 0. Create the Spark environment.
+    // NOTE(review): the app name looks copied from the training job - confirm before renaming.
+    val sparkConf: SparkConf = new SparkConf().setAppName("stud_gender_train_app") //.setMaster("local[*]")
+    val sparkSession: SparkSession =
+      SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
+
+    try {
+      // 1. Build the feature data with SQL, in the shape the model requires.
+      println("按照模型的要求提供特征数据SQL")
+      val featureSQL =
+        s"""
+           |with user_c1
+           | as
+           | (
+           |  select user_id,category1_id,during_time
+           |  from dwd_page_log pl join dim_sku_info si
+           |  on pl.page_item = si.id
+           |  where pl.page_id = 'good_detail' and pl.page_item_type = 'sku_id' and pl.dt = '${taskDate}' and si.dt = '${taskDate}'
+           | ),
+           | user_label
+           | as (
+           |  select id,gender from dim_user_info where dt='9999-99-99' and gender is null
+           | )
+           |select user_id,c1_rk1,c1_rk2,c1_rk3,male_dur,female_dur
+           |from
+           |(
+           |select user_id,sum(if(rk = 1,category1_id,0)) c1_rk1,sum(if(rk = 2,category1_id,0)) c1_rk2,sum(if(rk = 3,category1_id,0)) c1_rk3,
+           |sum(if(category1_id in (2,3,6) ,during_time,0)) male_dur,sum(if(category1_id in (11,15,8) ,during_time,0)) female_dur
+           |from
+           | (select user_id ,category1_id,ct,during_time,
+           | row_number() over( partition by user_id order by ct desc) rk
+           | from
+           | (
+           | select user_id ,category1_id,count(*) ct,sum(during_time) during_time
+           | from user_c1
+           | group by user_id,category1_id
+           | order by user_id,category1_id
+           | ) user_c1_ct
+           | order by user_id,category1_id) user_rk
+           |group by user_id
+           |) user_feature join user_label on user_feature.user_id=user_label.id
+           |""".stripMargin
+
+      val properties: Properties = MyPropertiesUtil.load("config.properties")
+      val inputDBName: String = requiredProperty(properties, "data-warehouse.dbname")
+      sparkSession.sql(s"use ${inputDBName}")
+      val dataFrame: DataFrame = sparkSession.sql(featureSQL)
+
+      // 2. Load the model stored on HDFS.
+      println("把hdfs中的模型加载")
+      val saveModelPath: String = requiredProperty(properties, "save-model.path")
+      val myPipeLine: MyPipeLine = new MyPipeLine().loadModel(saveModelPath)
+
+      // 3. Run the model on the feature data.
+      println("用模型对数据进行预测")
+      val predictedDataFrame: DataFrame = myPipeLine.predict(dataFrame)
+
+      // 4. Convert the predictions back to the original label values.
+      println(" 把预测的结果进行转换 原值")
+      val predictedWithOriginDF: DataFrame = myPipeLine.convertOrigin(predictedDataFrame)
+      predictedWithOriginDF.show(100, false)
+
+      // Expose the result to SQL; "OrReplace" keeps a rerun in the same session from failing.
+      predictedWithOriginDF.createOrReplaceTempView("predicted_gender")
+
+      // Part 2, step 1: the tag itself is defined by hand on the profile platform.
+
+      // Part 2, step 2: create the tag table (daily full snapshot, partitioned by dt).
+      val hdfsPath: String = requiredProperty(properties, "hdfs-store.path")
+      val upDBName: String = requiredProperty(properties, "user-profile.dbname")
+
+      val createTableSQL =
+        s"""
+           | create table if not exists $upDBName.tg_busi_prediction_busigender(uid string,tag_value string)
+           | comment '预测性别'
+           | partitioned by (dt string)
+           | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
+           | location '${hdfsPath}/${upDBName}/tg_busi_prediction_busigender'
+           |""".stripMargin
+
+      // Part 2, step 3: predicted genders plus the genders users filled in themselves.
+      val insertSQL =
+        s"""
+           |insert overwrite table $upDBName.tg_busi_prediction_busigender partition(dt='$taskDate')
+           |select user_id,if(prediction_origin='F','女','男') from predicted_gender
+           | union all
+           |select id as uid , if(gender='F','女','男') as query_value from dim_user_info
+           |where dt='9999-99-99' and gender is not null
+           |""".stripMargin
+
+      println(createTableSQL)
+      sparkSession.sql(createTableSQL)
+      println(insertSQL)
+      sparkSession.sql(insertSQL)
+
+      // Part 2, step 4: package the job and publish it as a tag task.
+    } finally {
+      // Release the session even when a step above fails.
+      sparkSession.stop()
+    }
+  }
+
+  /**
+   * Reads a mandatory entry from the job configuration.
+   *
+   * @param properties the loaded configuration
+   * @param key        name of the entry
+   * @return the configured value
+   * @throws IllegalArgumentException when the entry is missing
+   */
+  private def requiredProperty(properties: Properties, key: String): String =
+    Option(properties.getProperty(key)).getOrElse(
+      throw new IllegalArgumentException(s"missing required property '$key' in config.properties"))
+
+}
diff --git a/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/pipline/MyPipeLine.scala b/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/pipline/MyPipeLine.scala
index 2179de9..c5b0aa6 100644
--- a/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/pipline/MyPipeLine.scala
+++ b/Big_data_example/user_profile/machine-learning/src/main/scala/com/atguigu/userprofile/ml/pipline/MyPipeLine.scala
@@ -230,7 +230,10 @@ class MyPipeLine {
   //把生成的模型存储到指定的位置 ->hdfs
   def saveModel(path:String) :Unit ={
     pipelineModel.write.overwrite().save(path)
+  }
 
-
-
+  // Load a model previously saved to HDFS into this object; returns this for chaining.
+  def loadModel(path:String) :MyPipeLine={
+    pipelineModel = PipelineModel.load(path)
+    this
   }