Not getting the desired output from a left join in Spark Scala

tzdcorbm · posted 7 months ago · Apache

I have two Dataset[ReconEntity]s, where ReconEntity is:

case class ReconEntity(rowId: String,
                       groupId: String,
                       amounts: List[Amount],
                       processingDate: Long,
                       attributes: Map[String, String],
                       entityType: String,
                       isDuplicate: String)

The first dataset looks like this:

+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId|            amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
|   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|      false|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
|   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
|   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
+-----+-------+------------------+--------------+----------+-----------+


The second dataset looks like this:

+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId|            amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
|   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
|   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
+-----+-------+------------------+--------------+----------+-----------+


I want the result to look like this:

+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId|            amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
|   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
|   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
|   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
|   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
+-----+-------+------------------+--------------+----------+-----------+


I join the two datasets with a left join: if a rowId is not present in the second dataset I set the isDuplicate flag to true, otherwise I keep the original value in the resulting dataset. The logic is:

// TRUE and IS_DUPLICATE are constants defined elsewhere in my code
// (the literal "true" and the output column name "isDuplicate")
inputEntries.as("inputDataset").join(otherEntries.as("otherDataset"),
      col("inputDataset.rowId") === col("otherDataset.rowId"), "left")
      .select(
        col("inputDataset.rowId"),
        col("inputDataset.groupId"),
        col("inputDataset.amounts"),
        col("inputDataset.processingDate"),
        col("inputDataset.attributes"),
        col("inputDataset.entityType"),
        when(
          col("otherDataset.rowId").isNull, TRUE
        ).otherwise(col("inputDataset.isDuplicate")).as(IS_DUPLICATE)
      ).as[ReconEntity]


The join key here is rowId. This logic works fine locally, but when I run the Spark job the result is not as expected. I am not very familiar with joins and want to know whether my logic is correct: what is the output of a left join of the two datasets?
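For reference: a left join keeps every row of the left dataset and attaches the matching right-side columns, or nulls when there is no match; if the right side has several rows with the same key, each left row is emitted once per match. A minimal sketch of this (assuming a spark-shell style session with spark and its implicits in scope; the toy column names are made up for illustration):

import spark.implicits._

val left  = Seq(("C1", "a"), ("C2", "b")).toDF("rowId", "l")
val right = Seq(("C2", "x"), ("C2", "y")).toDF("rowId", "r")

left.join(right, Seq("rowId"), "left").show()
// Row order may vary:
// +-----+---+----+
// |rowId|  l|   r|
// +-----+---+----+
// |   C1|  a|null|   <- no match: one row, nulls on the right
// |   C2|  b|   x|   <- two matches: the C2 row appears twice
// |   C2|  b|   y|
// +-----+---+----+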


wwtsj6pe #1

For the given requirement, a left join is the right choice.
Could you share the difference you see in the Spark job output? The only problem with the current solution arises when the second dataset (otherDataset) contains duplicate rowIds: each row of the first dataset then matches every duplicate row, producing multiple output entries.
Since we only need to check the rowId column, we can deduplicate the second dataset so that it keeps a single row per rowId, avoiding duplicates after the join.
Below is working code, including the deduplication step and sample join output before and after deduplication.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, lit, when}
import spark.implicits._

// Class definitions
case class Amount(currency: String, value: Double)
case class ReconEntity(
  rowId: String,
  groupId: String,
  amounts: List[Amount],
  processingDate: Long,
  attributes: Map[String, String],
  entityType: String,
  isDuplicate: String
)

// First Dataset
val inputEntries: Dataset[ReconEntity] = Seq(
    ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551021334, Map("rowId" -> "C1"), "E1", "false"),
    ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
    ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
    ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
    ReconEntity("C2", "G2", List(Amount("USD", 2.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C2"), "E1", "false"),
    ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011459, Map("rowId" -> "C3"), "E1", "false"),
    ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C3"), "E1", "true")
).toDS()
// Second Dataset (includes a duplicate for "C3")
val otherEntries: Dataset[ReconEntity] = Seq(
    ReconEntity("C2", "G2", List(Amount("USD", 2.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C2"), "E1", "false"),
    ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011459, Map("rowId" -> "C3"), "E1", "false"),
    ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011460, Map("rowId" -> "C3"), "E1", "false")
).toDS()

// Sample Output with left join
inputEntries.join(otherEntries, Seq("rowId"), "left").show()

// Due to duplicates in the second dataset, there are 4 rows for "C3" instead of the expected 2
// +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
// |rowId|groupId|             amounts|processingDate|   attributes|entityType|isDuplicate|groupId|             amounts|processingDate|   attributes|entityType|isDuplicate|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551021334|{rowId -> C1}|        E1|      false|   null|                null|          null|         null|      null|       null|
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|   null|                null|          null|         null|      null|       null|
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|   null|                null|          null|         null|      null|       null|
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|   null|                null|          null|         null|      null|       null|
// |   C2|     G2|[{USD, 2.0}, {EUR...|    1551011017|{rowId -> C2}|        E1|      false|     G2|[{USD, 2.0}, {EUR...|    1551011017|{rowId -> C2}|        E1|      false|
// |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|     G2|[{USD, 6.0}, {EUR...|    1551011460|{rowId -> C3}|        E1|      false|
// |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|
// |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011017|{rowId -> C3}|        E1|       true|     G2|[{USD, 6.0}, {EUR...|    1551011460|{rowId -> C3}|        E1|      false|
// |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011017|{rowId -> C3}|        E1|       true|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
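
// Sanity check: dropDuplicates("rowId") keeps one arbitrary row per rowId
// (which of the two "C3" rows survives is not deterministic, but only the
// presence of a rowId matters for the null check below)
otherEntries.dropDuplicates("rowId").show()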

// Create a result with de-duplication of second dataset on join key (rowId)
val result = inputEntries.as("inputDataset").join(
    otherEntries.dropDuplicates("rowId").as("otherDataset"),
    Seq("rowId"), 
    "left"
    ).select(
        col("inputDataset.rowId"),
        col("inputDataset.groupId"),
        col("inputDataset.amounts"),
        col("inputDataset.processingDate"),
        col("inputDataset.attributes"),
        col("inputDataset.entityType"),
        when(
          col("otherDataset.rowId").isNull, lit("true")
        ).otherwise(col("inputDataset.isDuplicate")).as("isDuplicate")
      ).as[ReconEntity]

// Output
result.show()
// +-----+-------+--------------------+--------------+-------------+----------+-----------+
// |rowId|groupId|             amounts|processingDate|   attributes|entityType|isDuplicate|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551021334|{rowId -> C1}|        E1|       true|
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|
// |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|
// |   C2|     G2|[{USD, 2.0}, {EUR...|    1551011017|{rowId -> C2}|        E1|      false|
// |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|
// |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011017|{rowId -> C3}|        E1|       true|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+
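
An equivalent approach, sketched here under the same data definitions and imports as above: since only the presence of a rowId in the second dataset matters, join against its distinct keys plus a marker column. This avoids the fan-out without relying on dropDuplicates picking an arbitrary row, and it sidesteps the aliased-column selection entirely:

// Distinct join keys of the second dataset, with a marker column
val otherKeys = otherEntries.select("rowId").distinct().withColumn("matched", lit(true))

val resultAlt = inputEntries
  .join(otherKeys, Seq("rowId"), "left")
  .withColumn("isDuplicate",
    // rowId absent from the second dataset -> "true", otherwise keep the flag
    when(col("matched").isNull, lit("true")).otherwise(col("isDuplicate")))
  .drop("matched")
  .as[ReconEntity]

resultAlt.show() // same rows as result above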
