apachespark在完全分布式模式下对执行器执行操作

ubby3x7f  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(333)

我是新的Spark,我有如何转变和行动的工作(指南)的基本想法。我在文本文件的每一行(基本上是段落)上尝试一些nlp操作。在处理之后,结果应该被发送到服务器(restapi)进行存储。该程序作为spark作业(使用spark submit提交)在中由10个节点组成的集群上运行 yarn 模式。这就是我到目前为止所做的。

...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
    .map(line -> {
        // processed here
        return result;
    });
processedLines.foreach(line -> {
    // Send to server
});

这是有效的,但是 foreach 循环似乎是连续的,它似乎没有在工作节点上以分布式模式运行。我说的对吗?
我尝试了下面的代码,但不起作用。错误: java: incompatible types: inferred type does not conform to upper bound(s) . 显然是错的因为 map 是一种转变,不是一种行动。

lines.map(line -> { /* processing */ })
     .map(line -> { /* Send to server */ });

我也试过了 take() ,但它需要 int 以及 processedLines.count() 属于类型 long .

processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });

数据量巨大(大于100gb)。我想要的是,处理和发送到服务器都应该在工作节点上完成。中的处理部分 map 违抗地发生在工作节点上。但是如何将已处理的数据从工作节点发送到服务器,因为 foreach 似乎顺序循环发生在驱动程序(如果我是正确的)。简单地说,如何执行 action 在工作节点中,而不是在驱动程序中。
任何帮助都将不胜感激。

9udxz4iz

9udxz4iz1#

foreach 是Spark中的一个动作。它基本上接受rdd的每个元素,并对该元素应用一个函数。 foreach 在执行器节点或工作节点上执行。它不会应用于驱动程序节点。注意,在运行spark的本地执行模式中,驱动程序和执行程序节点可以驻留在同一个jvm上。
请勾选此项以获取每个解释的参考
如果您试图Maprdd的每个元素,然后应用它,那么您的方法看起来是正确的 foreach 到每个元素。我能想到为什么要花时间的原因是因为您要处理的数据大小(~100gb)。
对此进行优化的一种方法是 repartition 输入数据集。理想情况下,每个分区的大小应为128mb,以获得更好的性能结果。有许多文章是关于数据重新分区的最佳实践的。我建议你遵循它们,这会给你带来一些性能上的好处。
您可以考虑的第二个优化是分配给每个executor节点的内存。它在进行Spark调谐时起着非常重要的作用。
您可以想到的第三个优化是,对服务器的网络调用进行批处理。您当前正在为rdd的每个元素对服务器进行网络调用。如果您的设计允许批处理这些网络调用,则可以在单个网络调用中发送多个元素。如果产生的延迟主要是由于这些网络调用造成的,这可能也会有所帮助。
我希望这有帮助。

lmvvr0a8

lmvvr0a82#

首先,当您的代码在执行器上运行时,它已经处于分布式模式。现在,当您想利用执行器上的所有cpu资源来获得更多的并行性时,您应该考虑一些 async 选项,更倾向于批处理模式操作,以避免过度创建客户端连接对象,如下所示。
可以将代码替换为

processedLines.foreach(line -> {

用这两种方法中的任何一种

processedLines.foreachAsync(line -> {
    // Send to server
}).get();

//To iterate batch wise I would go for this
processedLines.foreachPartitionAsync(lineIterator -> {
// Create your ouput client connection here
    while (lineIterator.hasNext()){
        String line  = lineIterator.next();
    }
}).get();

这两个函数都将创建一个未来的对象或提交一个新线程或一个取消阻塞调用,这将自动为代码添加并行性。

相关问题