在apache spark中尝试将Dataframe写入csv时行为不一致

imzjd6km  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(297)

我正在尝试将使用spark的mllib训练的决策树分类器的最佳超参数输出到使用dataframes和spark csv的csv文件中。下面是我的代码片段:

// Split the data into training and test sets (10% held out for testing)
val Array(trainingData, testData) = assembledData.randomSplit(Array(0.9, 0.1))

// Define cross validation with a hyperparameter grid
val crossval = new CrossValidator()
    .setEstimator(classifier)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setNumFolds(10)

// Train model
val model = crossval.fit(trainingData)

// Find best hyperparameter combination and create an RDD 
val bestModel = model.bestModel
val hyperparamList = new ListBuffer[(String, String)]()
bestModel.extractParamMap().toSeq.foreach(pair => {
    val hyperparam: Tuple2[String,String] = (pair.param.name,pair.value.toString)
    hyperparamList += hyperparam
})
val hyperparameters = sqlContext.sparkContext.parallelize(hyperparamList.toSeq)

// Print the best hyperparameters 
println(bestModel.extractParamMap().toSeq.foreach(pair => {
    println(s"${pair.param.parent} ${pair.param.name}")
    println(pair.value)
}))

// Define csv path to output results
var csvPath: String  = "/root/results/decision-tree"
val hyperparametersPath: String = csvPath+"/hyperparameters"
val hyperparametersFile: File = new File(hyperparametersPath)
val results = (hyperparameters, hyperparametersPath, hyperparametersFile)

// Convert RDD to Dataframe and write it as csv 
val dfToSave = spark.createDataFrame(results._1.map(x => Row(x._1, x._2)))
dfToSave.write.format("csv").mode("overwrite").save(results._2)

// Stop spark session
spark.stop()

在完成一个Spark工作后,我可以看到第00部分*。。。和\u路径中的成功文件。然而,尽管在本例中总共有13个超参数(通过在屏幕上打印来确认), cat -打开csv文件显示并非每个超参数都已写入csv:

user@master:~$ cat /root/results/decision-tree/hyperparameters/part*.csv
checkpointInterval,10
featuresCol,features
maxDepth,5
minInstancesPerNode,1

而且,每次执行时都会写入的超参数发生变化。这是在一个基于hdfs的spark集群上执行的,该集群有1个主节点和3个工作节点,它们拥有完全相同的硬件。可能是比赛条件吗?如果是,我怎么解决?
提前谢谢。

0qx6xfy6

0qx6xfy61#

我想我知道了。我预料到了 dfTosave.write.format("csv")save(path) 将所有内容写入主节点,但由于任务分配给所有worker,因此每个worker将其部分超参数保存到其文件系统中的本地csv。因为在我的例子中,主节点也是一个工作节点,我可以看到它的超参数部分。“不一致行为”(即每次执行时看到不同的部分)是由spark用于在worker之间分配分区的任何算法引起的。
我的解决方案是从所有工人那里收集CSV scp 或者 rsync 建立完整的结果。

相关问题