带kafka的spark结构化流-如何重新划分数据并在工作节点之间分配处理

hpcdzsge  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(206)

如果我的Kafka主题收到

CHANNEL | VIEWERS | .....
ABC     |  100    | .....
CBS     |  200    | .....

我有spark结构化流式代码来读取和处理Kafka的记录,如下所示:

val spark = SparkSession 
      .builder 
      .appName("TestPartition") 
      .master("local[*]") 
      .getOrCreate() 

    import spark.implicits._ 

    val dataFrame = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", 
      "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092") 
      .option("subscribe", "partition_test") 
      .option("failOnDataLoss", "false") 
      .load() 
      .selectExpr("CAST(value AS STRING)") 
      // I will use a custom UDF to transform to a specific object

目前,我使用foreachwriter处理记录如下:

val writer = new ForeachWriter[testRec] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }
    def process(record: testRec) = {
      handle(record)
    }
    def close(errorOrNull: Throwable): Unit = {
    }
  }

  val query = dataFrame.writeStream
    .format("console")
    .foreach(writer)
    .outputMode("append")
    .start()

代码运行得很好。但是,我想做的是将传入的数据按通道进行分区,这样每个worker负责特定的通道,并且我在handle()块中进行与该通道相关的内存计算。有可能吗?如果是,我该怎么做?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题