apache spark与apache kafka集成

izkcnapc  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(243)

我是spark的新手,我正在使用spark结构化流媒体(spark 3.0.0,scala 2.12)读取kafka主题中的数据
我有3个spark workers,主题分为3个部分
如果我错了,请纠正我,但是当我从不同的答案和spark文档中阅读时,spark使用消费者组来阅读kafka的内容,如果我有相同数量的分区,那么应该将执行者Map为1:1
但是当我检查消费者组时,我发现只有一个ip正在消费,它是驱动程序ip,所以数据不是直接从执行器中消费的,我发现任务只被划分为两个执行器,第三个几乎处于空闲状态
所以我有两个问题:
为什么只有驱动节点从kafka消费?我能让每个工人的执行者直接使用Kafka的数据吗?
我能把工作量分配给所有的执行者吗?

object DataProcessing extends Serializable with Logging {

  val topics =  ""
  val brokers = ""

//window config => I'm using this during processing
  val windowDuration = s"2 hours"
  val slideDuration = s"15 minutes"

  @transient var streamingQuery: StreamingQuery = null

  val spark =
    SparkSession.builder
      .appName("app-name")
      .getOrCreate()

  import spark.implicits._

  def main(args: Array[String]): Unit = {

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "10000")
      .load()
    logDebug("Kafka Topic Data Schema => ")
    df.printSchema()

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

//extracting columns => transformations and groupings => windowedData

    val writer = new AuroraSink(url, username, password)
    streamingQuery = windowedData.writeStream
      .queryName("data-processor")
      .foreach(writer)
      .outputMode("update")
      .option("checkpointLocation", s"/data/$checkpoint")
      .trigger(ProcessingTime("10 seconds"))
      .start()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题