Spark结构delta流的下推滤波器

fykwrbwg  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(289)

我有一个用例,我们需要将开放源代码的delta表流化到多个查询中,在其中一个分区列上进行过滤。如,。给定按年份列分区的增量表。

Streaming query 1
spark.readStream.format("delta").load("/tmp/delta-table/").
where("year= 2013")

Streaming query 2
spark.readStream.format("delta").load("/tmp/delta-table/").
where("year= 2014")

流式传输后,物理计划显示过滤器。

> == Physical Plan == Filter (isnotnull(year#431) AND (year#431 = 2013))
> +- StreamingRelation delta, []

我的问题是pushdown predicate 是否适用于delta中的流式查询?我们能从delta流式传输特定的分区吗?

plupiseo

plupiseo1#

如果列已经分区,则只扫描所需的分区。
让我们创建分区和非分区增量表并执行结构化流。
分区增量表流:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

//sample dataframe
val df = Seq((1,2020),(2,2021),(3,2020),(4,2020),
(5,2020),(6,2020),(7,2019),(8,2019),(9,2018),(10,2020)).toDF("id","year")

//partionBy year column and save as delta table
df.write.format("delta").partitionBy("year").save("delta-stream")

//streaming delta table
spark.readStream.format("delta").load("delta-stream")
.where('year===2020)
.writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:请注意partitionfilters

非分区增量表流:

df.write.format("delta").save("delta-stream")

spark.readStream.format("delta").load("delta-stream")
    .where('year===2020)
    .writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:请注意pushedfilters

相关问题