打开多个hbase表

bihw5rsg  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(292)

我是一个新的spark用户,我想将流数据保存到多个hbase表中。当我想把我的数据保存在一个文件中时,我没有遇到任何问题,但是我无法处理多个文件。
我曾尝试创建多个htable,但后来发现这个类只用于与单个hbase表通信。
有什么办法吗?
这就是我尝试创建多个htable的地方(当然不起作用,但这是我的想法)

//HBASE Tables
val tableFull = "table1"
val tableCategoricalFiltered = "table2"

// Add local HBase conf
val conf1 = HBaseConfiguration.create()
val conf2 = HBaseConfiguration.create()

conf1.set(TableInputFormat.INPUT_TABLE, tableFull)
conf2.set(TableInputFormat.INPUT_TABLE, tableCategoricalFiltered)

//Opening Tables
val tableInputFeatures = new HTable(conf1, tableFull)
val tableCategoricalFilteredFeatures = new HTable(conf2, tableCategoricalFiltered)

这里是我尝试使用它们的地方(尽管有一个htable工作)

events.foreachRDD { event =>

    var j = 0
    event.foreach { feature =>

            if ( j <= 49 ) {
                    println("Feature " + j + " : " + featuresDic(j))
                    println(feature)

                    val p_full = new Put(new String("stream " + row_full).getBytes())
                    p_full.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                    tableInputFeatures.put(p_full)

                    if ( j != 26 || j != 27 || j != 28 || j != 29 ) {

                            val p_cat = new Put(new String("stream " + row_categorical).getBytes())
                            p_cat.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                            tableCategoricalFilteredFeatures.put(p_cat)
                    }else{
                            j = 0
                            row_full = row_full + 1

                            println("Feature " + j + " : " + featuresDic(j))
                            println(feature)

                            val p_full = new Put(new String("stream " + row_full).getBytes())
                            p_full.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                            tableInputFeatures.put(p_full)

                            val p_cat = new Put(new String("stream " + row_categorical).getBytes())
                            p_cat.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                            tableCategoricalFilteredFeatures.put(p_cat)
                    }

                    j  = j + 1
            }
    }
pw136qt2

pw136qt21#

有一种方法我确认效果很好,使用hbase rdd库。https://github.com/unicredit/hbase-rdd
它很容易使用。请参考https://github.com/unicredit/hbase-rdd#writing-到hbase查看用法。
您可以尝试使用multitableoutputformat,因为我已经确认它可以很好地与传统的mapreduce配合使用。我还没用spark的。

相关问题