scala:将来自kafkavi的数据分割成数据流

rn0zuynd  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我收到Kafka以

{"email":"test@example","firstname":"Example","lastname":"User"}

我想访问电子邮件id和名字,并将其与来自cassandra的数据进行比较,格式如下:

CassandraRow{email: abc@xyz.com}
i7uaboj4

i7uaboj41#

您需要使用 joinWithCassandraTable 函数。。。
为了更有效,您可能需要重新分区您从kafka获得的rdd,以匹配cassandra表中的分区。代码可以如下所示:

val resultRdd = kafkaRDD.repartitionByCassandraReplica("ks","emails")
   .joinWithCassandraTable("ks","emails")

之后,你可以分析,如果名称匹配等,加入后,你应该得到只有记录,其中有电子邮件在Cassandra。。。

相关问题