Hbase的Java操作

x33g5p2x  于2021-03-14 发布在 Hbase  
字(10.7k)|赞(0)|评价(0)|浏览(503)

[warning] 实验环境:Hbase集群+IDEA代码调试

HBase 与 Hadoop 一样,都是用 java 编写的,所以 HBase 对 java 支持是必需的,下面看看怎么使用java 语言对 HBase 进行操作。Java API 核心类总结如下:

[info] 1、 HBaseConfiguration 类

HBaseConfiguration 是每一个 HBase Client 都会使用到的对象,它代表 HBase 配置信息。有两种构造方式。

public HBaseConfiguration()
public HBaseConfiguration(final Configuration c)

默认构造方式会尝试从 hbase-default.xml 和 HBase-site.xml 文件中读取配置。如果CLASSPATH 没有这两个文件,就需要自己配置。

Configuration HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.ZooKeeper.quorum","zkServer");  //hbase 服务地址
HBASE_CONFIG.set("hbase.ZooKeeper.property.clientPort","2181"); //端口号
HBaseConfiguration cfg = new HBaseConfiguration(HBASE_CONFIG);  //读取配置文件

[info] 2、创建表

创建表通过 HBaseAdmin 对象操作。HBaseAdmin 负责META 表信息的处理。HBaseAdmin 提供了 createTable 方法。

public void createTable(HTableDescriptor desc)

HTableDescriptor 表示表的 Schema,提供常用方法有以下两个。

1)setMaxFileSize:指定最大 Region 的大小。
2)setMemStoreFlushSize:指定 MemStore Flush 到 HDFS 上的文件大小。

[info] 3、 增加 Family 列族

使用 addFamily 方法实现 Family 的添加

public void addFamily(final HColumnDescriptor family)

HColumnDescriptor 代表 Column 的 Schema,提供的常用方法有以下几个

1、setTimeToLive:指定最大的 TTL(单位是 ms),过期数据会被自动删除。
2、setInMemory:指定是否放在内存中,对小表有用,可用于提高效率。默认关闭。
3、setBloomFilter:指定是否使用 BloomFilter,可提高随机查询效率。默认关闭。
4、setCompressionType:设定数据压缩类型。默认无压缩。
5、setMaxVersions:指定数据最大保存的版本个数。默认为3。

举个简单的例子,创建 4 个 Family 表,命令如下

HBaseAdmin hAdmin = new HBaseAdmin(hbaseConfig);
HTableDescriptor table = new HTableDescriptor(tableName);
table.addFamily(new HColumnDescriptor("f1"));
table.addFamily(new HColumnDescriptor("f2"));
table.addFamily(new HColumnDescriptor("f3"));
table.addFamily(new HColumnDescriptor("f4"));
hAdmin.createTable(table);

[info] 4、 删除表

删除表也是通过 HBaseAdmin 来操作,删除表之前首先要 disable 表。这是一个非常耗时的操作,所以不建议频繁删除表。

disableTable 和 deleteTable 分别用来执行 disable 和 delete 操作。使用方法如下

HBaseAdmin hAdmin = new HBaseAdmin(hbaseConfig);
if(hAdmin.tableExists(tableName)){
    hAdmin.disableTable(tableName);
    hAdmin.deleteTable(tableName);
}

[info] 5、查询数据

查询分为单条随机查询和批量查询。单条查询通过 Row Key 在Table 中查询某一行的数据,HTable 提供了get 方法完成单条查询。批量查询通过制定一段 Row Key 的范围来查询,HTable 提供了 getScanner 方法完成批量查询。

public Result get(final Get get)
public ResultScanner getScanner(final Scan scan)

Get 对象包含一个 Get 查询需要的信息,它的构造方法有两种

public Get(byte [] row)
public Get(byte [] row,RowLock rowLock)

Row Lock 为了保证读写的原子性,可以传递一个已经存在 Row Lock,否则 HBase 会自动生成一个新的 Row Lock。

Scan 对象提供了默认构造函数,一般使用默认构造函数。

Get 和 Scan 的常用方法有以下几个

addFamily/addColumn:指定需要的 Family 或者 Column,如果没有调用任何 Family 或者 Column,会返回所有的 Column。
setMaxVersions:指定最大的版本个数。如果不带任何参数调用 setMaxVersions,表示取所有的版本。如果不调用 setMaxVersions,只会取到最新的版本。
setTimeRange:指定最大的时间戳和最小的时间戳,只有在此范围内的 Cell 才能被获取。
setTimeStamp:指定时间戳。
setFilter:指定 Filter 过滤不需要的信息。

Scan 特有的方法如下

