使用apacheignite共享sparkrdd

q7solyqu  于 2021-06-01  发布在  Hadoop
关注(0)|答案(2)|浏览(288)

我实施的是:

val sparkConf = new SparkConf().setAppName(“SharedRDD”).setMaster(“local”)
 val sc = SparkContext.getOrCreate(sparkConf)
 val sparkRDD = sc.wholeTextFiles("sample.csv", 10)

此rdd由上下文缓存

val igniteContext = new IgniteContext(sc, “example-shared-rdd.xml”, false)

val sharedIgniteRDD = igniteContext.fromCache[String, String](“cachedIgniteRDD”)
if (sharedIgniteRDD.isEmpty())
sharedIgniteRDD.savePairs(sparkRDD)

现在,如果任何spark作业需要访问这个rdd,它不需要创建新的rdd,而是从ignite缓存中检索它。

val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)

输入文件的示例数据

25/07/13,11599,CLOSED
25/07/13,256,PENDING_PAYMENT
25/07/13,12111,COMPLETE
25/07/13,8827,CLOSED
25/07/13,11318,COMPLETE
25/07/13,7130,COMPLETE
25/07/13,4530,COMPLETE
25/07/13,2911,PROCESSING
25/07/13,5657,PENDING_PAYMENT
25/07/13,5648,PENDING_PAYMENT

我需要调用groupbykey()api对输入文件的status列进行分组。
谢谢你的帮助。
谢谢

d7v8vwbk

d7v8vwbk1#

wholeTextFiles 将整个文件放在一个条目中,所以将其保存在ignite中没有多大意义。基本上,您将拥有一个包含一个大条目的分布式缓存。这没有任何好处。
您应该首先分割文件,并将每一行保存为一个单独的元组。然后您可以将这些数据保存在ignite中,并使用RDDAPI对其进行处理。

ej83mcc0

ej83mcc02#

使用 cache 或者 persist 应该避免 rdd 重新创造。你可以选择保存
rdds in-memory , memory-disk , serialized , deserialized 等。 IgniteContext 是上述解决方案的替代方案。
这对我来说有点棘手 groupByKey 在你保存的 rddIgniteContext ,如您所用 wholeTextFile 读取文本文件。 WholeTextFile 将生成 Tuple2 rdd of(“文本文件的路径”,“文本行”)
一旦你读到保存的
rdd fromCacheIgniteContext ```
val RDDfromCache = igniteContext.fromCacheString, String

您可以为 `groupByKey` ```
val groupedRDD = RDDfromCache.flatMap(x => x._2.split("\n")).map(array => (array.split(",")(2), array)).groupByKey().foreach(println)

输出将是

(CLOSED,CompactBuffer(25/07/13,11599,CLOSED, 25/07/13,8827,CLOSED))
(PENDING_PAYMENT,CompactBuffer(25/07/13,256,PENDING_PAYMENT, 25/07/13,5657,PENDING_PAYMENT, 25/07/13,5648,PENDING_PAYMENT))
............
..........
..........

相关问题