有没有一种方法可以将默认大小写值作为参数传递

gzjq41n4  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(362)

我想看看是否有一种方法可以将默认大小写的值合并到匹配表达式中

item match {
case null => false
case _:String => _.matches(params)
}

我试过寻找不同的选择,但我真的找不到解决办法。
对于context,这是代码片段的一部分,在 spark dataframe ```
x.filter(row => row.getAsString.matches("""/regex/""")).count()

但也有例外 `null` 价值观让一切都破碎了。
我知道我可以这样做来得到想要的结果,但是再计算一次就没有意义了

x.filter(row => row.getAsString match{
case null => false
case _ => row.getAsString.matches("""\d""")
}).count()

任何建议或建议都会有帮助。谢谢您
yvt65v4c

yvt65v4c1#

除了krzysztofł回答如下,您可以在匹配stament之前添加null检查,您可以执行以下操作:

x
.where($"colname".isNotNull)
.filter(row => row.getAs[String]("colname").matches("""/regex/""")).count()

或者类似的

x.where($"colname" rlike """\d""").count()
5jvtdoz2

5jvtdoz22#

你可以用rlike。例如:

x.filter(x("colName").rlike("cat|dog")).count
sxissh06

sxissh063#

要匹配所有内容,只需使用不带类型归属的变量名:

item match {
   case null => false
   case i => i.matches(params)
}

所以在你的情况下:

x.filter(row => 
   row.getAs[String]("colname") match {
         case null => false
         case r => r.matches("""\d""")
   }
).count()

如果要检查该项是否为null,还可以使用 Option.apply 它回来了 None 如果值为 null 以及 Some 否则,例如:

Option(item).fold(false)(_.matches(params))
o7jaxewo

o7jaxewo4#

如果我正确理解了您的需求,那么您希望在单个迭代中进行过滤和聚合,我认为udaf是您必须要做的事情。基本上,我不认为这样做比在你的原始文章的最后提到的选项,即自定义项过滤掉,然后计数有什么性能优势。但是,我已经演示了如何在这里使用udaf。
对于spark<3.0,您必须扩展userdefinedaggregatefunction类,使其看起来像这样-

import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class ConditionalCount(val pattern: String, colName: String) extends UserDefinedAggregateFunction{
      override def inputSchema: StructType = StructType(Seq(StructField(colName, StringType)))

      override def bufferSchema: StructType = StructType(Seq(StructField("count", LongType)))

      override def dataType: DataType = LongType

      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Long](0) + input.getAs[String](0) match {
          case null => 0
          case x if x.matches(pattern) => 1L
          case _ => 0L
        }
      }

      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)

      override def evaluate(buffer: Row): Any = buffer.getAs[Long](0)
    }

然后使用这个初始化并使用这个udaf-

val conditionalCount = new ConditionalCount("my pattern", "colName")

    df.withColumn("count", conditionalCount(col("colName")))

p、 s:我没有运行这个并验证,如果有小错误请原谅

相关问题