scala—使用spark streaming从kafka读取流并为其分配模式

niwlg2el  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(274)

我试图从kafka读取一个流,其中的值是一个逗号分隔的值字符串(表示数据集中的列),目标是读取两个这样的流并将它们连接起来。
如果我是从一个文件中读取,有一种方法可以通过给输入流分配一个分隔符和一个模式来实现。这就是我能做到的:

val stearm_L: DataFrame = spark.readStream
      .option("sep", ";")
      .schema(schema_L)
      .csv("inputFileSteam_L")

如果我读的是Kafka而不是一个文件,我怎么能做同样的事呢?

8cdiaqws

8cdiaqws1#

而不是 csv("filename") ,你基本上用 format("kafka") .
在spark流媒体部分下有一个关于kafka集成的页面,可以了解更多细节。
关于获取csv解析,请参阅spark streaming:read csv string from kafka,write to parquet

相关问题