如何使用状态作为缓存

polkgigr  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(311)

我想读一读这个州的历史。如果state为null,则读取hbase并更新状态,并使用ontimer设置状态ttl。问题是如何批量读取hbase,因为从hbase读取单个记录效率不高。

whlutmcx

whlutmcx1#

一般来说,如果要在flink中缓存/镜像外部数据库的状态,最有效的方法是将数据库的突变流化为flink——换句话说,如果数据库支持,将flink变成数据库的更改数据捕获(cdc)流的复制端点。
我没有使用hbase的经验,但是https://github.com/mravi/hbase-connect-kafka 是一个可能有用的例子(把Kafka放在hbase和flink之间)。
如果您希望从flink查询hbase,并且希望避免一次对一个用户进行点查询,那么您可以构建如下内容:

-> queryManyUsers -> keyBy(uId) -> 
streamToEnrich                                 CoProcessFunction
              -> keyBy(uID) ------------------->

在这里,您可以分割流,通过窗口或进程函数或异步i/o发送一个副本以批量查询hbase,并将结果发送到保存缓存并执行扩展的协进程函数。
当记录沿着底部路径直接到达这个协处理函数时,如果必要的数据在缓存中,那么就使用它。否则,记录将被缓冲,等待来自上层路径的缓存数据到达。

相关问题