在spark流应用程序中连接数据的最佳方法是什么?

deyfvvtc  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(351)

问题:本质上,这意味着,不是为每个流式处理记录运行一个c表连接,而是在spark流式处理中为每个记录的微批(微批处理)运行一个连接吗?
我们几乎完成使用sparksql2.4.x版本,datastax spark cassandra connector for cassandra-3.x版本。
但是在下面的场景中,关于效率有一个基本的问题。
对于流数据记录(即streamingdataset),我需要从cassandra(c
)表中查找现有记录(即cassandradataset)。

Dataset<Row> streamingDataSet = //kafka read dataset

Dataset<Row> cassandraDataset= //loaded from C* table those records loaded earlier from above.

要查找数据,我需要加入上述数据集
即。

Dataset<Row> joinDataSet = cassandraDataset.join(cassandraDataset).where(//somelogic)

进一步处理joindataset以实现业务逻辑。。。
在上面的场景中,我的理解是,对于从kafka流接收到的每条记录,它将查询c表,即数据库调用。
如果c
表包含数十亿条记录,是否需要大量的时间和网络带宽?为了改进查找c表,应该遵循什么方法/程序?
在这种情况下,最好的解决方案是什么?我无法从c
表加载一次并查找,因为数据一直添加到c*表中。。。i、 e.新的查找可能需要新的持久化数据。
如何处理这种情况?有什么建议吗。。

ghhaqwfi

ghhaqwfi1#

如果您使用的是apache cassandra,那么只有一种可能性可以有效地连接cassandra中的数据——通过rddapi joinWithCassandraTable . spark cassandra connector(scc)的开源版本只支持它,而在dse版本中,有一个代码允许对cassandra执行有效的连接,也支持spark sql,即所谓的dse直接连接。如果你用 join 在针对cassandra表的sparksql中,spark需要从cassandra读取所有数据,然后执行join—这非常慢。
我没有oss scc为spark结构化流媒体做连接的例子,但是我有一些“普通”连接的例子,比如:

CassandraJavaPairRDD<Tuple1<Integer>, Tuple2<Integer, String>> joinedRDD =
     trdd.joinWithCassandraTable("test", "jtest",
     someColumns("id", "v"), someColumns("id"),
     mapRowToTuple(Integer.class, String.class), mapTupleToRow(Integer.class));

相关问题