进行持久化可以采用缓存机制或者检查点。
// 1. cache缓存就是persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
// 2. persist(newLevel: StorageLevel)可以选择缓存级别
def persist(newLevel: StorageLevel)
val u1 = sc.textFile("file:///root/data/users.txt").cache
u1.collect // 删除users.txt,再试试,数据应该还是在的
u1.unpersist() // 清空缓存
级别 | 使用空间 | CPU时间 | 是否在内存中 | 是否在磁盘上 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_2 | 高 | 低 | 是 | 否 | 数据存2份 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 数据序列化 |
MEMORY_ONLY_SER_2 | 低 | 高 | 是 | 否 | 数据序列化,数据存2份 |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘 |
MEMORY_AND_DISK_2 | 高 | 中等 | 部分 | 部分 | 数据存2份 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | |
MEMORY_AND_DISK_SER_2 | 低 | 高 | 部分 | 部分 | 数据存2份 |
DISK_ONLY | 低 | 高 | 否 | 是 | |
DISK_ONLY_2 | 低 | 高 | 否 | 是 | 数据存2份 |
NONE | |||||
OFF_HEAP |
<br/>
检查点:类似于快照。
sc.setCheckpointDir("hdfs:/checkpoint0918")
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.checkpoint // 使用检查点
rdd.collect // 调用Action算子后生成快照
rdd.isCheckpointed // rdd是否启用了检查点,是返回true
rdd.getCheckpointFile // 获取此RDD检查点所在的目录的名称
<br/>
内容来源于网络,如有侵权,请联系作者删除!