HBase-02API简介

客户端API: 基础

包括hbase shell命令行 和Java API

命令行

  • hbase shell: 进入 hbase shell, 然后可以用以下的 hbase 命令行
  • help 'list' : 查看帮助
  • status : 查看状态
  • 表级命令:
    • list : list all table
    • create 'table1','col_family1','col_family2' : 创建表, 并包含两个列族
    • alter 'table1', NAME => 'col_family1', TTL => '604800' : 修改列族的TTL(秒)
    • describe 'table1' : 查看表描述
    • deleteall 'table1', 'rowkey1', 'column' : 删除该行键下”column”列的数据
    • deleteall 'table1', 'rowkey1' : 删除所有该行键下的数据
    • truncate 'table1' 清空表
    • disable 'table1' 然后 drop 'table1' : 删除表
    • count 'table1' : 统计表的行数
  • 基本查询 & 删除:
    • put 'table1', 'rowkey1', 'col_family1:col111', 'value' : 插入一条数据, 格式为put 表名, 行键, 列族:列, 值
    • get 'table1', 'rowkey1' : 查询记录, 格式为get 表名, 行键
    • get 'table1', 'rowkey1', 'col_family1:col111' : 查询记录, 格式为get 表名, 行键, 列族:列
    • get 'table1', 'rowkey1', { COLUMN => 'col_family1:col111', VERSIONS=>1}: 查询记录, 格式为get 表名, 行键, {条件}, 其中COLUMNVERSIONS是预定义的, 分别表示列和版本号
  • 扫描:
    • scan 'table1' , {'LIMIT' => 5} : 扫描一个表, {'LIMIT' => 5}是可选的
    • scan 'table1', {COLUMNS => 'fam:col', STARTROW => 'executed|1530226272', STOPROW => 'executed|1530233472'} : 带条件的扫描
    • scan 'hbase:meta' 扫描 hbase:meta 表, 旧的HBase版本里该表叫: .META.
    • scan 'hbase:meta', {COLUMNS => 'info:regioninfo'} 参考 # META表

Java API

增删:get & put

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf,"test_table");

// Instantiating Put by rowkey
Put p = new Put(Bytes.toBytes("row1"));

// family name, qualifier ,value
p.add(Bytes.toBytes("personal"),
Bytes.toBytes("name"),Bytes.toBytes("raju"));

// Write the data
table.put(p);


// Instantiating Get by rowkey
Get g = new Get(Bytes.toBytes("row1"));

// Reading the data
Result result = table.get(g);

// Reading values from Result
byte [] value = result.getValue(Bytes.toBytes("personal"),Bytes.toBytes("name"));

System.out.println("personal.name=" + Bytes.toString(value))

table.close();

扫描:scan

Scan 属于不稳定接口,如扫描范围过大或设置不准会导致性能下降,使用时强烈建议设置 startKey 与 endKey,同时 start 与 end 之间不要超过100条数据。


Scan scan = new Scan();

scan.setLimit(10)

// Set start & stop Rowkey If Need
//scan.setStartRow(startRow);
//scan.setStopRow(stopRow);

// Scanning the required columns
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("city"));

// Getting the scan result
ResultScanner scanner = table.getScanner(scan);

// Reading values from scan result
for (Result result = scanner.next(); result != null; result = scanner.next())
System.out.println("Found row : " + result);

// Closing the scanner
scanner.close();

scan 的 setCaching 与 setBatch 方法的区别:

  • setCaching 设置的值为每次 rpc 的请求记录数,默认是1,设置更大的 cache 可以减少 RPC 次数来优化性能,但是一次传输的耗时也增加;
  • setBatch 设置每次取的 column size;有些 row 特别大,所以需要分开传给 client,就是一次传一个 row 的几个 column。
  • batch 和 caching 和 hbase table column size 共同决定了 rpc 的次数。

@ref: https://blog.csdn.net/caoli98033/article/details/44650497

scan 的实现: HBase 已经迭代了多个版本,不保证 scan 的实现在每个版本都一样

for 循环中每次遍历 ResultScanner 对象获取一行记录,实际上在客户端层面都会调用一次 next 请求。next 请求整个流程可以分为如下几个步骤:

  1. next请求首先会检查客户端缓存中是否存在还没有读取的数据行,如果有就直接返回,否则需要将next请求给HBase服务器端(RegionServer)。
  2. 如果客户端缓存已经没有扫描结果,就会将next请求发送给HBase服务器端。默认情况下,一次next请求仅可以请求100行数据(或者返回结果集总大小不超过2M)
  3. 服务器端接收到 next 请求之后就开始从 BlockCache、HFile 以及 MemStore 中一行一行进行扫描,扫描的行数达到100行之后就返回给客户端,客户端将这100条数据缓存到内存并返回一条给上层业务。

