pyspark将模型预测与未转换的数据对齐:最佳实践

sirbozc5  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(342)

使用pyspark的ml模块,通常会发生以下步骤(在数据清理等之后):
执行特征和目标转换管道
创建模型
从模型生成预测
为了业务用户和模型验证的目的,将预测和原始数据集合并在一起
提取一段简单的代码片段:

predictions = model.transform(test_df)

这个 predictions Dataframe只会有预测(和概率,也许还有预测的转换)。但它不会包含原始数据集。

如何将预测与原始pysparkDataframe相结合?

对于我来说,如何组合原始数据集(甚至转换后的数据集)并不明显 test_df )以及预测;没有可连接的共享列,对于大型数据集来说,添加索引列似乎相当棘手。

当前解决方案:

对于大型数据集,如我正在使用的数据集,我尝试了以下建议:

test_df = test_df.repartition(predictions.rdd.getNumPartitions())
joined_schema = StructType(test_df.schema.fields + predictions.schema.fields)
interim_rdd = test_df.rdd.zip(predictions.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
full_data.write.parquet(my_predictions_path, mode="overwrite")

但我不喜欢这样有两个原因:
我不能完全肯定秩序是否得到维持。链接表明应该是这样,但我不明白为什么。
它有时会崩溃,即使我正在强制进行如上所示的重新分区,但当我尝试通过上面最后一行写入数据时,会出现以下错误: Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition 我不想使用 monotonically_increasing_id 有时会给出建议,因为我的数据集太大,不允许这样做。
这似乎是很基本的:我怎样才能报告任何模型的质量,而不能够将预测与原始目标进行比较。其他人是怎么做到的??

4smxwvx5

4smxwvx51#

打电话的时候 model = <your ml-algorithm>.fit(df_train) 列车数据集可以有任意数量的附加列。只有包含特性和标签的列将用于训练模型(通常称为 features 以及 label ,这是可配置的),但可以显示其他列。
打电话的时候 predictions = model.transform(df_test) 在下一步的训练模型上,返回一个包含附加列的Dataframe prediction , probability 以及 rawPrediction .
尤其是原始特征列和标签列仍然是Dataframe的一部分。此外,任何属于 df_test 在输出中仍然可用,并可用于标识行。

prediction = model.transform(df_test)
prediction.printSchema()

印刷品

root
 |-- feature1: double (nullable = true)
 |-- feature2: double (nullable = true)
 |-- feature3: double (nullable = true)
 |-- label: double (nullable = true)
 |-- additional_data: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

如果 df_test 不仅包含所需列 features 但其他栏目包括 label . 通过评估 label 以及 prediction 例如,现在可以创建binaryclassificationmetrics。
打电话 model.transform 从技术上讲是dataset.withcolumn调用。
基于spark文档中的ml管道示例的示例:spark ml工作流通常从包含训练数据、特性和标签(=目标值)的Dataframe开始。在本例中,还有一个与ml过程无关的附加列。

training_original = spark.createDataFrame([
    (0.0, 1.1, 0.1, 1.0, 'any random value that is not used to train the model'),
    (2.0, 1.0, -1.0, 0.0, 'another value'),
    (2.0, 1.3, 1.0, 0.0, 'value 3'),
    (0.0, 1.2, -0.5, 1.0, 'this value is also not used for training nor testing')],  
    ["feature1", "feature2", "feature3", "label", "additional_data"])

然后使用转换器将这些特征组合成一列。这个任务最简单的转换器是矢量汇编程序

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features")
training_transformed = assembler.transform(training_original)

# +--------+--------+--------+-----+--------------------+--------------+

# |feature1|feature2|feature3|label|     additional_data|      features|

# +--------+--------+--------+-----+--------------------+--------------+

# |     0.0|     1.1|     0.1|  1.0|any random value ...| [0.0,1.1,0.1]|

# | ...

现在可以使用列在此Dataframe上训练模型 features 以及 label . 其他列存在,但将被 fit 方法。

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training_transformed)

现在根据测试数据对模型进行了测试。准备工作与培训资料相同:

test_df = spark.createDataFrame([
    (-1.0, 1.5, 1.3, 1.0, 'test value 1'),
    (3.0, 2.0, -0.1, 0.0, 'another test value'),
    (0.0, 2.2, -1.5, 1.0, 'this is not important')],
    ["feature1", "feature2", "feature3", "label", "additional_data"])
test_df_transformed = assembler.transform(test_df)

# +--------+--------+--------+-----+--------------------+--------------+

# |feature1|feature2|feature3|label|     additional_data|      features|

# +--------+--------+--------+-----+--------------------+--------------+

# |    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|

# | ...

运行ml magic产生

prediction = model.transform(test_df_transformed)

# +--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+

# |feature1|feature2|feature3|label|     additional_data|      features|       rawPrediction|         probability|prediction|

# +--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+

# |    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|[-6.5872014439355...|[0.00137599470692...|       1.0|

# | ...

此Dataframe现在包含原始输入数据( feature1feature3 以及 additional_data ),预期目标值( label )变换后的特征( features )模型预测的结果( prediction ). 在这里,所有输入值、目标值和预测都可以在一个数据集中使用。这里将是评估模型和计算模型所需度量的地方。将模型应用于新数据将得到相同的结果(但没有 label 当然可以)。

相关问题