Error when using StandardScaler after StringIndexer/OneHotEncoder/VectorAssembler in PySpark

xmq68pz9 · posted 2021-05-26 in Spark

This question is similar to an older question, but that one does not apply to PySpark.
I have a DataFrame to which I want to apply an ML decision tree. The DataFrame contains two categorical columns, so I prepare the data with the following steps:
Define a StringIndexer on the two categorical columns
Define a OneHotEncoder on the result of the previous step
Define a VectorAssembler on the two new columns plus all numeric columns (except the label column) to build the features vector
Define a StandardScaler on features
Apply all the defined transformers in a Pipeline:

mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
pipelineModel = mem_pipiline.fit(data)
transformed = pipelineModel.transform(data)
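
For context, the stages are defined roughly as follows (a sketch, not my exact code: the column names CMD, type, and the numeric columns are taken from the schema visible in the error below):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Index the two categorical columns (CMD and type, per the schema in the error below)
indexer = StringIndexer(inputCols=["CMD", "type"],
                        outputCols=["CMD_index", "type_index"])
# One-hot encode the indexed columns
encoder = OneHotEncoder(inputCols=["CMD_index", "type_index"],
                        outputCols=["CMD_index_encode", "type_index_encode"])
# Assemble the encoded columns plus all numeric columns (except the label) into one vector
numeric_cols = ["ts", "PID", "MINFLT", "MAJFLT", "VSTEXT",
                "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM"]
assembler = VectorAssembler(inputCols=["CMD_index_encode", "type_index_encode"] + numeric_cols,
                            outputCol="features")
# Standardize the assembled feature vector
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")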

However, I get a very long error whose meaning I do not fully understand:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-58-fd210f3dee7e> in <module>
      2 
      3 mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
----> 4 pipelineModel = mem_pipiline.fit(data)
      5 transformed = pipelineModel.transform(data)

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
    127                 return self.copy(params)._fit(dataset)
    128             else:
--> 129                 return self._fit(dataset)
    130         else:
    131             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
    127                 return self.copy(params)._fit(dataset)
    128             else:
--> 129                 return self._fit(dataset)
    130         else:
    131             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    319 
    320     def _fit(self, dataset):
--> 321         java_model = self._fit_java(dataset)
    322         model = self._create_model(java_model)
    323         return self._copyValues(model)

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    316         """
    317         self._transfer_params_to_java()
--> 318         return self._java_obj.fit(dataset._jdf)
    319 
    320     def _fit(self, dataset):

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
    129     def deco(*a,**kw):
    130         try:
--> 131             return f(*a,**kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o866.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 81 in stage 126.0 failed 1 times, most recent failure: Lost task 81.0 in stage 126.0 (TID 7338, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3599/0x000000084145a040: (struct<CMD_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,type_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,ts_double_VectorAssembler_439ad1dce146:double,PID_double_VectorAssembler_439ad1dce146:double,MINFLT_double_VectorAssembler_439ad1dce146:double,MAJFLT_double_VectorAssembler_439ad1dce146:double,VSTEXT_double_VectorAssembler_439ad1dce146:double,VSIZE:double,RSIZE:double,VGROW:double,RGROW:double,MEM:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:109)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)

Before applying the StandardScaler, I inspected the features column, and it does not look like a vector to me. Maybe that is the problem?
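
(Roughly how I checked, as a sketch: fit the pipeline without the scaler stage and print a few rows of features; the stage names are the ones defined above.)

partial_model = Pipeline(stages=[indexer, encoder, assembler]).fit(data)
for row in partial_model.transform(data).select("features").take(5):
    print(row.features)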

(430,[211,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421904E9,2770.0,2810.0,193.0,332.4,12180.0,332.4,12180.0])
(430,[254,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.554219055E9,2697.0,3324.0,47.0,508.4,11752.0,508.4,11752.0])
(430,[35,415,420,421,422,423,424,425,426,427,428],[1.0,1.0,1.55421918E9,2708.0,1595.0,1.0,35.0,352.1,5044.0,352.1,5044.0])
(430,[316,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421933E9,2932.0,947.0,24.0,264.0,2988.0,264.0,2988.0])
(430,[214,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421949E9,1245.0,404.0,25.0,20012.0,960.0,20012.0,960.0])

Do you know how I should perform these steps correctly? Let me know if you need more information. Thanks.

n3h0vuf2 (answer 1):

I actually found the problem myself. It is related to the presence of some null values in the original DataFrame.
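
A minimal sketch of the fix, using the column names from the question: either drop (or fill) the nulls before fitting, or configure the stages to tolerate them via their handleInvalid parameter.

# Drop rows that contain nulls in any column the pipeline reads
numeric_cols = ["ts", "PID", "MINFLT", "MAJFLT", "VSTEXT",
                "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM"]
data_clean = data.dropna(subset=["CMD", "type"] + numeric_cols)
# (or fill numeric nulls instead: data.fillna(0, subset=numeric_cols))
pipelineModel = mem_pipiline.fit(data_clean)

# Alternatively: StringIndexer(..., handleInvalid="keep") sends nulls/unseen labels
# to an extra bucket, and VectorAssembler(..., handleInvalid="skip") drops rows
# containing null/NaN values.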
