jvm Scala - converting a function from List-based to Dataset-based

k4aesqcs · posted 2022-11-07 · in Scala

I have a function that operates on a List, but it is causing performance problems, and I would like to make it entirely Dataset-based. On the face of it, Datasets and Lists share the same methods, so translating the function should be straightforward, but somehow I cannot get a solution that compiles.

def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
    val productsDatasetAsList = products.as[AggregateWithSupplierProducts].collect().toList
    val duplicates = productsDatasetAsList.groupBy(_.products.sortBy(_.product.productId)).filter(_._2.size > 1).values.toList
    mapToDataset(duplicates)
  }

Right from the start, `groupBy` on a Dataset returns a `RelationalGroupedDataset` rather than a `Map`, which keeps me from conveniently translating the rest of the method.
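For context, the semantics the List version relies on can be reduced to a few lines of plain Scala (with a simplified, hypothetical stand-in type in place of the real case classes): `List.groupBy` returns a `Map` whose entries can be filtered directly, which is exactly what the untyped `Dataset.groupBy` does not offer.

```scala
// Hypothetical simplified stand-in for AggregateWithSupplierProducts.
case class Agg(id: String, productIds: List[String])

val aggs = List(
  Agg("ID1", List("A", "B")),
  Agg("ID2", List("B", "A")),
  Agg("ID3", List("C"))
)

// List.groupBy returns Map[List[String], List[Agg]], which supports
// filter/values directly - unlike Dataset.groupBy.
val duplicates: List[List[String]] =
  aggs
    .groupBy(_.productIds.sorted) // order-insensitive grouping key
    .filter(_._2.size > 1)        // keep only groups with real duplicates
    .values
    .map(_.map(_.id))
    .toList
```

Here `duplicates` is `List(List("ID1", "ID2"))`: ID1 and ID2 share the same sorted key, ID3 does not.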
Data structures:

case class AggregateWithSupplierProducts(
                                         id: String, 
                                         products: List[Product])
case class Product(productId: String, productCount: Int)
case class DuplicatesByIds(duplicates: List[String])

Sample data:

[  {
   "id": "ID1",
   "products": [
     {
       "product": {
         "productId": "SOME_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID2",
   "products": [
     {
       "product": {
         "productId": "SOME_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID3",
   "products": [
     {
       "product": {
         "productId": "DIFFERENT_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID4",
   "products": [
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "DIFFERENT_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID5",
   "products": [
     {
       "product": {
         "productId": "NOT_DUPLICATED_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "DIFFERENT_ID",
         "productCount": 2
       }
     }
   ]
 }
]

The result of this should be:

A Dataset containing
DuplicatesByIds(List("ID1", "ID2")),
DuplicatesByIds(List("ID3", "ID4"))

The code works correctly on the Dataset collected into a List, but I am having considerable trouble converting it to work entirely on Datasets, without wasting memory on `collect()`.


wwodge7n1#

You can group and filter the duplicates as follows. Note that I had to add another case class, `P`, otherwise I could not parse the sample JSON:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import spark.implicits._ // encoders for .as[...]; `spark` is the active SparkSession

case class Product(productId: String, productCount: BigInt)
case class P(product: Product)
case class AggregateWithSupplierProducts(
                                         id: String,
                                         products: List[P])
case class DuplicatesByIds(duplicates: List[String])

def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = 
    (
        products
        .withColumn("productList", sort_array(col("products.product.productId")))
        .groupBy("productList")
        .agg(collect_list("id").alias("duplicates"))
        .filter(size(col("duplicates"))>1)
        .select("duplicates")
        .as[DuplicatesByIds]
    )

val ds = spark.read.option("multiline", "true").json("test.json").as[AggregateWithSupplierProducts]
ds.show(false)
+---+-----------------------------------------------+
|id |products                                       |
+---+-----------------------------------------------+
|ID1|[{{1, SOME_ID}}, {{1, SOME_OTHER_ID}}]         |
|ID2|[{{1, SOME_ID}}, {{1, SOME_OTHER_ID}}]         |
|ID3|[{{1, DIFFERENT_ID}}, {{1, SOME_OTHER_ID}}]    |
|ID4|[{{1, SOME_OTHER_ID}}, {{1, DIFFERENT_ID}}]    |
|ID5|[{{1, NOT_DUPLICATED_ID}}, {{2, DIFFERENT_ID}}]|
+---+-----------------------------------------------+

findDuplicates(ds).show
+----------+
|duplicates|
+----------+
|[ID3, ID4]|
|[ID1, ID2]|
+----------+
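The step that makes this work is `sort_array` before the `groupBy`: ID3 and ID4 list the same products in different orders, so without sorting they would land in different groups. The same keying idea, sketched in plain Scala over in-memory stand-ins for the sample rows (not the Spark code above):

```scala
// In-memory stand-in rows mirroring the sample JSON (product ids only).
case class Row(id: String, productIds: List[String])

val rows = List(
  Row("ID1", List("SOME_ID", "SOME_OTHER_ID")),
  Row("ID2", List("SOME_ID", "SOME_OTHER_ID")),
  Row("ID3", List("DIFFERENT_ID", "SOME_OTHER_ID")),
  Row("ID4", List("SOME_OTHER_ID", "DIFFERENT_ID")), // reversed order
  Row("ID5", List("NOT_DUPLICATED_ID", "DIFFERENT_ID"))
)

// Sorting the ids first plays the role of sort_array: it makes the
// grouping key order-insensitive, so ID3 and ID4 end up in one group.
val dups = rows
  .groupBy(_.productIds.sorted)
  .values
  .collect { case group if group.size > 1 => group.map(_.id) }
  .toList
  .sortBy(_.head)
// dups == List(List("ID1", "ID2"), List("ID3", "ID4"))
```

ID5's key (`List("DIFFERENT_ID", "NOT_DUPLICATED_ID")` after sorting) matches no other row, so it is filtered out, matching the expected result.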
