与自定义spark结构化流媒体接收器没有并行性

i2loujxw  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(290)

我正在编写一个定制的spark结构化流媒体接收器,将kafka读取的事件写入googlebq(大查询)。下面是我写的代码。
代码正在编译并成功运行。但是我的接收器总是只在一个执行器中运行(总是在驱动程序运行的地方)。我不明白这里的问题。
下面是我定制的大查询接收器的实现。

package bq

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class DefaultSource extends StreamSinkProvider with DataSourceRegister{

  override def createSink(
                           sqlContext: SQLContext,
                           parameters: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
    new BQSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {

    val df = data.sparkSession.createDataFrame
                       (data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)

    df.collect().foreach({ row => {
      //code that writes the rows to Big Query.
    }  
  }

这是我的驱动程序

// Reading raw events from Kafka
    val inputDF = sparkSession.readStream
                              .format("kafka")
                              .option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
                              .option("subscribe", "topic")
                              .option("fetchOffset.numRetries", 5)
                              .option("failOnDataLoss", "false")
                              .option("startingOffsets", "latest")
                              .load()
                              .selectExpr("value")
                              .as[Array[Byte]];

    // Transforming inputDF to OutputDF
    val outputDF = inputDF.map(event => transform(event))

    // Writing outputDF events to BQ
    val query = outputDF.writeStream
                        .format("bq")
                        .option("checkpointLocation",config.getString("checkpointLocation"))
                        .outputMode(OutputMode.Append())
                        .start()

    //Start Streaming
    query.awaitTermination()

即使我的主题有多个分区,我的自定义接收器也只在一个执行器中运行

uoifb46i

uoifb46i1#

使用 df.collect 将从执行者那里收集所有数据给你的司机。因此,您只看到驱动程序将数据发送到接收器。
你需要做什么 df.foreachPartition 并使用一个bq生产商,可在您的执行人。您可能会遇到“task not serializable”问题,但您可以在这里查看一下,了解如何在scala spark中解决这个问题。

相关问题