setStartRow:指定开始的行。如果不调用,从表头开始。
setStopRow:指定结束的行(不含此行)。
setBatch:指定最多返回的 Cell 数目。防止一行中有过多的数据,导致 OOM 错误。

Result 代表是一行的数据。常用方法有以下几个

getRow:返回 Row Key。
raw:返回所有的 KeyValue 数组。
getValue:按照 Column 来获取 Cell 的值。
ResultScanner 是 Result 的一个容器,每次调用ResultScanner 的next 方法会返回Result。

示例代码如下:

Scan scan = new Scan();
scan.setMaxVersions();
ResultScanner ss = table.getScanner(scan);
for(Result r:ss){
    System.out.println(new String(r.getRow()));
    for(KeyValue kv:r.raw){
        System.out.println(new String(kv.getColumn()));
    }
}

[info] 6、 插入数据

HTable 通过 put 方法插入数据,可以传递单个 put 对象 或 List put 对象分别实现单条插入和批量插入。

public void put(final Put put) throws IOException
public void put(final List< Put> puts) throws IOException

Put 提供3 种构造方式

public Put(byte [] row)
public Put(byte [] row)
public Put(byte [] row,RowLock rowLock)
public Put(Put putToCopy)

Put 常用的方法有以下几个

1)add:增加一个 Cell。

2)setTimeStamp:指定所有 Cell 默认的 timestamp,如果一个 Cell 没有指定 timestamp,就会用到这个值。如果没有调用,HBase 会将当前时间作为未指定 timestamp 的Cell 的 timestamp。

3)setWriteToWAL:WAL 是 Write Ahead Log 的缩写,指的是 HBase 在插入操作前是否写 Log。默认是打开,关掉会提高性能,但是如果系统出现故障(负责插入的Region Server 挂掉),数据可能会丢失。

另外 HTable 也有两个方法会影响插入的性能。

1)setAutoFlash:AutoFlush 指的是在每次调用 HBase 的 Put 操作,是否提交到 HBase Server。默认是 true,每次会提交。如果此时是单条插入,就会有更多的I/O,从而降低其性能。

2)setWriteBufferSize:Write Buffer Size 在 AutoFlush 为false 的时候起作用,默认是 2MB,也就是插入数据超过 2MB,就会自动提交到 Server。

示例代码如下:

HTable table = new HTable(hbaseConfig, tableName);
table.setAutoFlush(autoFlush);
List< Put> lp = new ArrayList< Put>();
int count = 10000;
byte[] buffer = new byte[1024];
Random r = new Random();
for(int i = 1;i <= count;++i){
    Put p = new Put(String.format("row%09d",i).getBytes());
    r.nextBytes(buffer);
    p.add("f1".getBytes(), null, buffer);
    p.add("f2".getBytes(), null, buffer);
    p.add("f3".getBytes(), null, buffer);
    p.add("f4".getBytes(), null, buffer);
    p.setWriteToWAL(wal);
    lp.add(p);
    if(i%1000==0){
        table.put(lp);
        lp.clear();
    }
}

[info] 7、 删除数据

HTable 通过 delete 方法删除数据

public void delete(final Delete delete)

Delete 构造方法如下:

public Delete(byte [] row)
public Delete(byte [] row, long timestamp, RowLock rowLock)
public Delete(final Delete d)

Delete 常用方法有 deleteFamily/deleteColumn,用来指定要删除的 Family 或者 Column 的数据。 如果不调用任何这样的方法,将会删除整行。

注意: 如果某个 Cell 的 timestamp 高于当前时间,这个 Cell 将不会被删除,仍然可以查出来

示例代码如下:

HTable table = new HTable(hbaseConfig,"mytest");
Delete d = new Delete("row1".getBytes());
table.delete(d)

[warning] 开发使用jar包

:-:

[warning] 客户端远程连接服务器流程

应用程序首先连接到zk,然后zk告知region在哪个regionserver上,然后,应用程序再连接到hbase的regionserver上读写数据。

:-:

[warning] HBase保存数据格式汉字显示问题

HBase保存的都是字节码,是否显示具体的汉字只是前端显示问题。知道这个,你就好解决问题了:通过hbase API的decode转码、HUE、hive外部表等方式都可以显示中文汉字。

[warning] 实战代码

