我试图用一个包含非原语类型的 Map(例如,Map[String,Set[String]])作为其缓冲区来实现Scala Spark Aggregator。我似乎可以使用kryo或ExpressionEncoder to encode a collection of primitives(例如,Set[String]),但当我将其嵌入Map中时,它似乎找不到编码器。
如何为这样的嵌套类型创建编码器?
我尝试过以下方法:
def bufferEncoder: Encoder[Map[String, Set[String]]] = Encoders.kryo[Map[String, Set[String]]]
字符串
和
def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])
型
对于我编写的另一个聚合器,我使用了
def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]
型
这是有效的。
但是当我尝试前两个选项时,我得到这个错误:
异常错误:未找到用于java.util.Map的编码器[字符串,数组[字符串]]
更新:
我添加了两个代码示例,尽可能的简单。唯一的区别是,第一个示例在Map中使用了Array,第二个示例使用了Set。第一个示例编译并运行(结果并不重要),但第二个示例出现了我在上面描述的异常。请注意,在这两个示例中,我都使用了可变的Scala Map和Set。
class MapSetTest extends Aggregator[String, Map[String, Set[String]], Int] with Serializable {
override def zero = Map[String, Set[String]]()
override def reduce(buffer: Map[String, Set[String]], newItem: String) = {
buffer.put(newItem, Set[String]() + newItem)
buffer
}
override def merge(b1: Map[String, Set[String]], b2: Map[String, Set[String]]) = {
b1
}
override def finish(reduction: Map[String, Set[String]]): Int = {
reduction.size
}
def bufferEncoder = implicitly[Encoder[Map[String, Set[String]]]]
def outputEncoder = Encoders.scalaInt
}
型
对比
class MapArrayTest extends Aggregator[String, Map[String, Array[String]], Int] with Serializable {
override def zero = Map[String, Array[String]]()
override def reduce(buffer: Map[String, Array[String]], newItem: String) = {
buffer.put(newItem, Array[String](newItem))
buffer
}
override def merge(b1: Map[String, Array[String]], b2: Map[String, Array[String]]) = {
b1
}
override def finish(reduction: Map[String, Array[String]]): Int = {
reduction.size
}
def bufferEncoder = implicitly[Encoder[Map[String, Array[String]]]]
def outputEncoder = Encoders.scalaInt
型
} }个文件夹
1条答案
按热度按时间dz6r00yl1#
虽然根据我的评论,我不会推荐kyro这段代码:
字符串
光靠它是不行的
1.不要使用ExpressionEncoder,只使用Encoder(ExpressionEncoder是一个实现细节)。
1.您必须导入sparkSession.implicits._以获取正确的编码器派生(使用Sparks内置编码器)。这些implicits绑定到Expression类型-而不是ExpressionEncoder
1.当你需要的时候,只考虑物化编码器,如果你需要的话,传递implementation,这将使你更容易交换你使用的编码器。
不幸的是,第2点并没有在入门页面中直接调用。
型
会工作,但是,正如你所发现的,不会
型