上层业务不断一条一条获取扫描数据,在数据量大的情况下实际上 HBase 客户端会不断发送 next 请求到 HBase 服务器。

@ref: HBase最佳实践 – Scan用法大观园 – 有态度的HBase/Spark/BigData

批处理:bacth

批量处理操作:可以批量处理跨多行的不同操作,
许多基于列表的操作,如delete、get的列表操作都是基于batch 方法实现的.

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf,"test_table");

List<Row> batch = new ArrayList<>();

Put put = new Put(Bytes.toBytes("row_key1"));
put.addColumn(Bytes.toBytes("col_fam1"), Bytes.toBytes("col_qual3"), Bytes.toBytes("test_data3"));
batch.add(put);
// 添加更多...

Object[] results = new Object[batch.size()];
try{
table.batch(batch, results);
}catch(Exception e){
System.err.println(e);
}
for(int i=0;i<results.length;i++){
System.out.println("result"+"["+i+"]:"+results[i]);
}
table.close();

Table vs BufferedMutator

In the new API, BufferedMutator is used.
You could change Table t = connection.getTable(TableName.valueOf("foo")) to BufferedMutator t = connection.getBufferedMutator(TableName.valueOf("foo")). And then change t.put(p) to t.mutate(p)

行锁(RowLock)

行锁在客户端 API 中仍然存在,但是不鼓励使用,因为管理不好会锁定整个 RegionServer.

客户端API: 高级特性

过滤器(Filter)

Get 和 Scan 实例可以用 filters 配置,以应用于 RegionServer.
所有的过滤器都在服务端生效, 叫做”谓词下推”( predicate push down)。这样可以保证过滤掉的数据不会被传送到客户端.
过滤器在客户端被创建, 通过RPC传送到服务器端, 然后在服务器端执行.

Hbase客户端api提供了几种过滤器:

  • SingleColumnValueFilter : 列值过滤, 用于测试值的情况(相等,不等,范围)
  • RegexStringComparator: 支持正则表达式的值比较
  • SubstringComparator: 用于检测一个子串是否存在于值中。大小写不敏感。
  • FamilyFilter: 用于过滤列族。 通常,在Scan中选择ColumnFamilie优于在过滤器中做。
  • QualifierFilter: 用于基于列名(即 Qualifier)过滤.
  • ColumnPrefixFilter: 可基于列名(即Qualifier)前缀过滤。
  • RowFilter: 通常认为行选择时 Scan 采用 startRow/stopRow 方法比较好。然而 RowFilter 也可以用。
  • PageFilter:用于实现分页,它可以限制返回结果的最大行数。一旦达到该限制,过滤器就会停止扫描并返回结果。

下面是分页过滤器的一个例子:

Scan scan = new Scan(); 
scan.setStartRow(Bytes.toBytes(startRowKey));

Filter pageFilter = new PageFilter(10);
scan.setFilter(pageFilter);

ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
resultList.add(result);
}
resultScanner.close();

下面是列值过滤器的一个例子:

SingleColumnValueFilter filter = new SingleColumnValueFilter(
cf,
column,
CompareOp.EQUAL,
Bytes.toBytes("my value")
);
scan.setFilter(filter);

计数器(Increment)

一种支持的数据类型,值得一提的是“计数器”(如, 具有原子递增能力的数值)。参考 HTable的 Increment .
同步计数器在区域服务器中完成,不是客户端。

incr '<table>', '<row>', '<column>', |<increment-value>|

协处理器(Coprocessor)

HBase 的协处理器是从 0.92.0 开始引入的,参见 HBASE-2000。它的实现灵感来源于 Jeff Dean 在 LADIS 2009 分享主题《Designs, Lessons and Advice fromBuilding LargeDistributed Systems》中关于 Google 的 BigTable 协处理器的分享。当时的 BigTable 协处理器具有以下功能:

  • 每个表服务器的任意子表都可以运行代码;

  • 客户端的高层调用接口;

  • 跨多行的调用会自动拆分为多个并行化的 RPC 请求;

  • 通过协处理器可以非常灵活的构建分布式服务模型,能够自动化扩展、负载均衡、应用请求路由等。

