使用spark并行缓存和查询数据集

b5buobof  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(373)

我有一个要求,我想缓存一个数据集,然后通过在该数据集上并行启动“n”个查询来计算一些度量,所有这些查询都计算类似的度量,只是过滤器会改变,我想并行运行这些查询,因为响应时间很关键,我想缓存的数据集大小将始终小于gb。
我知道如何在spark中缓存一个数据集,然后随后查询它,但是如果我必须在同一个数据集上并行运行查询,那么如何实现相同的查询呢?引入alluxio是一种方法,但在spark world中,还有其他方法可以达到同样的效果吗?
例如,使用java,我可以在内存中缓存数据,然后通过使用多线程我可以实现同样的效果,但是如何在spark中实现呢?

3lxsmp7m

3lxsmp7m1#

使用scala的并行集合在spark的驱动程序代码中触发并行查询非常简单。下面是一个最简单的例子:

val dfSrc = Seq(("Raphael",34)).toDF("name","age").cache()

// define your queries, instead of returning a dataframe you could also write to a table etc
val query1: (DataFrame) => DataFrame = (df:DataFrame) => df.select("name")
val query2: (DataFrame) => DataFrame = (df:DataFrame) => df.select("age")

// Fire queries in parallel
import scala.collection.parallel.ParSeq
ParSeq(query1,query2).foreach(query => query(dfSrc).show())

编辑:
要收集查询id并生成Map,您应该这样做:

val resultMap  = ParSeq(
 (1,query1), 
 (2,query2)
).map{case (queryId,query) => (queryId,query(dfSrc))}.toMap

相关问题