使用scala将spark处理的结果转储到hdfs

yhxst69z  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(651)

在使用spark处理数据之后,我对如何将数据保存到hdfs有点困惑。
这就是我要做的。我正在计算数值字段的最小值、最大值和标准差。我的输入文件有数百万行,但输出只有大约15-20个字段。因此,输出是每个字段的单个值(标量)。
例如:我将把field1的所有行加载到一个rdd中,最后,我将得到field1的3个单一值(min、max、sd)。我将这三个值连接到临时字符串中。最后,我将有15到20行,包含以下格式的4列

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD

以下是代码片段:

//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))

val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev

所以,我有3个变量,最小值,最大值和sd,我想存储回hdfs。
问题1:由于输出将非常小,我是否只将其保存在本地服务器上?或者我应该把它倒进hdfs。在我看来,在本地转储文件更有意义。
问题2:在spark中,我可以调用以下命令将rdd保存到文本文件中

some_RDD.saveAsTextFile("hdfs://namenode/path")

对于一个在scala中不是rdd的字符串变量,我如何在中完成同样的事情?我应该先将结果并行化为rdd,然后调用saveastextfile吗?

dw1jzc5e

dw1jzc5e1#

答1:因为您只需要几个标量,所以我想说的是将它们存储在本地文件系统中。你可以先做 val localValue = rdd.collect() ,它将从工人那里收集所有数据以供掌握。然后调用java.io将内容写入磁盘。
回答2:你可以做sc.parallelize(yourstring).saveastextfile(“hdfs://host/yourfile"). 他们会把事情写进第000部分*。如果你想把所有的东西都放在一个文件里, hdfs dfs -getmerge 是来帮你的。

72qzrwbm

72qzrwbm2#

要在本地保存,请执行以下操作 some_RDD.collect() 然后使用类似于此问题的内容保存生成的数组。是的,如果数据集很小,并且可以很容易地放入内存中,您应该收集数据并将其带到程序的驱动程序中。如果数据存储在内存中有点太大,另一个选择就是 some_RDD.coalesce(numParitionsToStoreOn) . 牢记 coalesce 也需要一个布尔值 shuffle ,如果在合并之前对数据进行计算,则应将其设置为true以获得更多的计算并行性。coalesce将减少调用时存储数据的节点数 some_RDD.saveAsTextFile("hdfs://namenode/path") . 如果文件非常小,但您需要在hdfs上使用它,请致电 repartition(1) ,与 coalesce(1,true) ,这将确保您的数据只保存在一个节点上。
更新:所以如果您只想在hdfs中保存三个值,那么您可以这样做。 sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile") 基本上,您只需将3个变量放入一个元组中,将其 Package 在一个列表中,并将并行度设置为1,因为数据非常小

相关问题