用spark变换构造Hive衍生柱

o0lyfsai  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(302)

我有一张table记录如下。

Id   Indicator     Date
1       R       2018-01-20
1       R       2018-10-21
1       P       2019-01-22
2       R       2018-02-28
2       P       2018-05-22
2       P       2019-03-05

我要选一个 Id 有两个以上 R 并派生出一个新的列称为 Marked_Flag 作为 Y 否则 N . 所以预期的输出应该如下所示,

Id  Marked_Flag 
1   Y
2   N

所以到目前为止,我把记录放在一个数据集中,然后再从中构建另一个数据集。代码如下所示。

Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");

Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");

但是我的领导使用单个数据集和spark转换来完成这项工作。我是一个新的Spark,任何关于这方面的指导或代码片段将是非常有帮助的。
创建了两个数据集,一个用于获取聚合,另一个使用聚合值派生新列。

Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");

Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");

输入
预期产量

i7uaboj4

i7uaboj41#

尝试以下方法。注意,我在这里使用的是pysparkDataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
[1, "R", "2018-01-20"],
[1, "R", "2018-10-21"],
[1, "P", "2019-01-22"],
[2, "R", "2018-02-28"],
[2, "P", "2018-05-22"],
[2, "P", "2019-03-05"]], ["Id", "Indicator","Date"])

gr = df.filter(F.col("Indicator")=="R").groupBy("Id").agg(F.count("Indicator"))
gr = gr.withColumn("Marked_Flag", F.when(F.col("count(Indicator)") > 1, "Y").otherwise('N')).drop("count(Indicator)")
gr.show()

# +---+-----------+

# | Id|Marked_Flag|

# +---+-----------+

# |  1|          Y|

# |  2|          N|

# +---+-----------+

# 

相关问题