使用pyspark的ml模块,通常会发生以下步骤(在数据清理等之后):
执行特征和目标转换管道
创建模型
从模型生成预测
为了业务用户和模型验证的目的,将预测和原始数据集合并在一起
提取一段简单的代码片段:
predictions = model.transform(test_df)
这个 predictions
Dataframe只会有预测(和概率,也许还有预测的转换)。但它不会包含原始数据集。
如何将预测与原始pysparkDataframe相结合?
对于我来说,如何组合原始数据集(甚至转换后的数据集)并不明显 test_df
)以及预测;没有可连接的共享列,对于大型数据集来说,添加索引列似乎相当棘手。
当前解决方案:
对于大型数据集,如我正在使用的数据集,我尝试了以下建议:
test_df = test_df.repartition(predictions.rdd.getNumPartitions())
joined_schema = StructType(test_df.schema.fields + predictions.schema.fields)
interim_rdd = test_df.rdd.zip(predictions.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
full_data.write.parquet(my_predictions_path, mode="overwrite")
但我不喜欢这样有两个原因:
我不能完全肯定秩序是否得到维持。链接表明应该是这样,但我不明白为什么。
它有时会崩溃,即使我正在强制进行如上所示的重新分区,但当我尝试通过上面最后一行写入数据时,会出现以下错误: Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
我不想使用 monotonically_increasing_id
有时会给出建议,因为我的数据集太大,不允许这样做。
这似乎是很基本的:我怎样才能报告任何模型的质量,而不能够将预测与原始目标进行比较。其他人是怎么做到的??
1条答案
按热度按时间4smxwvx51#
打电话的时候
model = <your ml-algorithm>.fit(df_train)
列车数据集可以有任意数量的附加列。只有包含特性和标签的列将用于训练模型(通常称为features
以及label
,这是可配置的),但可以显示其他列。打电话的时候
predictions = model.transform(df_test)
在下一步的训练模型上,返回一个包含附加列的Dataframeprediction
,probability
以及rawPrediction
.尤其是原始特征列和标签列仍然是Dataframe的一部分。此外,任何属于
df_test
在输出中仍然可用,并可用于标识行。印刷品
如果
df_test
不仅包含所需列features
但其他栏目包括label
. 通过评估label
以及prediction
例如,现在可以创建binaryclassificationmetrics。打电话
model.transform
从技术上讲是dataset.withcolumn调用。基于spark文档中的ml管道示例的示例:spark ml工作流通常从包含训练数据、特性和标签(=目标值)的Dataframe开始。在本例中,还有一个与ml过程无关的附加列。
然后使用转换器将这些特征组合成一列。这个任务最简单的转换器是矢量汇编程序
现在可以使用列在此Dataframe上训练模型
features
以及label
. 其他列存在,但将被fit
方法。现在根据测试数据对模型进行了测试。准备工作与培训资料相同:
运行ml magic产生
此Dataframe现在包含原始输入数据(
feature1
至feature3
以及additional_data
),预期目标值(label
)变换后的特征(features
)模型预测的结果(prediction
). 在这里,所有输入值、目标值和预测都可以在一个数据集中使用。这里将是评估模型和计算模型所需度量的地方。将模型应用于新数据将得到相同的结果(但没有label
当然可以)。