我有一个csv文件在一个文件夹,是不断更新。我需要从这个csv文件中获取输入并生成一些事务。如何从不断更新的csv文件中获取数据(假设每5分钟更新一次)?
我试过以下方法:
val csvDF = spark
.readStream
.option("sep", ",")
.schema(userSchema)
.csv("file:///home/location/testFiles")
但问题是它正在监视文件夹中是否创建了任何新文件。。。但我的问题是只有一个文件是不断更新。
我有一个csv文件在一个文件夹,是不断更新。我需要从这个csv文件中获取输入并生成一些事务。如何从不断更新的csv文件中获取数据(假设每5分钟更新一次)?
我试过以下方法:
val csvDF = spark
.readStream
.option("sep", ",")
.schema(userSchema)
.csv("file:///home/location/testFiles")
但问题是它正在监视文件夹中是否创建了任何新文件。。。但我的问题是只有一个文件是不断更新。
2条答案
按热度按时间t1rydlwq1#
首先,我不确定您是如何到达这里的,因为csv文件应该按顺序写入,这样可以实现更好的输入/输出。因此,我建议您创建一个只附加的文件,并尝试获取流数据,就像从binlog获取数据一样。
然而,如果你必须这样做,我认为streamingcontext可以帮助你。
mpbci0fu2#
我有一个csv文件在一个文件夹的位置,是不断更新的每一次。我需要从这个csv文件中获取输入并生成一些事务。如何从不断更新的csv文件中获取数据,假设每5分钟更新一次。
热释光;医生,这行不通。
默认情况下,spark structured streaming监视目录中的文件,并为每个新文件触发一个计算。一旦处理完文件,就再也不会处理该文件了。这是默认的实现。
您可以编写自己的流式源代码来监视文件的更改,但这是一个自定义源代码开发(在大多数情况下,这还不值得付出努力)。