Spark RDD持久化

x33g5p2x  于2021-03-14 发布在 Spark  
字(1.2k)|赞(0)|评价(0)|浏览(421)

进行持久化可以采用缓存机制或者检查点。

1. 缓存机制

  • RDD缓存机制:缓存数据至内存/磁盘,可大幅度提升Spark应用性能。有如下两个缓存方法:
 // 1. cache缓存就是persist(StorageLevel.MEMORY_ONLY)
 def cache(): this.type = persist()
 def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

 // 2. persist(newLevel: StorageLevel)可以选择缓存级别
 def persist(newLevel: StorageLevel)
  • 示例
val u1 = sc.textFile("file:///root/data/users.txt").cache
u1.collect // 删除users.txt,再试试,数据应该还是在的
u1.unpersist() // 清空缓存
  • StorageLevel缓存级别有:
级别使用空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY否  
MEMORY_ONLY_2数据存2份
MEMORY_ONLY_SER数据序列化
MEMORY_ONLY_SER_2数据序列化,数据存2份
MEMORY_AND_DISK中等部分部分如果数据在内存中放不下,则溢写到磁盘
MEMORY_AND_DISK_2中等部分部分数据存2份
MEMORY_AND_DISK_SER部分部分 
MEMORY_AND_DISK_SER_2部分部分数据存2份
DISK_ONLY 
DISK_ONLY_2数据存2份
NONE       
OFF_HEAP    
  • 缓存应用场景
    • 从文件加载数据之后,因为重新获取文件成本较高。
    • 经过较多的算子变换之后,重新计算成本较高。
    • 单个非常消耗资源的算子之后。
  • 使用注意事项
    • cache()或persist()。
    • cache()或persist()<ins>遇到Action算子完成后才生效</ins>。

<br/>

2. 检查点

检查点:类似于快照。

sc.setCheckpointDir("hdfs:/checkpoint0918")
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.checkpoint // 使用检查点
rdd.collect // 调用Action算子后生成快照
rdd.isCheckpointed // rdd是否启用了检查点,是返回true
rdd.getCheckpointFile // 获取此RDD检查点所在的目录的名称

<br/>

3. 检查点与缓存的区别

  • 检查点会删除RDD lineage,而缓存不会。
  • SparkContext被销毁后,检查点数据不会被删除,而缓存的数据则会被删除。

相关文章

微信公众号

最新文章

更多