如何通过spark作业向hbase发送删除查询

x33g5p2x  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(512)

我有一个自动化sparksql作业的用例,我想在其中执行以下操作:
使用spark从phoenix读取一个表(我们称之为table1),并在Dataframe(我们称之为df1)中收集找到的所有负值
然后,我想从另一个表(表2)中删除记录,其中一列的值在df1中(我考虑过使用连接查询,但我想知道是否可以使用dataframe,以及是否有使用hbase和spark dataframes的api)
afaik phoenix不直接支持通过spark进行删除操作(如果我错了,请纠正我,如果有什么方法我很乐意听到),这就是为什么我更倾向于使用hbase spark api的原因
下面是一个更直观地解释的模式:

这里有一些代码。
在Dataframe中收集负值:

// Collect negative values
val negativeValues = spark
  .sqlContext
  .phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
  .select('COLUMN1)
  .where('COLUMN2.lt(0))

// Send the query
[...]

从表2中删除列1在negativevalues中的值,因此在sql中类似于这样(如果可以将in直接应用于df):

DELETE FROM table2 WHERE COLUMN1 IN negativeValues

我的预期结果是:

table1

column1 |   column2
        |
123456  |   123
234567  |   456
345678  |   -789
456789  |   012
567891  |   -123

table2

column1 |   column2
        |
123456  |   321
234567  |   654
345678  |   945 <---- same column1 as table1's, so delete
456789  |   987
567891  |   675 <---- same column1 as table1's, so delete

最后,我想知道是否有一种方法可以通过spark将删除请求发送到hbase,而不必太麻烦。
谢谢您。

vxf3dgd4

vxf3dgd41#

如果需要从spark通过phoenix(sql引擎)向hbase运行“delete”查询,则必须创建自定义api。
可以使用以下方法,
从源dataframe获取table2 rowkey列以进行删除(在table2上)。
构造对源Dataframe的每个分区进行操作的代码,并构建“delete”查询。假设查询为“delete from table2 where column1=?”,准备它,并以正确的批量大小执行它。因为我们在Dataframe的每个分区上并行执行它,所以源Dataframe中的分区数驱动了并行性。因此,您可以尝试使用适当的大小对其进行重新分区,以查看适当的性能数据。
如果选择跳过sql引擎,还可以使用spark hbase direct api。这里有一个这样的例子-https://github.com/tmalaska/sparkonhbase/blob/master/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/hbasebulkdeleteexample.scala

相关问题