我在executor map函数上编写了一些代码,从hdfs下载一些文件。
val downloadPool = java.util.concurrent.Executors.newFixedThreadPool(10)
val downloadTasks = fileStatuses.map( e =>
new Callable[String] {
override def call(): String = {
val p = currentPath + "/" + e.getPath.getName
val pAbs = Paths.get(p).toAbsolutePath.toString
println(s"begin to download file ${e.getPath.getName} to ${p}")
fileSystem.copyToLocalFile(e.getPath, new Path(pAbs))
pAbs
}}
)
val localPaths = downloadPool.invokeAll(seqAsJavaList[Callable[String]](downloadTasks)).map(e=>e.get())
上面显示的代码包含在scala对象中并使用关键字 synchronized
以确保执行器上的所有任务都将被阻止。
但是,当我运行这个spark应用程序时,我发现任务在下载完成之前就完成了。很奇怪,Spark的情况有什么不同吗?
暂无答案!
目前还没有任何答案,快来回答吧!