Python interpolation in PySpark throws java.lang.IllegalArgumentException

rm5edbpk asked on 2021-05-16 in Spark

I can't figure out how to interpolate in PySpark when the DataFrame contains many columns. Let me explain.

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame([
    ("John",  "A", "2018-02-01 03:00:00", 60),  
    ("John",  "A", "2018-02-01 03:03:00", 66),  
    ("John",  "A", "2018-02-01 03:05:00", 70),  
    ("John",  "A", "2018-02-01 03:08:00", 76),  
    ("Mo",    "A", "2017-06-04 01:05:00", 10),  
    ("Mo",    "A", "2017-06-04 01:07:00", 20),  
    ("Mo",    "B", "2017-06-04 01:10:00", 35),  
    ("Mo",    "B", "2017-06-04 01:11:00", 40),
], ("webID", "aType", "timestamp", "counts")).withColumn(
  "timestamp", to_timestamp("timestamp")
)

I need to group by webID and interpolate the counts values at 1-minute intervals. However, when I run the code shown below,

from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    # Returns a GROUPED_MAP pandas UDF whose output schema is the input
    # schema with fields sorted by name, matching the column sort below.
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))), 
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        # Upsample each group to the target frequency; interpolate() fills
        # the numeric columns, ffill() the remaining (string) columns.
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)  # sort columns to match the schema
        return pdf
    return _

df.groupBy("webID").apply(resample(df.schema, "60S")).show()

I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o371.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 77 in stage 31.0 failed 4 times, most recent failure: Lost task 77.3 in stage 31.0 (TID 812, 27faa516aadb4c40b7d7586d7493143c0021c825663, executor 2): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)

Answer 1# (by w8f9ii69)

Set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1. PyArrow 0.15.0 changed the Arrow binary IPC format, which the Arrow Java version used by Spark 2.3.x/2.4.x cannot read; this variable tells PyArrow to keep producing the pre-0.15 format. See the compatibility note in the Spark docs:
https://spark.apache.org/docs/3.0.0-preview/sql-pyspark-pandas-with-arrow.html#compatibiliy-setting-for-pyarrow--0150-and-spark-23x-24x
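
The variable has to be visible to the Python worker processes on both the driver and the executors. One option, a sketch assuming you control how the SparkSession is built (the app name here is made up), is to set it up front; spark.executorEnv.* is the standard way to inject environment variables into executors:

import os
from pyspark.sql import SparkSession

# Driver side: a plain environment variable.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (
    SparkSession.builder
    .appName("resample-interpolate")  # hypothetical app name
    # Executor side: spark.executorEnv.* sets the variable in each executor.
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)

Alternatively, set it inside the UDF itself, so it is exported in every worker process that runs it: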

def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))), 
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        import os                                      # add this line
        os.environ['ARROW_PRE_0_15_IPC_FORMAT']='1'    # add this line
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _
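
With the fix in place, the original call works unchanged:

df.groupBy("webID").apply(resample(df.schema, "60S")).show()

On Spark 3.x with pyarrow >= 0.15 the workaround is no longer needed, and PandasUDFType.GROUPED_MAP is deprecated in favor of DataFrame.groupBy(...).applyInPandas. A minimal sketch of the same resampling logic in that style (make_resampler is a made-up helper name):

from operator import attrgetter
from pyspark.sql.types import StructType

def make_resampler(freq, timestamp_col="timestamp"):
    # A plain pandas function; no decorator is needed with applyInPandas.
    def _(pdf):
        pdf = pdf.set_index(timestamp_col).resample(freq).interpolate()
        pdf.ffill(inplace=True)               # fill the non-numeric key columns
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)  # column order must match the schema
        return pdf
    return _

schema = StructType(sorted(df.schema, key=attrgetter("name")))
df.groupBy("webID").applyInPandas(make_resampler("60S"), schema).show()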
