【flink】RocksDB介绍以及Flink对RocksDB的支持

x33g5p2x  于2022-01-04 转载在 Flink  
字(4.6k)|赞(0)|评价(0)|浏览(449)

1.概述

转载:「Flink」RocksDB介绍以及Flink对RocksDB的支持

2.RocksDB简介

RocksDB是基于C++语言编写的嵌入式KV存储引擎,它不是一个分布式的DB,而是一个高效、高性能、单点的数据库引擎。它是由Facebook基于Google开源的kv存储LevelDB开发开发。RocksDB使用LSM存储引擎。它针对不同的生产环境进行调优,可以直接使用内存、也可以使用Flash、或者用硬盘或者HDFS。而且支持不同的压缩算法,有一整套的工具用于生产、调试使用。RocksDB是一种嵌入式、KV型、持久化的存储。

使用嵌入式的数据存储原因有很多,当数据频繁访问内存、或者存储时,网络延迟会增加响应时间。

3.RocksDB的主要应用场景

  • 适应于多CPU场景

  • 一般的商业服务器有很多的CPU核,例如:志强E5系列 - 6核

  • RocksDB可以高效运行在多核服务器上

  • 它提供的RocksDB语义比传统DBMS更简单

  • 高效利用存储

  • RocksDB可以在快速存储上高效运行且不会成为性能瓶颈

  • RocksDB采用LSM引擎,对比B-Tree引擎,它有更好的压缩和更小的写放大

  • 弹性架构,支持扩展

  • 支持IO-bound、in-memory、write-once

4.入门案例

为了简单说明RocksDB,我们这里使用RocksDB的Java版本来编写。

导入Maven依赖

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>5.11.3</version>
        </dependency>
    </dependencies>

基于RocksDB读写数据

public class GettingStartDemo {
    // 因为RocksDB是由C++编写的,在Java中使用首先需要加载Native库
    static {
        // Loads the necessary library files.
        // Calling this method twice will have no effect.
        // By default the method extracts the shared library for loading at
        // java.io.tmpdir, however, you can override this temporary location by
        // setting the environment variable ROCKSDB_SHAREDLIB_DIR.
        // 默认这个方法会加压一个共享库到java.io.tmpdir
        RocksDB.loadLibrary();
    }

    public static void main(String[] args) throws RocksDBException {
        // 1. 打开数据库
        // 1.1 创建数据库配置
        Options dbOpt = new Options();
        // 1.2 配置当数据库不存在时自动创建
        dbOpt.setCreateIfMissing(true);
        // 1.3 打开数据库。因为RocksDB默认是保存在本地磁盘,所以需要指定位置
        RocksDB rdb = RocksDB.open(dbOpt, "./data/rocksdb");

        // 2. 写入数据
        // 2.1 RocksDB都是以字节流的方式写入数据库中,所以我们需要将字符串转换为字节流再写入。这点类似于HBase
        byte[] key = "zhangsan".getBytes();
        byte[] value = "20".getBytes();

        // 2.2 调用put方法写入数据
        rdb.put(key, value);
        System.out.println("写入数据到RocksDB完成!");

        // 3. 调用delete方法读取数据
        System.out.println("从RocksDB读取key = " + new String(key) + "的value为" + new String(rdb.get(key)));

        // 4. 移除数据
        rdb.delete(key);

        // 关闭资源
        rdb.close();
        dbOpt.close();
    }
}

运行程序后,我们可以发现,在data/rocksdb文件夹中,生成了一下几个文件:

- 0000004.sst
	sst是RocksDB的数据存储文件,是二进制格式的
0000006.log
	log是预写日志文件,LSM架构引擎都是有预写日志的
CURRENT
	CURRENT文件是一个文本文件,记录最近的MANIFEST
IDENTITY
	存放当前rocksdb的唯一标识
LOCK
	LOCK 进程的全局锁,DB一旦被open, 其他进程将无法修改
LOG
	rocksdb的操作日志文件, 可配置定期的统计信息写入LOG. 可通过info_log_level调整日志输出级别; 通过keep_log_file_num限制文件数量 等等。
LOG.old.15807….
MANIFECT-000005
	记录rocksdb最近的状态变化日志。其中包含manifest日志 和最新的文件指针
OPTIONS-000005
	rocksdb的配置文件