package bigData.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HbaseDemo {

    private static Configuration conf = HBaseConfiguration.create();
    private static Admin admin;


    static {
        //应用程序首先连接到zk,然后zk告知region在哪个regionserver上,然后,应用程序再连接到hbase的regionserver上读写数据
        // 设置Zookeeper,直接设置IP地址
        conf.set("hbase.zookeeper.quorum", "bigdata1:2181,bigdata2:2181,bigdata3:2181");
        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 创建表,可以同时创建多个列簇
     *
     * @param tableName
     * @param columnFamily
     */
    public void createTable(String tableName, String... columnFamily) {
        TableName tableNameObj = TableName.valueOf(tableName);
        try {
            if (this.admin.tableExists(tableNameObj)) {
                System.out.println("Table : " + tableName + " already exists !");
            } else {
                HTableDescriptor td = new HTableDescriptor(tableNameObj);
                int len = columnFamily.length;
                for (int i = 0; i < len; i++) {
                    HColumnDescriptor family = new HColumnDescriptor(columnFamily[i]);
                    td.addFamily(family);
                }
                admin.createTable(td);
                System.out.println(tableName + " 表创建成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(tableName + " 表创建失败!");
        }
    }

    @Test
    public void testCreateTable() {
        createTable("cross_history", "carinfo", "parkInfo", "deviceInfo");
    }

    /**
     * 删除表
     * 
     * @param tableName
     */
    public void delTable(String tableName) {
        TableName tableNameObj = TableName.valueOf(tableName);
        try {
            if (this.admin.tableExists(tableNameObj)) {
                admin.disableTable(tableNameObj);
                admin.deleteTable(tableNameObj);
                System.out.println(tableName + " 表删除成功!");
            } else {
                System.out.println(tableName + " 表不存在!");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(tableName + " 表删除失败!");
        }
    }

    @Test
    public void testDelTable() {
        delTable("cross_history");
    }

    /**
     * 插入记录
     * 
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param qualifier
     * @param value
     */
    public void insertRecord(String tableName, String rowKey, String columnFamily, String qualifier, String value) {
        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Put put = new Put(rowKey.getBytes());
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            table.close();
            connection.close();
            System.out.println(tableName + " 表插入数据成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(tableName + " 表插入数据失败!");
        }
    }

    @Test
    public void testInsertRecord() {
        insertRecord("cross_history", "001", "carinfo", "plateNo", "浙A12345");
        insertRecord("cross_history", "002", "carinfo", "plateNo", "浙A12345");
        insertRecord("cross_history", "003", "carinfo", "plateNo", "浙A12345");
        insertRecord("cross_history", "001", "parkInfo", "parkName", "中兴花园");
        insertRecord("cross_history", "002", "parkInfo", "parkName", "中兴花园");
        insertRecord("cross_history", "003", "parkInfo", "parkName", "中兴花园");
        insertRecord("cross_history", "001", "deviceInfo", "deviceInfo", "道闸");
        insertRecord("cross_history", "002", "deviceInfo", "deviceInfo", "道闸");
        insertRecord("cross_history", "003", "deviceInfo", "deviceInfo", "道闸");
    }

    /**
     * 根据rowKey删除一行数据
     * 
     * @param tableName
     * @param rowKey
     */
    public void deleteRecord(String tableName, String rowKey) {
        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Delete del = new Delete(rowKey.getBytes());
            table.delete(del);
            System.out.println(tableName + " 表删除数据成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(tableName + " 表删除数据失败!");
        }
    }

    @Test
    public void testDeleteRecord() {
        deleteRecord("cross_history", "001");
    }

    /** 
     * 根据rowKey获取一行数据
     *
     * @param tableName
     * @param rowKey
     * @return
     */
    public Result getOneRecord(String tableName, String rowKey) {
        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(rowKey.getBytes());
            Result rs = table.get(get);
            System.out.println(tableName + " 表获取数据成功!");
            System.out.println("rowkey为:" + rowKey);
            List<Cell> cells = rs.listCells();
            if (cells != null) {
                for (Cell cell : cells) {
                    System.out.println(new String(cell.getFamily()) + " : " + new String(cell.getQualifier()) + " : " + new String(cell.getValue()));
                }
            }

            return rs;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }

    }

    @Test
    public void testGetOneRecord() {
        getOneRecord("cross_history", "001");
    }

    /**
     * 获取所有记录
     * 
     * @param tableName
     * @return
     */
    public List<Result> getAll(String tableName) {
        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            List<Result> list = new ArrayList<Result>();
            for (Result r : scanner) {
                list.add(r);
            }
            scanner.close();
            System.out.println(tableName + " 表获取所有记录成功!");
            return list;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Test
    public void testGetAll() {
        System.out.println(getAll("cross_history"));
    }


}

[danger] 常见错误

解决办法(1):修改window机器的主机名和ip地址映射,解决bigdata1无法访问,Windows的hosts位置在 C:\Windows\System32\drivers\etc

解决办法(2):把代码的bigdata1改成虚拟机对应ip地址

相关文章

微信公众号