ApacheSpark—如何有效地将dataframe对象解析为键值对的Map

hfsqlsce  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(327)

我正在处理一个带有列的Dataframe basketID 以及 itemID . 有没有一种方法可以有效地解析数据集并生成密钥所在的Map basketID 这个值是所有 itemID 装在每个篮子里?
我当前的实现在Dataframe上使用for循环,这不是很好的可伸缩性。有没有可能更有效地做到这一点?任何帮助都将不胜感激谢谢!
样本数据截屏
目标是获得 basket = Map("b1" -> Set("i1", "i2", "i3"), "b2" -> Set("i2", "i4"), "b3" -> Set("i3", "i5"), "b4" -> Set("i6")) . 下面是我使用for循环的实现

// create empty container
val basket = scala.collection.mutable.Map[String, Set[String]]()
// loop over all numerical indexes for baskets (b<i>)
for (i <- 1 to 4) {
  basket("b" + i.toString) = Set();
}
// loop over every row in df and store the items to the set
df.collect().foreach(row => 
  basket(row(0).toString) += row(1).toString
)
vawmfj5a

vawmfj5a1#

您只需执行aggregatebykey操作,collectiasmap就会直接给出所需的结果。它比简单的groupby有效得多。

import scala.collection.mutable
case class Items(basketID: String,itemID: String)

 import spark.implicits._
 val result = output.as[Items].rdd.map(x => (x.basketID,x.itemID))
.aggregateByKey[mutable.Buffer[String]](new mutable.ArrayBuffer[String]())
 ((l: mutable.Buffer[String], p: String) => l += p , 
 (l1: mutable.Buffer[String], l2: mutable.Buffer[String]) => (l1 ++ l2).distinct)
.collectAsMap();

你可以在这里查看其他聚合api,比如reduceby和groupby。请同时检查aggregatebykey与groupbykey与ReduceeByKey的差异。

knsnq2tg

knsnq2tg2#

假设您的数据集足够小,可以放入驱动程序的内存中,这是非常有效的。 .collect 将为您提供一个行数组,您可以对其进行迭代。如果您想要可伸缩性 Map[String, Set[String]] (这将驻留在驱动程序内存中)您可以使用 PairRDD[String, Set[String]] (这将被分发)。

//NOT TESTED

//Assuming df is dataframe with 2 columns, first is your basketId and second is itemId

df.rdd.map(row => (row.getAs[String](0), row.getAs[String](1)).groupByKey().mapValues(x => x.toSet)

相关问题