spark3与Pandas矢量自定义项的

llycmphe  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(383)

我正在研究在pyspark(v3)中使用pandas-udf。出于许多原因,我理解迭代和udf通常是不好的,我理解我在这里展示的简单示例可以使用sql函数来完成—所有这些都是多余的!
我一直遵循这个指南:https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html
我有一个简单的例子从文件工作:

import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf

spark = SparkSession.builder.getOrCreate()

pdf = pd.DataFrame(([1, 2, 3], [4, 5, 6], [8, 9, 0]), columns=["x", "y", "z"])
df = spark.createDataFrame(pdf)

@pandas_udf('long')
def test1(x: pd.Series, y: pd.Series) -> pd.Series:
    return x + y

df.select(test1(col("x"), col("y"))).show()

这对于执行基本的算术很有效-如果我想加法、乘法等,这是直接的(但是在没有函数的pyspark中也很简单)。
我想对这些值进行比较,例如:

@pandas_udf('long')
def test2(x: pd.Series, y: pd.Series) -> pd.Series:
    return x if x > y else y

df.select(test2(col("x"), col("y"))).show()

这将与 ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). . 我知道它是在计算序列而不是行值。
这里有一个迭代器的例子。同样,对于他们提供的基本算术示例,这也很好。但如果我尝试运用逻辑:

@pandas_udf("long")
def test3(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for x, y in batch_iter:
        yield x if x > y else y

df.select(test3(col("x"), col("y"))).show()

我得到了和以前一样的值错误。
所以我的问题是,我应该如何进行这样的逐行比较?在矢量化函数中有可能吗?如果没有,那么它们的用例是什么?

k3bvogb1

k3bvogb11#

我想出来了。在你把问题写下来并向全世界公布之后就这么简单了。
需要做的就是返回一个数组,然后转换为一个系列:

@pandas_udf('long')
def test4(x: pd.Series, y: pd.Series) -> pd.Series:
    return pd.Series([a if a > b else b for a, b in zip(x, y)])

df.select(test4(col("x"),col("y"))).show()
7uzetpgm

7uzetpgm2#

我花了两天的时间来寻找这个答案,谢谢你西蒙·德莫瑞斯!
我需要一个稍微修改一下的例子。为了便于管理,我将单个udf分解为多个组件。下面是一个我用来给别人参考的例子:

xdf = pd.DataFrame(([1, 2, 3,'Fixed'], [4, 5, 6,'Variable'], [8, 9, 0,'Adjustable']), columns=["x", "y", "z", "Description"])
df = spark.createDataFrame(xdf)

def fnRate(x):
    return pd.Series(['Fixed' if 'Fixed' in str(v) else 'Variable' if 'Variable' in str(v) else 'Other' for v in zip(x)])

@pandas_udf('string')
def fnRateRecommended(Description: pd.Series) -> pd.Series:
    varProduct = fnRate(Description)

    return varProduct

# call function

df.withColumn("Recommendation", fnRateRecommended(sf.col("Description"))).show()

相关问题