分区的数量如何影响spark kafka连接?

wgxvkvu9  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(487)

我正在旋转一个emr来向kafka发布一个Dataframe(大约300-400行)。我可以发布它,Dataframe有200个分区。在发布Dataframe时,我看到kafka集群中的cpu出现了大约20-30分钟的巨大峰值。分区号是否创建了200个连接?
或者像这里所说的那样使用1个连接。http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#producer-缓存
样本代码

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col
val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
        "kafka.security.protocol" -> "SSL",
        "kafka.ssl.endpoint.identification.algorithm" -> "",
        "kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
        "kafka.ssl.truststore.password" -> "password",
        "kafka.ssl.keystore.type" -> "PKCS12",
        "kafka.ssl.key.password" -> "password",
        "kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
        "kafka.ssl.keystore.password" -> "password")
    )

 val df = spark
        .read
        .option("header", true)
        .option("escape", "\"")
        .csv("s3://bucket/file.csv")

 val publishToKafkaDf = df.withColumn("value", col("body"))

 publishToKafkaDf
      .selectExpr( "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("topic", "test-topic")
      .options(kafkaOptions)
      .save()
r1wp621o

r1wp621o1#

我可以发布它,Dataframe有200个分区。在发布Dataframe时,我看到kafka集群中的cpu出现了大约20-30分钟的巨大峰值。分区号是否创建了200个连接?
根据所述:
spark初始化一个kafka生产者示例,并在同一个缓存密钥的多个任务之间协同使用。
这告诉我,将有一个Kafka生产者在一个单一的执行器上的所有任务共享(不过,我还没有查到消息来源,所以我不太确定。)
换句话说,分区的数量(在执行时是任务)在可用的执行器之间共享。如果你有10个遗嘱执行人,我的理解是会有10个Kafka制作人。
请注意,本文档适用于最新的spark 3.0.0,而您使用spark 2.3.0的依据是:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0

我认为这并不重要,因为这一Kafka生产者每执行已在早期版本中使用。在3.0中,他们可能已经改进了共享和缓存。

相关问题