如何使用pyspark更新配置单元表中的记录?

rggaifut  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(355)

我们使用spark来处理大数据,最近得到了一个新的用例,我们需要使用spark更新配置单元表中的数据。
下面是一个简单的示例:数据驻留在hive表中,应用程序使用pyspark读取Dataframe(比如df1)。例如:数据框有以下列。
员工姓名年龄工资
1 aaaa 28 30000个
2 bbbb 38 20000美元
3中交26.25万
4 dddd 30 32000年
需要使用spark向表中添加更多记录。
前任:
行动:姓名年龄工资
加5 dddd 30 32000
应用程序可以通过剥离action列并附加到表中,将新数据读入第二个Dataframe(比如df2)。这是直截了当的,而且效果非常好。
df.write.format('parquet').mode('append').saveastable(规范配置单元表)
在某些情况下,我们需要删除现有记录或根据action列更新它们。
前任:
行动:姓名年龄工资
删除2 bbbb 38 20000
更新4 dddd 30 42000
在上面的示例中,应用程序需要删除empno:2 and 更新empno:4.
最终输出应如下所示:
员工姓名年龄工资
1 aaaa 28 30000个
3中交26.25万
4 dddd 30 42000美元
5 dddd 30 32000年
据我所知,更新操作在sparksql中不可用,而且Dataframe是不可变的,不能更改记录。
有人遇到过这种情况吗?或者知道使用pyspark更新配置单元表中现有记录的选项吗?
请注意:应用程序需要定期处理数百万条记录上的数千次更新。
提前谢谢。

w8ntj3qf

w8ntj3qf1#

在大多数情况下,你需要使用正确的工具和方法,并认识到任何限制;hadoop大体上是不可变的。
Dataframe可以以不同的模式保存,但不能选择性地更新记录—pyspark、scala、r或java与此无关,除了cloudera的kudu storage manager有一个Spark连接器,df编写器可以使用它,但当我上次使用它时,管理员不喜欢它的安全限制。
你能做什么?
使用kudu,这是可变的-但我怀疑不是一个选项。从我的经验来看,它就像 parquet 柱一样,性能不相上下。
不管是否使用df,使用orc文件而不是parquet来实现持久化,这些文件也是列式的,可以在脚本中使用配置单元的merge语句进行更新,也可以在启用配置单元支持的情况下使用sparksql进行更新。此选项意味着忘记Parquet地板。此链接提供了一些有趣的见解:https://www.quora.com/what-are-the-differences-between-orc-avro-and-parquet-file-formats-in-hadoop-in-terms-of-compression-and-speed
在df's和what not中执行操作并重新声明(即写入)所有数据(再次)并写入Parquet地板表/目录的两个版本之一(分区与否),并添加视图层以在当前和新版本视图之间切换。这是在没有使用兽人的时候完成的。
使用merge,可以压缩分区内的小hadoop配置单元文件,但前提是格式为orc—如果内存正常。我将要刷新我的记忆在这里,可能是它在较新的版本和api的变化。
此外:
所以,你有几个选择,一个不需要做所有的Spark与东风的。
databricks也有这种类型的delta表。
如果您使用的是从rdbms.s导入的jdbc,那么您可以以akward的方式将sqoop与外部表一起使用,以获取更新的数据,但我无法从问题中看出这一点。逻辑是这样的:sqoop merge key创建多个部件文件,而不是一个不能用于使用merge key的文件
有几件事要考虑。祝你好运。

相关问题