foreach中的spark

kb5ga3dv  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(347)
val a = sc.textFile("/user/cts367689/datagen.txt")

val b = a.map(x => (x.split(",")(0),x.split(",")(2),x.split(4))))

val c = b.filter(x => (x._3.toInt > 500))

c.foreach(x => println(x))

c.foreach {x => {println(x)}}

当我为每个语句使用时,我没有得到预期的输出。我希望输出是一行打印一个,但不确定代码中有什么错误。

uelo1irk

uelo1irk1#

我认为这个问题已经被回答过好几次了,但现在我们再来看看官方的节目指南:
rdd的打印元素
一个常见的习惯用法是使用 rdd.foreach(println) 或者 rdd.map(println) . 在一台机器上,这将生成预期的输出并打印所有rdd元素。

scala> val rdd = sc.parallelize(Seq((1,2,3),(2,3,4)))
// rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd.foreach(println)
// (1,2,3)
// (2,3,4)

但是,在集群模式下,执行器调用的stdout的输出现在写入执行器的stdout,而不是驱动程序上的stdout,所以驱动程序上的stdout不会显示这些!
要在驱动程序上打印所有元素,需要 collect() 因此,数据将返回到驱动程序节点:

scala> rdd.collect().foreach(println)
// (1,2,3)
// (2,3,4)

这里是限制。如果您的数据不适合驱动程序,这可能会导致驱动程序内存不足,因为collect()会将整个rdd提取到一台机器上;从而导致你的司机爆炸。
如果只需要打印rdd的几个元素,则更安全的方法是使用take():

scala> val rdd = sc.parallelize(Range(1, 1000000000))
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> rdd.take(100).foreach(println)
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// [...]

附言:关于 foreach 方法。 foreach 对数据集的每个元素运行函数。这种方法通常用于副作用,如更新累加器或与外部存储系统交互。
我希望这能回答你的问题。

相关问题