按数据集分组并在spark scala中运行自定义函数

bsxbgnwa  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(194)

我使用的是spark scala,有一个数据集,我想对它进行分组,然后将groupeddata发送给一个自定义函数。在自定义函数中,我将处理行并更新空的Dataframe。
我有下面的Dataframe DF1 :

+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
| ACC_SECURITY|ACCOUNT_NO|COSTCENTER|    BU|   MPU|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|POS_NEG_QUANTITY|PROCESSED|ALLOC_QUANTITY|NET_QUANTITY|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2|   18063|               P|         |             0|           0|
|3FA34782290X2|  3FA34782|    0800TS|BOXXBU|BOXXMP|    0102|     5322|      290X2|    -863|               N|         |             0|           0|
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2| -108926|               N|         |             0|           0|
|9211530135G71|  92115301|    08036C|BOXXBU|BOXXMP|    0154|     8380|      35G71|    8003|               P|         |             0|           0|
|9211530235G71|  92115302|    08036C|BOXXBU|BOXXMP|    0144|     8382|      35G71|   -2883|               N|         |             0|           0|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+

分组后打开 SECURITY_ID ,我得到了两个数据集 SECURITY_ID 值(290x2和35g71)。这些数据集必须发送到自定义函数。
我试过:
groupby网站 SECURITY_ID 但需要做一些汇总,我没有:

DF1.groupBy("SECURITY_ID").agg(max("SECURITY_ID")).apply(F)

我不想聚合,但我仍然可以删除聚合列,只要我可以传递一个函数 Fapply 块,在分组的数据集上。但是 apply 不接受任何自定义函数。
窗口功能打开 SECURITY_ID 但我不知道如何根据每个窗口执行自定义函数:

val window = Window.partitionBy("security_id") 
val option2DF = DF1.withColumn("Quantity_Row", F over(window))

我想看看如何调用函数 F 而不是通过添加列。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题