读取Kafka主题处理数据,并使用scala和spark写回Kafka主题

jhkqcmku  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(258)

嗨,我在读一个Kafka主题,我想处理从Kafka接收到的数据,如tockenization,过滤掉不必要的数据,删除停止词,最后我想写回另一个Kafka主题

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

val df = readStream.selectExpr("CAST(value AS STRING)" )
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
val writeStream = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()
    writeStream.awaitTermination()

然后我得到下面的错误
线程“main”org.apache.spark.sql.analysisexception中出现异常:必须使用writestream.start()执行流源查询;;
然后我编辑了我的代码如下,从Kafka读取并写入控制台

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();

// then perform the data processing part as mentioned in the first half

对于第二种方法,在控制台中连续显示数据,但从不通过数据处理部分。我能知道如何从一个Kafka主题中读取数据,然后对接收到的数据执行一些操作(标记化、删除停止词),最后写回一个新的Kafka主题吗?
编辑
在出错期间,堆栈跟踪指向上述代码中的df.show(false)

icomxhvb

icomxhvb1#

当前实现中有两个常见问题:
应用 show 在流式处理上下文中
后面的代码 awaitTermination 不会被执行
到1。
方法 show 是Dataframe上的动作(与转换相反)。当您处理流式Dataframe时,这将导致一个错误,因为需要启动流式查询 start (正如免责条款告诉你的那样)。
到2。
方法 awaitTermination 是一种阻塞方法,意味着后续代码不会在每个微批处理中执行。
共同解法
如果您想读写kafka,并且在这两者之间,您想通过在控制台中显示数据来了解正在处理的数据,您可以执行以下操作:

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.writeStream
      .outputMode("append")
      .format("console")
      .start()

val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
// the columns `key` and `value` of the DataFrame `cleanedDataframe` will be used for producing the message into the Kafka topic.
val writeStreamKafka = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()

existingSparkSession.awaitAnyTermination()

注意 existingSparkSession.awaitAnyTermination() 在代码的最后不使用 awaitTermination 直接在 start . 另外,请记住 key 以及 value Dataframe的 cleanedDataframe 将用于将消息生成到Kafka主题中。但是,一列 key 不需要,另请参见此处
另外,如果使用检查点(推荐),那么需要设置两个不同的位置:一个用于控制台流,另一个用于kafka输出流。重要的是要记住那些流式查询是独立运行的。

相关问题