如何在scala spark中按元组数据集的一个元素对元组数据集进行分组?

kcwpcxri  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(445)

我将Parquet文件读入元组数据集:

val dataset = spark.read.parquet("some-path").as[Tuple2[KeyClass, ValueClass]](Encoders.kryo)

我可以看到它与:

import spark.implicits._
dataset.map(x => s"$x._1 : $x._2").show(false)
+----------------------------+
|value                       |
+----------------------------+
|(1, 2)                      |
|(1, 3)                      |
|(2, 3)                      |
+----------------------------+

我的keyclass和valueclass实际上是带有其他嵌套类的复杂类(我不能在这里发布exect类和show方法的结果,因为代码是专有的,但下面是它们的结构):
密钥类:

public class KeyClass implements WritableComparable<KeyClass> {
    private byte[] field2;
    private byte[] field3;

    ...

    public boolean equals(Object o) {...}
    public int hashCode() {...}
}

值类:

public class ValueClass implements Writable {
    private OtherClass1 field1;
    private OtherClass2 field2;
    private boolean field3;
    private Long field4;
}

我需要按元组中的一个元素对它进行分组:

+----------------------------+
|value                       |
+----------------------------+
|(1, [2, 3])                 |
|(2, 3)                      |
+----------------------------+

我试过:

val value1 = dataset.groupByKey(_._1)(Encoders.kryo)
val value2 = value1.mapValues(_._2)(Encoders.bean(classOf[ValueClass]))
val value3 = value2.mapGroups({case (key, value) => (key, value.toList)})

value1.mapGroups((a, b) => (a, b.map(_._2))).show(false)

我在使用mapgroups方法的行上收到以下异常的异常:

Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience

我还尝试向该方法添加其他编码器:

val value3 = value2.mapGroups((a, b) => (a, b.toArray))(Encoders.tuple(Encoders.bean(classOf[KeyClass]), Encoders.bean(classOf[Array[ValueClass]])))

然后我在同一个方法上看到不同的异常:

Exception in thread "main" java.lang.AssertionError: assertion failed
xqkwcwgp

xqkwcwgp1#

groupby操作正确,问题是编码器的使用。我的自定义类是从另一个项目的遗留代码导入的,它们没有正确的编码器,也不遵守为它们创建bean编码器的规则。所以我只需要为每个操作明确指定正确的编码器:

dataset
  .groupByKey(_._1)(Encoders.kryo)
  .mapGroups((a,b) => (a, b.map(_._2).toList))(Encoders.kryo)

请指出,如果我对编码器的解释是含糊不清或不正确的。

相关问题