This question resembles an old one whose answer does not apply to PySpark: similar
I have a DataFrame and want to apply an ML decision tree to it. The DataFrame contains two categorical columns, so I prepared the data with the following steps:
Define a StringIndexer on the two categorical columns.
Define a OneHotEncoder on the result of the previous step.
Define a VectorAssembler over the two encoded columns plus all numeric columns (except the label column) to build the features vector.
Define a StandardScaler on the features column.
Apply all of the defined transformers in a Pipeline:
mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
pipelineModel = mem_pipiline.fit(data)
transformed = pipelineModel.transform(data)
However, I get a very long error whose meaning I don't fully understand:
Py4JJavaError Traceback (most recent call last)
<ipython-input-58-fd210f3dee7e> in <module>
2
3 mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
----> 4 pipelineModel = mem_pipiline.fit(data)
5 transformed = pipelineModel.transform(data)
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
127 return self.copy(params)._fit(dataset)
128 else:
--> 129 return self._fit(dataset)
130 else:
131 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
127 return self.copy(params)._fit(dataset)
128 else:
--> 129 return self._fit(dataset)
130 else:
131 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
319
320 def _fit(self, dataset):
--> 321 java_model = self._fit_java(dataset)
322 model = self._create_model(java_model)
323 return self._copyValues(model)
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
316 """
317 self._transfer_params_to_java()
--> 318 return self._java_obj.fit(dataset._jdf)
319
320 def _fit(self, dataset):
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
129 def deco(*a,**kw):
130 try:
--> 131 return f(*a,**kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o866.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 81 in stage 126.0 failed 1 times, most recent failure: Lost task 81.0 in stage 126.0 (TID 7338, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3599/0x000000084145a040: (struct<CMD_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,type_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,ts_double_VectorAssembler_439ad1dce146:double,PID_double_VectorAssembler_439ad1dce146:double,MINFLT_double_VectorAssembler_439ad1dce146:double,MAJFLT_double_VectorAssembler_439ad1dce146:double,VSTEXT_double_VectorAssembler_439ad1dce146:double,VSIZE:double,RSIZE:double,VGROW:double,RGROW:double,MEM:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:109)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
Before applying the StandardScaler, I inspected the features column, and it does not look like a vector to me. Maybe that is the problem?
(430,[211,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421904E9,2770.0,2810.0,193.0,332.4,12180.0,332.4,12180.0])
(430,[254,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.554219055E9,2697.0,3324.0,47.0,508.4,11752.0,508.4,11752.0])
(430,[35,415,420,421,422,423,424,425,426,427,428],[1.0,1.0,1.55421918E9,2708.0,1595.0,1.0,35.0,352.1,5044.0,352.1,5044.0])
(430,[316,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421933E9,2932.0,947.0,24.0,264.0,2988.0,264.0,2988.0])
(430,[214,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421949E9,1245.0,404.0,25.0,20012.0,960.0,20012.0,960.0])
Do you have any idea how I should perform these steps correctly? Let me know if you need more information. Thanks.
1 Answer
I actually found the problem. It was related to the presence of some null values in the original DataFrame.