混合spark结构化流式api和dstream写入kafka

zte4gxcn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(350)

我最近注意到我对spark流有一个困惑(我目前正在学习spark)。
我从Kafka的一个主题中读到如下数据:

val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

从而产生一个数据流。
为了处理事件时间(而不是处理时间),我做了以下工作:

outputStream
      .foreachRDD(rdd => {
          rdd.toDF().withWatermark("timestamp", "60 seconds")
            .groupBy(
              window($"timestamp", "60 seconds", "10 seconds")
            )
            .sum("meterIncrement")
            .toJSON
            .toDF("value")
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "taxi-dollar-accurate")
            .start()
        )
      })

我得到了错误
“writestream”只能在流式数据集/Dataframe上调用
这让我很惊讶,因为df的来源是一个dstream。无论如何,我通过将.writestream改为.write并将.start()改为.save()来解决这个问题。
但我觉得我不知怎么失去了那台foreach的流式电源。显然这就是我写这个问题的原因。这是正确的方法吗?我见过其他使用

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

但我不知道这与在数据流上调用foreach,然后将每个rdd转换为df有什么不同。

eagi6jfj

eagi6jfj1#

但我不知道这与在数据流上调用foreach,然后将每个rdd转换为df有什么不同。
当您打电话时:

outputStream
      .foreachRDD(rdd => {
          rdd.toDF()
            .[...]
            .toJSON
            .toDF("value")
            .writeStream
            .format("kafka")

你的变量 rdd (或者Dataframe)变成了一个不再是流的rdd。因此 rdd.toDF.[...].writeStream 不会再工作了。

继续rdd

如果您选择使用dsream方法,那么可以发送那些调用kafkaproducerapi的rdd。
举个例子:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = new KafkaProducer[String, String](kafkaParameters)
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

但是,这不是推荐的方法,因为您在每个执行器的每个批处理间隔中创建和关闭kafkaproducer。但这应该让您对如何使用directstreamapi将数据写入kafka有一个基本的了解。
为了进一步优化发送数据到Kafka,您可以按照这里给出的指导。

继续Dataframe

不过,您也可以将rdd转换为Dataframe,但要确保调用面向批处理的api将数据写入kafka:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

有关如何将批处理Dataframe写入kafka的所有详细信息,请参阅spark structured streaming+kafka integration guide

注意

不过,最重要的是,我强烈建议在这种情况下不要混淆rdd和结构化api,而要坚持两者。

相关问题