spark predicate 下推在日期上不起作用

ykejflvf  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(392)

我只是在读取一个Parquet文件,并添加一个过滤器,以匹配所有的记录,落在日期-这里2021-04-03。列不应为null,并且应在给定的日期。
输入表

+---------+-----------+-------------------+
|      lat|        lng|       eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+

到目前为止,我已经尝试过强制转换列,使用substring\u index函数进行匹配,但是我无法在推送的过滤器中获得它。
以下是我尝试的代码:

df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
        .select( 'lat', 'lng', 'eventDTLocal').filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)").explain(extended=True)

过滤器仅在数据过滤器中列出,而不在其他任何地方列出。我错过了什么?

3zwjbxry

3zwjbxry1#

并不是所有的过滤器都能被按下。一般来说,大多数包含函数调用的过滤器 substring 或者 unix_timestamp 不能向下推。在datasourcestrategy中实现了过滤器被下推的完整逻辑。
在这种情况下,解决此限制的一种方法是存储 eventDTLocal 作为unix时间戳而不是parquet文件中的字符串,然后按特定的毫秒进行过滤。


# create some test data

data = [(52.5151923, 13.3824107, 1618760421000), 
        (1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
    .write.mode("overwrite").parquet("dataWithUnixTime")

# get the first and last millisecond of the day

# the timezone has probably to be adjusted

from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
end = start + 24 * 60 * 60 * 1000 - 1

# run the query

df = spark.read.parquet("dataWithUnixTime") \
    .filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")

物理平面图 df ```
== Physical Plan ==

  • (1) Project [lat#9, lng#10, eventDTLocal#11L]

+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
+- *(1) ColumnarToRow
+- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: structlat:double,lng:double,eventDTLocal:bigint

现在包含推送的过滤器 `GreaterThanOrEqual` 以及 `LessThanOrEqual` 对于日期列。

相关问题