scala 检查需要较长时间才能完成的列

z18hc3ub  于 6个月前  发布在  Scala
关注(0)|答案(3)|浏览(69)

需要检查空的框架列和更新一个框架列在有效的方式。
因为“前纹章”是-
x1c 0d1x的数据
对每一列(Column 1到Column 20)进行空值检查并更新error_notes,如下所示-

val df1 = data.withColumn("error_notes", when(col(column1).isNull, concat_WS("!", Column1 is null, col("error_note")).otherwise("error_notes"))

val df2 = df1.withColumn("error_notes", when(col(column2).isNull, concat_WS("!", Column2 is null, col("error_notes")).otherwise("error_notes"))

val df3 = df2.withColumn("error_notes", when(col(column3).isNull, concat_WS("!", Column3 is null, col("error_notes")).otherwise("error_notes"))

.
.
.
.

val df20 = df19.withColumn("error_notes", when(col(column20).isNull, concat_WS("!", Column20 is null, col("error_notes")).otherwise("error_notes"))

字符串
当对所有列执行空检查并更新error_note列时,由于字符串大小太大,需要更长的时间(几乎4小时)才能完成。有没有什么有效和高性能的方法来解决这个问题。

tktrz96b

tktrz96b1#

如果你有一个pk的框架,也许你可以使用join来加速。像这样:

val df1 = data.select("pk", "column1")
val df2 = df1.where("column1 is null")
val df3 = df2.withColumn("error_notes", concat_WS("!", Column1 is null, col("error_note")))
val df4 = df2.join(df3, Seq("pk"))

字符串

kninwzqo

kninwzqo2#

可以准备所有列的检查,并且方法withColumn仅使用一次:

val columnsToCheck = Seq("column1", "column2", "column3")
val df = Seq(
  (Some(1), Some(2), Some(3)),
  (Some(4), None, None)
).toDF(columnsToCheck: _*)

val errorMessages = columnsToCheck.map(colName =>
  when(col(colName).isNull, lit(colName + " is null")).otherwise(null)
)

val result = df.withColumn("error_note", concat_ws("!", errorMessages: _*))

字符串
结果是:

+-------+-------+-------+-------------------------------+
|column1|column2|column3|error_note                     |
+-------+-------+-------+-------------------------------+
|1      |2      |3      |                               |
|4      |null   |null   |column2 is null!column3 is null|
+-------+-------+-------+-------------------------------+

iovurdzv

iovurdzv3#

不要使用withColumn,这是速度变慢的主要原因,你还应该避免使用字符串,而是使用专用的状态字段(参见Quality's storage model以获得更少浪费和更快的示例)。
我在pasha701之外添加这个的唯一原因是,你会发现你可以添加的when子句的数量有一个上限,它们将被编译成一个单一的函数,这个函数无法逃脱每个spark版本上64k代码大小的限制。一些版本会直接抛出janino错误,而另一些版本则会直接抛出janino错误,以寻求优化。这样的规则直接处理了这个问题。YMMV和pasha701的响应在其他方面符合要求。

相关问题