Scala error: Union can only be performed on tables with the same number of columns when joining 2 Datasets

pxy2qtax · asked 6 months ago · Scala

I have two Dataset[ReconEntity]s, where ReconEntity is:

case class ReconEntity(rowId: String,
                       groupId: String,
                       amounts: List[Amount],
                       processingDate: Long,
                       attributes: Map[String, String],
                       entityType: String,
                       isDuplicate: String)
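(The Amount type is not shown in the post; judging by the `USD,10.00000000...` values in the tables below, a minimal hypothetical definition might be:)

```scala
// Hypothetical shape of Amount, assumed only to make the example
// self-contained; the original post does not define it.
case class Amount(currency: String, value: BigDecimal)

val a = Amount("USD", BigDecimal("10.00000000"))
```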

The first Dataset looks like:

+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId|            amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
|   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|      false|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
|   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
|   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
+-----+-------+------------------+--------------+----------+-----------+


The second Dataset looks like:

+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId|            amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
|   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
|   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
+-----+-------+------------------+--------------+----------+-----------+


I want the result to look like this:

+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId|            amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
|   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
|   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
|   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
+-----+-------+------------------+--------------+----------+-----------+


I join the two Datasets with a left join: if a rowId does not exist in the second Dataset, I set its isDuplicate flag to true; otherwise the row keeps its original value in the resulting Dataset. The logic is:

inputEntries.as("inputDataset")
  .join(otherEntries.as("otherDataset").select(joinKey), Seq(joinKey), "left")
  .withColumn("isDuplicateFinal",
    when(col("otherDataset.rowId").isNull, lit(true))
      .otherwise(col("inputDataset.isDuplicate")))
  .drop("otherDataset.isDuplicate")
  .select(
    col("inputDataset.rowId"),
    col("inputDataset.groupId"),
    col("inputDataset.amounts"),
    col("inputDataset.processingDate"),
    col("inputDataset.attributes"),
    col("inputDataset.entityType"),
    col("isDuplicateFinal").alias("isDuplicate")
  ).as[ReconEntity]


Here joinKey is rowId. This logic works fine locally, but when I try to run the Spark job it fails with:

ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 8 columns;
'Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526, isDuplicateFinal#7810]
+- 'Project [rowId#4188, rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526, CASE WHEN isnull(rowId#4188) THEN true ELSE isDuplicate#526 END AS isDuplicateFinal#7810]
   +- 'Project [rowId#4188, rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526]
      +- 'Join LeftOuter, (rowId#247 = rowId#4188)
         :- SubqueryAlias inputDataset
         :  +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526]
         :     +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, CASE WHEN (row_number#517 = 1) THEN false ELSE true END AS isDuplicate#526, row_number#517]
         :        +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#253, row_number#517]
         :           +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#253, row_number#517, row_number#517]
         :              +- Window [row_number() windowspecdefinition(rowId#247, processingDate#250L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#517], [rowId#247], [processingDate#250L DESC NULLS LAST]
         :                 +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#253]


I can't figure out where the UNION is happening — whether it occurs while executing as[ReconEntity] or some other operation — nor why a new column is being generated, since I drop it after joining the 2 Datasets.


fjaof16o1#

I'm not sure either where the UNION happens in this code. But is the .select(joinKey) actually needed? Could you try running it once with that removed?
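As a side note, the flag logic the question describes — mark a row as a duplicate when its rowId is absent from the second Dataset, otherwise keep its original flag — can be sketched with plain Scala collections, independent of Spark (all names here are hypothetical, not from the post):

```scala
// Plain-collections sketch of the intended semantics: rows whose rowId is
// absent from the other side get isDuplicate = true; others are unchanged.
case class SimpleRow(rowId: String, isDuplicate: Boolean)

def markDuplicates(input: Seq[SimpleRow], otherIds: Set[String]): Seq[SimpleRow] =
  input.map { r =>
    if (otherIds.contains(r.rowId)) r   // present on the other side: keep flag
    else r.copy(isDuplicate = true)     // absent: force the flag to true
  }

val input  = Seq(SimpleRow("C1", false), SimpleRow("C2", false), SimpleRow("C3", false))
val result = markDuplicates(input, Set("C2", "C3"))
// result: Seq(SimpleRow("C1", true), SimpleRow("C2", false), SimpleRow("C3", false))
```

This mirrors the tables above: C1 does not appear in the second Dataset, so it is flagged true, while C2 and C3 keep their original values.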
