spark:函数上的过滤器没有按下

jaxagkaj  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(358)

我正在运行一个spark作业(scala)来将列转换为upper并过滤它的值

val udfExampleDF = spark.read.parquet("a.parquet")
udfExampleDF.filter(upper(col("device_type"))==="PHONE").select(col("device_type")).show()

我看到这个过滤器没有被按下。下面是实际计划的样子

== Physical Plan ==
CollectLimit 21
+- *(1) Filter (upper(device_type#138) = PHONE)
   +- *(1) FileScan parquet [device_type#138] Batched: true, Format: Parquet, Location: InMemoryFileIndex[a.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<orig_device_type:string>

但是,如果我只是过滤没有上限,过滤器被按下。
不知道为什么会这样。我假设这些过滤器在任何情况下都会被按下。谢谢你的帮助。

beq87vna

beq87vna1#

因为 predicate 下推尝试删除逻辑运算符并将它们推送到数据源,在我们的例子中是从 FilterFileScan parquet ,它必须使用原始列值。
从理论上讲,如果其他文件格式支持使用更改的列值进行过滤,则可能有效(不确定) PushDownPredicate 即使存在这样的文件格式,也支持此操作)。
要解决您的问题,请通过将动态值设置在等式的不同一侧来绕过此问题,尽管您有几个条件,但速度会快得多(尝试将更频繁的值设置在第一个条件中):

udfExampleDF.filter(col("device_type")==="Phone" or col("device_type")==="PHONE" or col("device_type")==="phone").select(col("device_type")).show()

您还可以将列值统一为 PHONE 在数据接收中,即在写入Parquet文件之前应用 upper 函数,然后按如下方式过滤:

udfExampleDF.filter(col("device_type")==="PHONE").select(col("device_type")).show()

相关问题