我试图从kafka读取一个流,其中的值是一个逗号分隔的值字符串(表示数据集中的列),目标是读取两个这样的流并将它们连接起来。
如果我是从一个文件中读取,有一种方法可以通过给输入流分配一个分隔符和一个模式来实现。这就是我能做到的:
val stearm_L: DataFrame = spark.readStream
.option("sep", ";")
.schema(schema_L)
.csv("inputFileSteam_L")
如果我读的是Kafka而不是一个文件,我怎么能做同样的事呢?
1条答案
按热度按时间8cdiaqws1#
而不是
csv("filename")
,你基本上用format("kafka")
.在spark流媒体部分下有一个关于kafka集成的页面,可以了解更多细节。
关于获取csv解析,请参阅spark streaming:read csv string from kafka,write to parquet