kafka消费群体与spark结构化流媒体划分

toe95027  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(282)

我有一个带有3个分区的kafka主题,我正在使用spark结构化流媒体来消耗这些数据。我有3个消费者(假设消费者组a)从单个分区中读取每个分区的数据,直到这里所有的数据都是工作文件。
我有一个从同一主题读取的新需求,我想通过创建3个消费者(比如消费者组b)来并行化它,再次从单个分区读取每个消费者。因为我使用的是结构化流媒体,所以我不能说 group.id 明确地。
指向单个/同一分区的不同组的使用者是否会读取所有数据?

m4pnthwp

m4pnthwp1#

除非使用spark 3.x或更高版本,否则无法设置 group.id 在你的Kafka输入流中。如前所述,使用spark 3.x,您可以拥有两个不同的结构化流式处理作业,提供两个不同的group.id,以确保每个作业独立于其他作业读取主题的所有消息。
对于spark版本<=2.4.x,spark本身将为您创建一个独特的消费群体,您可以在github上的代码中查找:

// Each running query should use its own group id. Otherwise, the query may be only 
// assigned partial data since Kafka will assign partitions to multiple consumers having
// the same group id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

因此,同样在这种情况下,拥有两个不同的流作业将确保您拥有两个不同的consumergroup,这允许两个作业独立于另一个作业读取来自主题的所有消息。

ie3xauqp

ie3xauqp2#

来自spark 3.0.1文档:
默认情况下,每个查询为读取数据生成一个唯一的组id。这样可以确保每个kafka源都有自己的使用者组,该使用者组不会受到任何其他使用者的干扰,因此可以读取其订阅主题的所有分区。
所以,如果你使用 assign 选项并提及要使用哪个分区将从特定分区读取所有数据,因为默认情况下,它将是一个不同的使用者组(group.id)。 assign 选项将json字符串作为一个值,并且可以有来自不同主题的多个分区。例如。, {"topicA":[0,1],"topicB":[2,4]} .

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", "{"topic-name":[0]}")
  .load()
j5fpnvbx

j5fpnvbx3#

use可以使用group.id进行流式处理,如下所示
string processinggroup=“处理组A”;

Dataset<Row> raw_df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
                      .option("subscribe", topicName) 
                      .option("startingOffsets", "latest")
                      .option("group.id",  processingGroup)
                      .load();

相关问题