python从任务调用java/scala函数

dvtswwa3  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(373)

背景

我最初的问题是为什么要使用 DecisionTreeModel.predict 内部Map函数引发异常?与如何用mllib在spark上生成(原始标签,预测标签)元组有关?
当我们使用scalaapi时,我们推荐一种获得 RDD[LabeledPoint] 使用 DecisionTreeModel 就是简单地Map过去 RDD :

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

不幸的是,pyspark中类似的方法并不那么有效:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

异常:似乎您正试图从广播变量、操作或转换引用sparkcontext。sparkcontext只能在驱动程序上使用,不能在工作程序上运行的代码中使用。有关更多信息,请参阅spark-5063。
而不是官方文件建议如下:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

这是怎么回事?这里没有广播变量,scalaapi定义了 predict 具体如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

因此,至少乍一看,从行动或转变中召唤并不是问题,因为预测似乎是一种局部操作。

解释

经过一番挖掘,我发现问题的根源是 JavaModelWrapper.call 从decisiontreemodel.predict调用的方法。it访问 SparkContext 调用java函数需要:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

问题

万一 DecisionTreeModel.predict 这里有一个推荐的解决方法,所有必需的代码都已经是scalaapi的一部分了,但是有没有什么优雅的方法来处理这样的问题呢?
我现在唯一能想到的解决方案是非常重的:
通过隐式转换扩展spark类或添加某种 Package 器,将所有内容下推到jvm
直接使用py4j网关

bwitn5fc

bwitn5fc1#

使用默认的py4j网关进行通信是不可能的。为了理解为什么我们必须查看pyspark内部文档[1]中的下图:

由于py4j网关在驱动程序上运行,因此python解释器无法访问它,因为python解释器通过套接字与jvm工作者通信(参见示例) PythonRDD / rdd.py ).
从理论上讲,为每个工作者创建一个单独的py4j网关是可能的,但在实践中,它不太可能有用。忽略可靠性之类的问题py4j根本不是为执行数据密集型任务而设计的。
有什么解决办法吗?
使用sparksql数据源api Package jvm代码。
优点:支持,高级,不需要访问内部pysparkapi
缺点:比较冗长,没有很好的文档记录,主要局限于输入数据
使用scala udfs对Dataframe进行操作。
优点:易于实现(参见spark:how to map python with scala or java user defined functions?),如果数据已经存储在dataframe中,则python和scala之间没有数据转换,对py4j的访问最少
缺点:需要访问py4j网关和内部方法,仅限于sparksql,难以调试,不受支持
以类似于mllib的方式创建高级scala接口。
优点:灵活,能够执行任意复杂代码。它可以直接在rdd上(例如,请参见mllib model wrappers)或使用 DataFrames (请参见如何在pyspark中使用scala类)。后一种解决方案似乎更加友好,因为所有的服务细节都已经由现有的api处理了。
缺点:低级别,需要数据转换,就像udfs需要访问py4j和内部api一样,不支持
在用scala转换pyspark rdd时可以找到一些基本的例子
使用外部工作流管理工具在python和scala/java作业之间切换,并将数据传递给dfs。
优点:易于实现,对代码本身的更改最少
缺点:读/写数据的成本(alluxio?)
使用共享 SQLContext (例如,请参见apachezeppelin或livy)使用注册的临时表在访客语言之间传递数据。
优点:非常适合交互式分析
缺点:不太适合批量作业(齐柏林飞艇),也可能需要额外的编排(利维)
约书亚·罗森(2014年8月4日)Pypark内部构件。检索自https://cwiki.apache.org/confluence/display/spark/pyspark+internals

相关问题