如何在csv文件的更新行上运行流式查询?

wvyml7n5  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(395)

我有一个csv文件在一个文件夹,是不断更新。我需要从这个csv文件中获取输入并生成一些事务。如何从不断更新的csv文件中获取数据(假设每5分钟更新一次)?
我试过以下方法:

val csvDF = spark
  .readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("file:///home/location/testFiles")

但问题是它正在监视文件夹中是否创建了任何新文件。。。但我的问题是只有一个文件是不断更新。

t1rydlwq

t1rydlwq1#

首先,我不确定您是如何到达这里的,因为csv文件应该按顺序写入,这样可以实现更好的输入/输出。因此,我建议您创建一个只附加的文件,并尝试获取流数据,就像从binlog获取数据一样。
然而,如果你必须这样做,我认为streamingcontext可以帮助你。

val ssc = new StreamingContext(new SparkConf(), Durations.milliseconds(1))
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/tmp", (x: Path) => true, newFilesOnly = false).map(_._2.toString)
mpbci0fu

mpbci0fu2#

我有一个csv文件在一个文件夹的位置,是不断更新的每一次。我需要从这个csv文件中获取输入并生成一些事务。如何从不断更新的csv文件中获取数据,假设每5分钟更新一次。
热释光;医生,这行不通。
默认情况下,spark structured streaming监视目录中的文件,并为每个新文件触发一个计算。一旦处理完文件,就再也不会处理该文件了。这是默认的实现。
您可以编写自己的流式源代码来监视文件的更改,但这是一个自定义源代码开发(在大多数情况下,这还不值得付出努力)。

相关问题