apache spark自定义日志未过滤数据(lazylogging)

rwqw0loc  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(284)

我正在过滤一个列以符合一些验证,我可以使用spark内置函数进行过滤,但是我需要用适当的消息记录无效的数据(我正在使用lazylogging),有没有什么方法不用自定义udf就可以做到这一点,这样我就可以保持spark优化?
例如,筛选短于20个字符的名称:

df.filter(length($"name") <= lit(20))

在这种情况下,如果没有自定义自定义自定义项,如何记录超过20个字符的名称?

nafvub8i

nafvub8i1#

如果筛选操作的结果不太大,不适合您的驱动程序,您可以收集结果并将其打印到默认记录器。

val logCollection = df.filter(length($"name") > lit(20)).collectAsList
logCollection.foreach(logger.info(_))

作为替代方法,您可以通过应用另一种writestream格式来创建一个单独的流,以便将名称写入数据库、控制台等。请记住,这样做时,您实际上会在sparksession中创建多个流查询,这些查询将独立地使用数据:

val originalDf = df.[...]
val logDf = df.filter(length($"name") > lit(20))

val originalQuery = originalDf.writeStream.[...].start() // keep logic as is
val logQuery = logDf.writeStream.format("console").[...].start()

spark.streams.awaitAnyTermination()

相关问题