我知道在spark处理中有几个阶段时,中间阶段rdd会一直存储到作业完成,但是读到cache()和persist()的使用,我觉得它们也在做同样的事情(除了可以使用的memory\和\u disk、memory\和memory\ u only\ ser选项)。有人能告诉我们为什么在使用中间rdd时显式使用cache()和persist(),您能给出两者的一些用例吗?。
...
lines = sc.textFile(sys.argv[1], 1)
# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
ranks = links.map(lambda (url, neighbors): (url, 1.0))
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in xrange(int(sys.argv[2])):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(
lambda (url, (urls, rank)): computeContribs(urls, rank))
# Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
...
1条答案
按热度按时间fsi0uk1n1#
spark不存储和
RDD
默认情况下。相反,它采用了懒惰的评估机制。简单地说,sparkRDD
沿袭,创建dag,在遇到操作之前不会执行任何转换。你可以阅读这篇文章来了解更多的细节。至于两者的区别
cache()
以及persist()
,摘自本文:缓存和持久化都用于保存spark-rdd、dataframe和dataset。但是,区别在于,rdd cache()方法默认将其保存到内存(仅限于内存),而persist()方法用于将其存储到用户定义的存储级别。
cache()
以及persist()
都是一样的,唯一不同的是cache()
存储级别仅为内存,并且persist()
可以同时具有内存和磁盘存储级别。所以呢cache()
和打电话一样persist()
使用默认存储级别。在哪里使用
cache()
在哪里使用persist()
,这取决于场景。本文将帮助您在何处使用缓存,只需在以下情况下缓存:迭代机器学习应用中的rdd重用
独立spark应用程序中的rdd重用
当rdd计算非常昂贵时,缓存可以帮助减少一个或多个执行器失败时的恢复成本
作为示例,您可以查看pagerank示例。
在本例中,链接在
for
循环,这就是缓存它以避免重新计算的原因。我希望这个答案能帮助你。