spark scala,用savemode.append写入数据,同时覆盖一些现有分区

nkkqxpd9  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(729)

在我的spark程序中,我有以下命令

df.write
  .mode(SaveMode.Append)
  .partitionBy("year","month","day")
  .format(format)
  .option("path",path)
  .saveAsTable(table_name)

当我在同一天运行它两次时,我的数据中有重复项。所以我希望它附加数据,但是当一些分区已经存在时,它应该覆盖它们。

xxls0lw8

xxls0lw81#

这里使用的是hive集成,但只能是spark目录,一个完整的示例:
1) 需要摆table吗

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
import org.apache.spark.sql.types._

val df = spark.range(9).map(x => (x, (x + 100) % 3)).toDF("c1", "c2")
df.repartition($"c2")
  .write
  .partitionBy("c2")
  .mode("overwrite").saveAsTable("tabX")

2) 在安装之后用这种方式更新一个分区

val df2 = spark.range(1).map(x => (x, (x + 100) % 3)).toDF("c1", "c2")
df2.repartition($"c2")
   .write
   .mode("overwrite").insertInto("tabX")

3) 看看效果:

// from 9 -> 7 entries, pls run
val df3 = spark.table("tabX")
df3.show(false)

退货:

+---+---+
|c1 |c2 |
+---+---+
|2  |0  |
|5  |0  |
|8  |0  |
|1  |2  |
|4  |2  |
|7  |2  |
|0  |1  |
+---+---+

这是分区覆盖的证据。

相关问题