OPTIONS-000008

5.Flink使用RocksDBBackend

导入Maven依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

2、配置启用RocksDBBackend

// 此处也可以是HDFS路径,这里为了测试方便,所以使用的是本地路径
env.setStateBackend(new RocksDBStateBackend("file:///D:/project/java8/data/rocksdb", true));

6.Flink基于RocksDB的增量检查点机制

为什么只有RocksDB状态后端支持增量检查点呢?这是由RocksDB本身的特性决定的。RocksDB是一个基于日志结构合并树(LSM树)的键值式存储引擎,我们能明确地感觉到,它与HBase肯定有诸多相似之处。如果看官不了解LSM树的话,可以通过这篇文章来做个简单的了解。

在RocksDB中,扮演HBase MemStore角色的写缓存叫做memtable。memtable写满之后也会flush到磁盘,形成与HFile类似的东西,叫做sstable(是“有序序列表”即sorted sequence table的缩写)。RocksDB也存在compaction线程,在后台合并已经写入的sstable,原有的sstable会包含所有的键值对,合并前的sstable在此后会被删除。

由于Flink检查点生成的时间必须确定,因此不能等待RocksDB的memtable自动flush到磁盘,而是由Flink主动调用RocksDB提供的API强制刷写。有了上面的铺垫,下面通过例子来解释增量检查点的过程。

From https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

上图示出一个有状态的算子的4个检查点,其ID为2,并且state.checkpoints.num-retained参数设为2,表示保留2个检查点。表格中的4列分别表示RocksDB中的sstable文件,sstable文件与存储系统中文件路径的映射,sstable文件的引用计数,以及保留的检查点的范围。

下面按部就班地解释一下:

  • 检查点CP 1完成后,产生了两个sstable文件,即sstable-(1)与sstable-(2)。这两个文件会写到持久化存储(如HDFS),并将它们的引用计数记为1。
  • 检查点CP 2完成后,新增了两个sstable文件,即sstable-(3)与sstable-(4),这两个文件的引用计数记为1。并且由于我们要保留2个检查点,所以上一步CP 1产生的两个文件也要算在CP 2内,故sstable-(1)与sstable-(2)的引用计数会加1,变成2
  • 检查点CP 3完成后,RocksDB的compaction线程将sstable-(1)、sstable-(2)、sstable-(3)三个文件合并成了一个文件sstable-(1,2,3)。CP 2产生的sstable-(4)得以保留,引用计数变为2,并且又产生了新的sstable-(5)文件。注意此时CP 1已经过期,所以sstable-(1)、sstable-(2)两个文件不会再被引用,引用计数减1
  • 检查点CP 4完成后,RocksDB的compaction线程将sstable-(4)、sstable-(5)以及新生成的sstable-(6)三个文件合并成了sstable-(4,5,6),并对sstable-(1,2,3)、sstable-(4,5,6)引用加1。由于CP 2也过期了,所以sstable-([1~4])四个文件的引用计数同时减1,这就造成sstable-(1)、sstable-(2)、sstable-(3)的引用计数变为0,Flink就从存储系统中删除掉这三个文件。

通过上面的分析,我们可以看出Flink增量检查点机制的巧妙之处:

  • 通过跟踪sstable的新增和删除,可以记录状态数据的变化;
  • 通过引用计数的方式,上一个检查点中已经存在的文件可以直接被引用,不被引用的文件可以及时删除;
  • 可以保证当前有效的检查点都不引用已经删除的文件,从而保留state.checkpoints.num-retained参数指定的状态历史。

增量检查点解决了大状态checkpointing的问题,但是在从检查点恢复现场时会带来潜在的overhead。这是显然的:当程序出问题后,TaskManager需要从多个检查点中加载状态数据,并且这些数据中还可能会包含将被删除的状态还有一点,就算磁盘空间紧张,旧检查点的文件也不能随便删除,因为新检查点仍然会引用它们,如果贸然删除,程序就无法恢复现场了。可见,优秀的技术方案往往也不是十全十美,往往都是要考虑tradeoff的。

M.参考文献:

RocksDB中文网:https://rocksdb.org.cn/

https://rocksdb.org.cn/doc/RocksJava-Basics.html

https://www.jianshu.com/p/2638e2b379c3

https://www.jianshu.com/p/3302be5542c7

相关文章