wholetextfiles()如何在spark hadoop集群中工作?

z2acfund  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(174)

我创建了一个rdd,如下所示:

JavaPairRDD<String,String> inputDataFiles = sparkContext.wholeTextFiles("hdfs://ip:8020/user/cdhuser/inputFolder/");

在这个rdd上我执行了一个 map 处理单个文件并调用 foreach 触发相同的 map .

JavaRDD<Object[]> output = inputDataFiles.map(new Function<Tuple2<String,String>,Object[]>()
{

        private static final long serialVersionUID = 1L;

        @Override
        public Object[] call(Tuple2<String,String> v1) throws Exception 
        { 
          System.out.println("in map!");
           //do something with v1. 
          return Object[]
        } 
});

output.foreach(new VoidFunction<Object[]>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void call(Object[] t) throws Exception {
            //do nothing!
            System.out.println("in foreach!");

        }
    });

这段代码非常适合在本地笔记本电脑上进行独立设置,同时访问本地文件和远程hdfs文件。
在集群中,相同的代码不会产生任何结果。我的直觉是,数据还没有到达各个执行者,因此 map 以及 foreach 不起作用。可能是猜测。但我不明白为什么这在集群中不起作用。我甚至看不到报纸上的书面声明 map 以及 foreach 正在以集群执行模式打印。
我注意到在独立输出中有一行代码在集群执行中没有看到。

16/09/07 17:35:35 INFO WholeTextFileRDD: Input split: Paths:/user/cdhuser/inputFolder/data1.txt:0+657345,/user/cdhuser/inputFolder/data10.txt:0+657345,/user/cdhuser/inputFolder/data2.txt:0+657345,/user/cdhuser/inputFolder/data3.txt:0+657345,/user/cdhuser/inputFolder/data4.txt:0+657345,/user/cdhuser/inputFolder/data5.txt:0+657345,/user/cdhuser/inputFolder/data6.txt:0+657345,/user/cdhuser/inputFolder/data7.txt:0+657345,/user/cdhuser/inputFolder/data8.txt:0+657345,/user/cdhuser/inputFolder/data9.txt:0+657345

我也有类似的代码 textFile() 以前对集群上的单个文件有效。问题在于 wholeTextFiles() 只是。
请建议什么是最好的方式来让这个工作或其他替代方法。
我的设置是cloudera5.7发行版和spark服务。我把主人当作 yarn-client .
这个 action 可以是任何东西。调用 map . 我也试过了 System.out.println("Count is:"+output.count()); ,我得到了正确的答案 10 ,因为文件夹中有10个文件,但Map仍然拒绝工作。
谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题