我正在研究在pyspark(v3)中使用pandas-udf。出于许多原因,我理解迭代和udf通常是不好的,我理解我在这里展示的简单示例可以使用sql函数来完成—所有这些都是多余的!
我一直遵循这个指南:https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html
我有一个简单的例子从文件工作:
import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
spark = SparkSession.builder.getOrCreate()
pdf = pd.DataFrame(([1, 2, 3], [4, 5, 6], [8, 9, 0]), columns=["x", "y", "z"])
df = spark.createDataFrame(pdf)
@pandas_udf('long')
def test1(x: pd.Series, y: pd.Series) -> pd.Series:
return x + y
df.select(test1(col("x"), col("y"))).show()
这对于执行基本的算术很有效-如果我想加法、乘法等,这是直接的(但是在没有函数的pyspark中也很简单)。
我想对这些值进行比较,例如:
@pandas_udf('long')
def test2(x: pd.Series, y: pd.Series) -> pd.Series:
return x if x > y else y
df.select(test2(col("x"), col("y"))).show()
这将与 ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
. 我知道它是在计算序列而不是行值。
这里有一个迭代器的例子。同样,对于他们提供的基本算术示例,这也很好。但如果我尝试运用逻辑:
@pandas_udf("long")
def test3(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for x, y in batch_iter:
yield x if x > y else y
df.select(test3(col("x"), col("y"))).show()
我得到了和以前一样的值错误。
所以我的问题是,我应该如何进行这样的逐行比较?在矢量化函数中有可能吗?如果没有,那么它们的用例是什么?
2条答案
按热度按时间k3bvogb11#
我想出来了。在你把问题写下来并向全世界公布之后就这么简单了。
需要做的就是返回一个数组,然后转换为一个系列:
7uzetpgm2#
我花了两天的时间来寻找这个答案,谢谢你西蒙·德莫瑞斯!
我需要一个稍微修改一下的例子。为了便于管理,我将单个udf分解为多个组件。下面是一个我用来给别人参考的例子: