Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package com.rison.tag.models.rule

import com.rison.tag.meta.{HBaseMata, HBaseMeta}
import com.rison.tag.tools.HBaseTools
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SPARK_BRANCH, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

Expand Down Expand Up @@ -86,6 +88,12 @@ object GenderModel extends Logging {
.head()
.getAs[String]("rule")

//inType=hbase\n
// zkHosts=bigdata-cdh01.itcast.cn\n
// zkPort=2181\n
// hbaseTable=tbl_tag_users\n
// family=detail\n
// selectFieldNames=id,gender
logInfo(s"=== 业务标签规则 : {$tagRule}")

//解析标签规则,先按照换行 \n 符号分割,再按等号分割
Expand Down Expand Up @@ -123,7 +131,7 @@ object GenderModel extends Logging {
hbaseMeta.selectFieldNames.split(",").toSeq
)
} else {
new RuntimeException("业务标签未提供数据源信息,获取不到业务数据,无法计算标签")
throw new RuntimeException("业务标签未提供数据源信息,获取不到业务数据,无法计算标签")
}
businessDF.printSchema()
businessDF.show(20, false)
Expand All @@ -135,14 +143,43 @@ object GenderModel extends Logging {
.filter($"level" === 5)
.select($"rule", $"name")
//DataFrame关联,依据属性标签规则rule与业务数据字段gender
val modeDF: DataFrame = businessDF.join(
attrTagRuleDF, businessDF("gender") === attrTagRuleDF("rule")
val newMode: DataFrame = businessDF.join(
attrTagRuleDF,
businessDF("gender") === attrTagRuleDF("rule"),
"left" //使用左连接,避免用户性别字段值为空时,inner join 会舍弃该用户
)
.select(
$"id".as("userId"),
$"name".as("gender")
)
basicTagDF.unpersist()
//todo 合并标签数据
//读取旧标签数据
val oldMode: DataFrame = HBaseTools.read(
spark,
"bigdata-cd01.itcast.cn",
"2181",
"tbl_profile",
"user",
newMode.columns.toSeq //读取当前标签列
)
//自定义udf合并标签
val merge: UserDefinedFunction = udf(
(newTag: String, oldTag: String) => {
//没有新标签
if (StringUtils.isBlank(newTag)) {
oldTag
}
//有新标签,更新
else newTag
}
)

val modeDF: DataFrame = newMode.join(
oldMode,
newMode.col("userId") === oldMode.col("userId"),
"left"
).select(newMode.col("userId"), merge(newMode.col("gender"), oldMode.col("gender")).as("gender"))
//TODO 5 将标签数据存储到HBase表中: 用户画像标签表 -> tbl_profile
HBaseTools.write(modeDF, "bigdata-cd01.itcast.cn", "2181", "tbl_profile", "user", "userId")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,26 @@ object HBaseTools {
//2 设置读写HBase表的名称
config.set(TableOutputFormat.OUTPUT_TABLE, table)
//3 数据转换
//df示例 select(
// $"id".as("userId"),
// $"name".as("gender")
// )
val columns: Array[String] = dataframe.columns
val putsRDD: RDD[(ImmutableBytesWritable, Put)] = dataframe.rdd
.map {
row =>
//获取rowKey
val rowKey: String = row.getAs[String](rowKeyColumn)
val rowKey: String = row.getAs[String](rowKeyColumn) //将指定列转换为指定数据类型 rowKeyColumn=》userId
//构建Put对象
val put = new Put(Bytes.toBytes(rowKey))
//将每列数据加入到Put对象中
val familyBytes: Array[Byte] = Bytes.toBytes(family)
columns.foreach {
column =>
// put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("列名"), Bytes.toBytes("值"));
put.addColumn(
familyBytes,
Bytes.toBytes(column),
Bytes.toBytes(column), //为每个属性创建一列
Bytes.toBytes(row.getAs[String](column))
)
}
Expand Down