如何在spark结构化流媒体上使用udf(用户定义函数)?

iqih9akk  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(352)

我找了一下。这个答案告诉我,我可以在groupeddata上使用udf,它可以工作,我可以用自己的函数处理groupdata中的行和列。
根据官方教程。它们使用groupby()和window()操作来表示窗口聚合,如下所示。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

我的问题是,是否有一种方法可以在words.groupby(window(words.timestamp,“10分钟”,“5分钟”)上使用自定义项。代码可能如下所示?我试过了,但没用。

schema = StructType(
    [StructField("key", StringType()), StructField("avg_min", DoubleType())]
)

@panda_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    #whatever user-defined code 

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).apply(g)
ffscu2ro

ffscu2ro1#

在spark 3中,您可以使用 applyInPandas 相反,没有明确的 @pandas_udf (见文件):

def g(df):
    #whatever user-defined code 

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).applyInPandas(g, schema=schema)

在本例中,您将获得pandasDataframe并返回pandasDataframe。

相关问题