scala 如何在Spark聚合器中使用嵌套Map作为缓冲区?

dxpyg8gm  于 5个月前  发布在  Scala
关注(0)|答案(1)|浏览(65)

我试图用一个包含非原语类型的 Map(例如,Map[String,Set[String]])作为其缓冲区来实现Scala Spark Aggregator。我似乎可以使用kryo或ExpressionEncoder to encode a collection of primitives(例如,Set[String]),但当我将其嵌入Map中时,它似乎找不到编码器。
如何为这样的嵌套类型创建编码器?
我尝试过以下方法:

def bufferEncoder: Encoder[Map[String, Set[String]]] = Encoders.kryo[Map[String, Set[String]]]

字符串

def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])


对于我编写的另一个聚合器,我使用了

def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]


这是有效的。
但是当我尝试前两个选项时,我得到这个错误:
异常错误:未找到用于java.util.Map的编码器[字符串,数组[字符串]]

更新:

我添加了两个代码示例,尽可能的简单。唯一的区别是,第一个示例在Map中使用了Array,第二个示例使用了Set。第一个示例编译并运行(结果并不重要),但第二个示例出现了我在上面描述的异常。请注意,在这两个示例中,我都使用了可变的Scala Map和Set。

class MapSetTest extends Aggregator[String, Map[String, Set[String]], Int] with Serializable {

override def zero = Map[String, Set[String]]()

override def reduce(buffer: Map[String, Set[String]], newItem: String) = {
    buffer.put(newItem, Set[String]() + newItem)
    buffer
}

override def merge(b1: Map[String, Set[String]], b2: Map[String, Set[String]]) = {
  b1
}

override def finish(reduction: Map[String, Set[String]]): Int = {
  reduction.size
}

def bufferEncoder = implicitly[Encoder[Map[String, Set[String]]]]
def outputEncoder = Encoders.scalaInt
}

对比

class MapArrayTest extends Aggregator[String, Map[String, Array[String]], Int] with Serializable {

override def zero = Map[String, Array[String]]()

override def reduce(buffer: Map[String, Array[String]], newItem: String) = {
  buffer.put(newItem, Array[String](newItem))
  buffer
}

override def merge(b1: Map[String, Array[String]], b2: Map[String, Array[String]]) = {
  b1
}

override def finish(reduction: Map[String, Array[String]]): Int = {
  reduction.size
}

def bufferEncoder = implicitly[Encoder[Map[String, Array[String]]]]

def outputEncoder = Encoders.scalaInt


} }个文件夹

dz6r00yl

dz6r00yl1#

虽然根据我的评论,我不会推荐kyro这段代码:

def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])

字符串
光靠它是不行的
1.不要使用ExpressionEncoder,只使用Encoder(ExpressionEncoder是一个实现细节)。
1.您必须导入sparkSession.implicits._以获取正确的编码器派生(使用Sparks内置编码器)。这些implicits绑定到Expression类型-而不是ExpressionEncoder
1.当你需要的时候,只考虑物化编码器,如果你需要的话,传递implementation,这将使你更容易交换你使用的编码器。
不幸的是,第2点并没有在入门页面中直接调用。

import sparkSession.implicits._
val enc = implicitly[Encoder[Map[String,Set[String]]]]


会工作,但是,正如你所发现的,不会

import sparkSession.implicits._
val enc = implicitly[ExpressionEncoder[Map[String,Set[String]]]]

相关问题