HBase bulk load: append data instead of overwriting it

yyyllmsg · published 2021-06-02 in Hadoop

I am loading data into HBase with MapReduce and bulk load, implemented in Java. Basically I created a mapper and use HFileOutputFormat2.configureIncrementalLoad for the reduce side (full code at the end of the question). The mapper just reads some bytes from a file and creates a Put. The job writes this out as HFiles, and LoadIncrementalHFiles.doBulkLoad then loads the data into HBase. That all works fine, but done this way it overwrites the old values in HBase. So I am looking for a way to append data, the way the append function of the client API works. Thanks for reading, and I hope some of you have an idea that can help me :)
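The mapper itself (HBaseBulkLoadMapper in the driver below) is not part of the posted code. Purely as a hypothetical sketch of the kind of mapper described above, assuming one column family and a made-up qualifier "data", it could look roughly like this:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private String separator;
    private byte[] family1;

    @Override
    protected void setup(Context context) {
        // Keys match what the driver below puts into the job configuration.
        separator = context.getConfiguration().get("data.seperator");
        family1 = Bytes.toBytes(context.getConfiguration().get("COLUMN_FAMILY_1"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(separator);
        // First field is taken as the row key, second as the value;
        // the qualifier "data" is an assumption, the real layout is not shown.
        Put put = new Put(Bytes.toBytes(fields[0]));
        put.addColumn(family1, Bytes.toBytes("data"), Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(fields[0])), put);
    }
}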

public int run(String[] args) throws Exception {
    int result=0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name",TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
    configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);

    Job job = Job.getInstance(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);

    FileInputFormat.addInputPaths(job, args[0]);
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    HFileOutputFormat2.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    Connection c = ConnectionFactory.createConnection(configuration);
    Table t = c.getTable(TableName.valueOf(TABLE_NAME));
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME));
    HFileOutputFormat2.configureIncrementalLoad(job,t,rl);
    System.out.println("start");
    job.waitForCompletion(true);
    if (job.isSuccessful()) {
        HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
    } else {

        result = -1;
    }
    return result;
}

public static void doBulkLoad(String pathToHFile, String tableName) {
    try {
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.child.java.opts", "-Xmx1g");
        HBaseConfiguration.addHbaseResources(configuration);
        LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);

        //HTable hTable = new HTable(configuration, tableName);
        //loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);

        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Admin admin = connection.getAdmin();
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
        //path, admin, table, region locator
        loadFfiles.doBulkLoad(new Path(pathToHFile),admin,table,regionLocator);

        System.out.println("Bulk Load Completed..");
    } catch (Exception exception) {
        exception.printStackTrace();
    }
}

As asked for in the comments, here is the output of the table description. The table was created through the Python happybase API, and I am not sure which option flags that API sets by default...

{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}


p1tboqfb1#

In HFileOutputFormat2.configureIncrementalLoad() (see http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/hfileoutputformat2.html#line.408), PutSortReducer is used as the reducer.
In PutSortReducer.reduce() (see http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/putsortreducer.html), the KeyValues are stored in a TreeSet whose comparator compares only the keys. That is why only one value survives.
To keep both values, you can create your own reducer based on PutSortReducer in which you keep both values, and then set:

HFileOutputFormat2.configureIncrementalLoad(job, t, rl);
job.setReducerClass(MyReducer.class);
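As a rough illustration (not code from the original answer), here is a minimal sketch of such a reducer, modeled on PutSortReducer but collecting the cells in a plain list so that duplicates are not collapsed. The class name KeepAllPutSortReducer is made up, and PutSortReducer's memory-threshold handling is omitted:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class KeepAllPutSortReducer
        extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
            throws IOException, InterruptedException {
        // Unlike PutSortReducer's TreeSet, a plain list keeps cells that share the same key.
        List<KeyValue> kvs = new ArrayList<>();
        for (Put p : puts) {
            for (List<Cell> cells : p.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    kvs.add(KeyValueUtil.ensureKeyValue(cell));
                }
            }
        }
        // HFileOutputFormat2 expects cells in key order, so sort before writing.
        Collections.sort(kvs, KeyValue.COMPARATOR);
        for (KeyValue kv : kvs) {
            context.write(row, kv);
        }
    }
}

Note that even if every cell ends up in the HFiles, two cells with identical row, column and timestamp still collapse to a single version at read time, so the new Puts need distinct timestamps and the column family needs enough VERSIONS (the table description above shows VERSIONS => '3') for older values to remain readable.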