HBase 当然也想要一个这么好的功能,因为通过这个功能我们可以实现二级索引(secondary indexing)、复杂过滤(complex filtering) 比如谓词下推(push down predicates)以及访问控制等功能。虽然 HBase 协处理器受 BigTable 协处理器的启发,但在实现细节方面存在差异。HBase 为我们建立了一个框架,并提供类库和运行时环境,使得我们可以在 HBase RegionServer 和 Master 上运行用户自定义代码;

协处理器框架已经为我们提供了一些实现类,我们可以通过继承这些类来扩展自己的功能。这些类主要分为两大类,即 Observer 和 Endpoint。

  • Observer 和 RDMBS 的触发器很类似,在一些特定的事件发生时被执行。这些事件包括用户产生的事件,也包括服务器内部产生的事件。目前 HBase 内置实现的 Observer 主要有以下几个:

    • WALObserver:提供控制 WAL 的钩子函数;
    • MasterObserver:可以被用作管理或 DDL 类型的操作,这些是集群级的事件;
    • RegionObserver:用户可以用这种处理器处理数据修改事件,它们与表的 Region 联系紧密;
    • BulkLoadObserver:进行 BulkLoad 的操作之前或之后会触发这个钩子函数;
    • RegionServerObserver :RegionServer 上发生的一些操作可以触发一些这个钩子函数,这个是 RegionServer 级别的事件;
    • EndpointObserver:每当用户调用 Endpoint 之前或之后会触发这个钩子,主要提供了一些回调方法。
  • Endpoint 和 RDMBS 的存储过程很类似,用户提供一些自定义代码,并在 HBase 服务器端执行,结果通过 RPC 返回给客户。比较常见的场景包括聚合操作(求和、计数等)。

使用协处理器

  • (1)编写协处理器代码,自定义的类继承一些框架类,例如 BaseRegionObserver
  • (2)将项目打成 jar 包,上传到 HDFS
  • (3)Hbase server 端加载 jar,
    • hbase shell 执行 alter 'table1',METHOD => 'table_att','Coprocessor'=>'hdfs://linux121:9000/processor/processor.jar|com.xx.hbase.processor.TestProcessor|1001|',其中1001是优先级,该数字越小,优先级越高。如果同一个钩子函数有多个协处理器实现,那么将按照优先级执行
    • 执行完可使用 describe 'table1' 命令查看,可以看到挂载的协处理器信息
  • (4) 使用协处理器:大部分协处理器可以自动触发执行,例如如果重写了 BaseRegionObserver.prePut,则在 put 之前触发

Example:加盐后数据如何 scan

  • 为了避免写入热点,rowkey 加盐后会变成如 salt- 开头,同时对 Region 进行预分区,打散后的 rowkey 写入对应的 Region;
  • 这时候如果要执行 scan,一种方法是在 client 端组装出多个 rowkey,Region 数量越多,需要组装的 Rowkey、并发发出的 RPC 请求也越多;
  • 另一种方式就是使用协处理器的方式:
    • 无需客户端组装加盐后的 Rowkey,客户端直接使用原始不带 salt 前缀的 Rowkey 进行 scan;
    • scan 时,通过 HTable 的 coprocessorService 触发协处理器的代码,通过 env.getRegion().getRegionInfo().getStartKey() 可以拿到当前 Region 的 StartKey,生成 salt- 前缀,再与客户端传输过来的 StartKey 参数拼接,生成真正的 Rowkey;
    • 客户端获取到每个 Region 返回的 Map results,再进行处理

@ref: HBase 协处理器入门及实战

批处理客户端

Hive

Hive+HBase: 使用Hive读取Hbase中的数据。
我们可以使用HQL语句在HBase表上进行查询、插入操作;甚至是进行Join和Union等复杂查询。此功能是从Hive 0.6.0开始引入的,详情可以参见HIVE-705。Hive与HBase整合的实现是利用两者本身对外的API接口互相进行通信,相互通信主要是依靠hive-hbase-handler-1.2.0.jar工具里面的类实现的。

参考: Hive和HBase整合用户指南

Apache Pig

如果不满足Hvie提供的HQL查询, 还可以用Pig Latin脚本实现更复杂的MapReduce Job,
Pig支持对Hbase表的读写, Hbase表中的”列”可以映射到Pig的元组

MapReduce

与Hadoop MapReduce Java API的整合

参考: 使用MapReduce APIs读写HBase