Performing joins in Spark SQL with the Java API

ma8fv8wu · posted 2021-05-16 in Spark

I have three Datasets, one from each of three tables:

Dataset<TABLE1> bbdd_one = map.get("TABLE1").as(Encoders.bean(TABLE1.class)).alias("TABLE1");
Dataset<TABLE2> bbdd_two = map.get("TABLE2").as(Encoders.bean(TABLE2.class)).alias("TABLE2");
Dataset<TABLE3> bbdd_three = map.get("TABLE3").as(Encoders.bean(TABLE3.class)).alias("TABLE3");

I want to perform a triple left join on them and then write the result to an output.parquet file. The equivalent SQL join statement would be something like:

SELECT one.field, ........, two.field ....., three.field, ... four.field
FROM TABLE1 one
LEFT JOIN TABLE2 two ON two.field = one.field
LEFT JOIN TABLE3 three ON three.field = one.field AND three.field = one.field
LEFT JOIN TABLE3 four ON four.field = one.field AND four.field = one.otherfield
WHERE one.field = 'whatever'

How can I achieve this with the Java API? Is it even possible? I built an example with a single join, but with three joins it seems much harder.
PS: the single join I already have working with the Java API is:

Dataset<TJOINED> ds_joined = ds_table1
                        .join(ds_table2,
                                JavaConversions.asScalaBuffer(Arrays.asList("fieldInCommon1", "fieldInCommon2", "fieldInCommon3", "fieldInCommon4"))
                                        .seq(),
                                "inner")
                        .select("a lot of fields", ... "more fields")                                                               
                        .as(Encoders.bean(TJOINED.class));

Thanks!

q8l4jmvw · Answer #1

Have you tried chaining the join statements? I don't write Java very often, so this is just a guess:

Dataset<TJOINED> ds_joined = ds_table1
    .join(
        ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table3,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table4,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .select(...)
    .as(Encoders.bean(TJOINED.class));

Update: if my understanding is correct, ds_table3 and ds_table4 are actually the same table, joined on different fields. Maybe this updated answer, given in Scala since that is what I'm used to, achieves what you want. Here is a complete working example:

import spark.implicits._

case class TABLE1(f1: Int, f2: Int, f3: Int, f4: Int, f5:Int)
case class TABLE2(f1: Int, f2: Int, vTable2: Int)
case class TABLE3(f3: Int, f4: Int, vTable3: Int)

val one = spark.createDataset[TABLE1](Seq(TABLE1(1,2,3,4,5), TABLE1(1,3,4,5,6)))
//one.show()
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//|  1|  2|  3|  4|  5|
//|  1|  3|  4|  5|  6|
//+---+---+---+---+---+

val two = spark.createDataset[TABLE2](Seq(TABLE2(1,2,20)))
//two.show()
//+---+---+-------+
//| f1| f2|vTable2|
//+---+---+-------+
//|  1|  2|     20|
//+---+---+-------+

val three = spark.createDataset[TABLE3](Seq(TABLE3(3,4,20), TABLE3(3,5,50)))
//three.show()
//+---+---+-------+
//| f3| f4|vTable3|
//+---+---+-------+
//|  3|  4|     20|
//|  3|  5|     50|
//+---+---+-------+

val result = one
.join(two, Seq("f1", "f2"), "left")
.join(three, Seq("f3", "f4"), "left")
.join(
  three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
  Seq("f3", "f5"),
  "left"
)
//result.show()
//+---+---+---+---+---+-------+-------+-------+
//| f3| f5| f4| f1| f2|vTable2|vTable3|vTable4|
//+---+---+---+---+---+-------+-------+-------+
//|  3|  5|  4|  1|  2|     20|     20|     50|
//|  4|  6|  5|  1|  3|   null|   null|   null|
//+---+---+---+---+---+-------+-------+-------+
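
Since the question asks for the Java API, here is an untested Java sketch of the same chain of joins, reusing the asker's JavaConversions.asScalaBuffer(...).seq() pattern and assuming one, two and three are the same Datasets as in the Scala example; it ends with the Parquet write the question mentions:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.JavaConversions;

// Same triple left join as the Scala example above: `three` is reused as the
// fourth table, with its columns renamed so the second pass joins on f3/f5.
Dataset<Row> result = one
        .join(two,
                JavaConversions.asScalaBuffer(Arrays.asList("f1", "f2")).seq(),
                "left")
        .join(three,
                JavaConversions.asScalaBuffer(Arrays.asList("f3", "f4")).seq(),
                "left")
        .join(three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
                JavaConversions.asScalaBuffer(Arrays.asList("f3", "f5")).seq(),
                "left");

// Write the joined result to Parquet, as the question asks.
result.write().parquet("output.parquet");

Note that scala.collection.JavaConversions is deprecated (and removed in Scala 2.13); if that matters, the Column-based overload join(right, joinExprs, joinType) avoids converting Java collections to Scala, at the cost of keeping both copies of the join columns in the result.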
