我有一个操作列表的函数,但它会导致性能问题,我想让它完全基于数据集。显然,数据集和列表有相同的方法,所以应该可以轻松地翻译这个方法,但不知何故,我无法获得编译解决方案。
def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
val productsDatasetAsList = products.as[AggregateWithSupplierProducts].collect().toList
val duplicates = productsDatasetAsList.groupBy(_.products.sortBy(_.product.productId)).filter(_._2.size > 1).values.toList
mapToDataset(duplicates)
}
从一开始,从groupBy RelationalGroupedDataset返回的值就使我无法方便地翻译部件的其余部分。
数据结构:
case class AggregateWithSupplierProducts(
id: String,
products: List[Product])
case class Product(productId: String, productCount: Int)
case class DuplicatesByIds(duplicates: List[String)
数据示例:
[ {
"id": "ID1",
"products": [
{
"product": {
"productId": "SOME_ID",
"productCount": 1
}
},
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
}
],
},
{
"id": "ID2",
"products": [
{
"product": {
"productId": "SOME_ID",
"productCount": 1
}
},
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
}
],
},
{
"id": "ID3",
"products": [
{
"product": {
"productId": "DIFFERENT_ID",
"productCount": 1
}
},
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
}
],
},
{
"id": "ID4",
"products": [
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
},
{
"product": {
"productId": "DIFFERENT_ID",
"productCount": 1
}
}
],
},
{
"id": "ID5",
"products": [
{
"product": {
"productId": "NOT_DUPLICATED_ID",
"productCount": 1
}
},
{
"product":
"productId": "DIFFERENT_ID",
"productCount": 2
}
}
],
}
]
这样做的结果是:
Dataset with
DuplicatesByIds(List("ID1", "ID2")),
DuplicatesByIds(List("ID3", "ID4"))
代码可以正常处理收集到列表中的数据集,但在将其转换为完全处理数据集而不浪费内存时遇到了相当大的问题
1条答案
按热度按时间wwodge7n1#
您可以按如下方式对重复项进行分组和过滤。注意,我必须添加另一个case类
P
,否则我无法解析json示例: