java—查找最近“x”分钟内修改的文件的最快方法

wvyml7n5  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(360)

我有一个要求,找到文件修改在过去10分钟在一个目录。目录不断更新,每次都会有大约50k-60k个文件。我正在使用以下代码获取文件:

import java.io.File
import java.time.Instant

val dir = new File("/path/to/dir") 
val files = dir.listFiles.toList.filter(f => f.getName.matches("some filter"))
files.filter(f => f.isFile && f.exists &&
    Instant.ofEpochMilli(f.lastModified).plus(10, MINUTES).isAfter(Instant.now))
    .toList.sortBy(_.lastModified)

这大约需要20-30分钟。但我想在10分钟内得到结果。我甚至试着用spark在hadoop集群中运行这个。这是Spark代码:

val sparkConfig = new SparkConf()
    .setAppName("findRecentFiles")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.shuffle.compress", "true")
    .set("spark.rdd.compress", "true")
val sc = new SparkContext(sparkConfig)
val rdd = sc.parallelize(files)
rdd.filter(f => f.isFile && f.exists &&
    Instant.ofEpochMilli(f.lastModified).plus(10, MINUTES).isAfter(Instant.now))
    .collect.toList.sortBy(_.lastModified)

但还是要花同样的时间。我注意到基于文件名的过滤速度很快。但是添加最新修改的过滤器会使它变慢。有没有更好的办法让我更快地得到结果?
更新
我更新了spark配置,现在我可以在不到10分钟内得到结果。早些时候,我是这样操作jar的:

spark-submit myJar.jar

我改成这样:

spark-submit --deploy-mode client --queue SomeNonDefaultQueue --executor-memory 16g --num-executors 10 --executor-cores 1 --master yarn myJar.jar

也已删除 set("spark.rdd.compress", "true") 从增加cpu时间的代码中,如下所述-https://spark.apache.org/docs/2.3.0/configuration.html#compression-和序列化

hm2xizp9

hm2xizp91#

问题在于 stat() check-to-get-last-modified是在对目录进行线性搜索以查找名称之后出现的。如果可以更改目录格式,请添加子目录(按文件名计算),并尝试将每个子目录中的条目数分组为~1000。
否则,创建name:lastmodified and 使用 WatchService 在触发事件时更新Map。

相关问题