背景
我最初的问题是为什么要使用 DecisionTreeModel.predict
内部Map函数引发异常?与如何用mllib在spark上生成(原始标签,预测标签)元组有关?
当我们使用scalaapi时,我们推荐一种获得 RDD[LabeledPoint]
使用 DecisionTreeModel
就是简单地Map过去 RDD
:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
不幸的是,pyspark中类似的方法并不那么有效:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
异常:似乎您正试图从广播变量、操作或转换引用sparkcontext。sparkcontext只能在驱动程序上使用,不能在工作程序上运行的代码中使用。有关更多信息,请参阅spark-5063。
而不是官方文件建议如下:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
这是怎么回事?这里没有广播变量,scalaapi定义了 predict
具体如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing a single data point
* @return Double prediction from the trained model
*/
def predict(features: Vector): Double = {
topNode.predict(features)
}
/**
* Predict values for the given data set using the model trained.
*
* @param features RDD representing data points to be predicted
* @return RDD of predictions for each of the given data points
*/
def predict(features: RDD[Vector]): RDD[Double] = {
features.map(x => predict(x))
}
因此,至少乍一看,从行动或转变中召唤并不是问题,因为预测似乎是一种局部操作。
解释
经过一番挖掘,我发现问题的根源是 JavaModelWrapper.call
从decisiontreemodel.predict调用的方法。it访问 SparkContext
调用java函数需要:
callJavaFunc(self._sc, getattr(self._java_model, name), *a)
问题
万一 DecisionTreeModel.predict
这里有一个推荐的解决方法,所有必需的代码都已经是scalaapi的一部分了,但是有没有什么优雅的方法来处理这样的问题呢?
我现在唯一能想到的解决方案是非常重的:
通过隐式转换扩展spark类或添加某种 Package 器,将所有内容下推到jvm
直接使用py4j网关
1条答案
按热度按时间bwitn5fc1#
使用默认的py4j网关进行通信是不可能的。为了理解为什么我们必须查看pyspark内部文档[1]中的下图:
由于py4j网关在驱动程序上运行,因此python解释器无法访问它,因为python解释器通过套接字与jvm工作者通信(参见示例)
PythonRDD
/rdd.py
).从理论上讲,为每个工作者创建一个单独的py4j网关是可能的,但在实践中,它不太可能有用。忽略可靠性之类的问题py4j根本不是为执行数据密集型任务而设计的。
有什么解决办法吗?
使用sparksql数据源api Package jvm代码。
优点:支持,高级,不需要访问内部pysparkapi
缺点:比较冗长,没有很好的文档记录,主要局限于输入数据
使用scala udfs对Dataframe进行操作。
优点:易于实现(参见spark:how to map python with scala or java user defined functions?),如果数据已经存储在dataframe中,则python和scala之间没有数据转换,对py4j的访问最少
缺点:需要访问py4j网关和内部方法,仅限于sparksql,难以调试,不受支持
以类似于mllib的方式创建高级scala接口。
优点:灵活,能够执行任意复杂代码。它可以直接在rdd上(例如,请参见mllib model wrappers)或使用
DataFrames
(请参见如何在pyspark中使用scala类)。后一种解决方案似乎更加友好,因为所有的服务细节都已经由现有的api处理了。缺点:低级别,需要数据转换,就像udfs需要访问py4j和内部api一样,不支持
在用scala转换pyspark rdd时可以找到一些基本的例子
使用外部工作流管理工具在python和scala/java作业之间切换,并将数据传递给dfs。
优点:易于实现,对代码本身的更改最少
缺点:读/写数据的成本(alluxio?)
使用共享
SQLContext
(例如,请参见apachezeppelin或livy)使用注册的临时表在访客语言之间传递数据。优点:非常适合交互式分析
缺点:不太适合批量作业(齐柏林飞艇),也可能需要额外的编排(利维)
约书亚·罗森(2014年8月4日)Pypark内部构件。检索自https://cwiki.apache.org/confluence/display/spark/pyspark+internals