HBase协处理器 #
一、协处理器概述 #
协处理器允许在RegionServer上执行用户代码,将计算推送到数据端,减少网络传输。
1.1 协处理器优势 #
text
协处理器优势
├── 计算下推
│ └── 减少网络传输
│
├── 服务端处理
│ └── 提高处理效率
│
├── 自动分布
│ └── 随Region自动分布
│
└── 扩展功能
└── 实现复杂业务逻辑
1.2 协处理器类型 #
text
协处理器类型
├── Observer(观察者)
│ ├── RegionObserver
│ ├── RegionServerObserver
│ ├── MasterObserver
│ └── WALObserver
│
└── Endpoint(端点)
└── 类似存储过程
二、Observer协处理器 #
2.1 Observer类型 #
| 类型 | 说明 |
|---|---|
| RegionObserver | Region级别事件观察 |
| RegionServerObserver | RegionServer级别事件观察 |
| MasterObserver | Master级别事件观察 |
| WALObserver | WAL级别事件观察 |
2.2 RegionObserver #
text
RegionObserver钩子
├── 数据操作
│ ├── prePut / postPut
│ ├── preGet / postGet
│ ├── preDelete / postDelete
│ └── preScan / postScan
│
├── Region生命周期
│ ├── preOpen / postOpen
│ ├── preClose / postClose
│ └── preSplit / postSplit
│
└── 其他
├── preBatchMutate / postBatchMutate
└── preIncrement / postIncrement
2.3 RegionObserver示例 #
java
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class MyRegionObserver implements RegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit, Durability durability)
throws IOException {
// 获取RowKey
byte[] rowKey = put.getRow();
// 获取列值
byte[] nameValue = put.get(Bytes.toBytes("info"), Bytes.toBytes("name")).get(0).getValue();
// 添加审计列
put.addColumn(
Bytes.toBytes("info"),
Bytes.toBytes("updated_at"),
Bytes.toBytes(System.currentTimeMillis())
);
System.out.println("PrePut: " + Bytes.toString(rowKey));
}
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit, Durability durability)
throws IOException {
System.out.println("PostPut executed");
}
}
2.4 MasterObserver示例 #
java
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
public class MyMasterObserver implements MasterObserver {
@Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableDescriptor desc, RegionInfo[] regions)
throws IOException {
System.out.println("PreCreateTable: " + desc.getTableName());
}
@Override
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
RegionInfo[] regions)
throws IOException {
System.out.println("PostCreateTable executed");
}
}
三、Endpoint协处理器 #
3.1 Endpoint概述 #
Endpoint类似于存储过程,允许在服务端执行聚合计算等操作。
text
Endpoint特点
├── 服务端计算
│ └── 减少数据传输
│
├── 分布式执行
│ └── 在所有Region上执行
│
├── 结果聚合
│ └── 客户端聚合结果
│
└── 类似存储过程
└── 执行复杂计算
3.2 Endpoint示例 #
java
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SumEndpoint extends BaseEndpointCoprocessor implements SumService {
@Override
public long sum(String tableName, String family, String qualifier)
throws IOException {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment();
Region region = env.getRegion();
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
InternalScanner scanner = region.getScanner(scan);
List<Cell> results = new ArrayList<>();
long sum = 0;
boolean hasMore = true;
while (hasMore) {
hasMore = scanner.next(results);
for (Cell cell : results) {
byte[] value = CellUtil.cloneValue(cell);
sum += Bytes.toLong(value);
}
results.clear();
}
scanner.close();
return sum;
}
}
3.3 调用Endpoint #
java
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class EndpointClient {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactory.createConnection();
Table table = connection.getTable(TableName.valueOf("user"));
// 调用Endpoint
SumService service = table.coprocessorService(
SumService.class,
Bytes.toBytes("user001"), // startRow
Bytes.toBytes("user999"), // endRow
new Batch.Call<SumService, Long>() {
@Override
public Long call(SumService service) throws IOException {
return service.sum("user", "info", "score");
}
}
);
// 聚合结果
long totalSum = 0;
for (Long value : service.values()) {
totalSum += value;
}
System.out.println("Total sum: " + totalSum);
table.close();
connection.close();
}
}
四、协处理器加载方式 #
4.1 静态加载 #
xml
<!-- hbase-site.xml -->
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.example.MyRegionObserver</value>
</property>
<property>
<name>hbase.coprocessor.master.classes</name>
<value>com.example.MyMasterObserver</value>
</property>
4.2 动态加载 #
ruby
# 加载协处理器到表
alter 'user', 'coprocessor' => 'hdfs:///path/to/coprocessor.jar|com.example.MyRegionObserver|1001|'
# 卸载协处理器
alter 'user', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
4.3 加载参数说明 #
text
参数格式
hdfs:///path/to/jar|ClassName|Priority|Configuration
参数说明
├── jar路径:HDFS上的jar包路径
├── 类名:协处理器全类名
├── 优先级:数字越小优先级越高
└── 配置:可选的配置参数
五、协处理器开发实践 #
5.1 开发步骤 #
text
开发步骤
├── 1. 创建Maven项目
├── 2. 添加HBase依赖
├── 3. 实现协处理器接口
├── 4. 打包上传到HDFS
└── 5. 加载协处理器
5.2 Maven依赖 #
xml
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.4.11</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.11</version>
<scope>provided</scope>
</dependency>
</dependencies>
5.3 打包配置 #
xml
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
5.4 上传和加载 #
bash
# 打包
mvn clean package
# 上传到HDFS
hdfs dfs -put target/coprocessor.jar /hbase/lib/
# 加载协处理器
hbase shell
alter 'user', 'coprocessor' => 'hdfs:///hbase/lib/coprocessor.jar|com.example.MyRegionObserver|1001|'
六、协处理器应用场景 #
6.1 数据审计 #
java
public class AuditObserver implements RegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit, Durability durability)
throws IOException {
// 添加审计信息
put.addColumn(
Bytes.toBytes("audit"),
Bytes.toBytes("created_by"),
Bytes.toBytes(getCurrentUser())
);
put.addColumn(
Bytes.toBytes("audit"),
Bytes.toBytes("created_at"),
Bytes.toBytes(System.currentTimeMillis())
);
}
}
6.2 数据校验 #
java
public class ValidationObserver implements RegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit, Durability durability)
throws IOException {
// 校验数据
byte[] ageValue = put.get(Bytes.toBytes("info"), Bytes.toBytes("age")).get(0).getValue();
int age = Bytes.toInt(ageValue);
if (age < 0 || age > 150) {
throw new IOException("Invalid age: " + age);
}
}
}
6.3 二级索引 #
java
public class IndexObserver implements RegionObserver {
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit, Durability durability)
throws IOException {
// 获取索引列值
byte[] emailValue = put.get(Bytes.toBytes("info"), Bytes.toBytes("email")).get(0).getValue();
// 写入索引表
Connection connection = c.getEnvironment().getConnection();
Table indexTable = connection.getTable(TableName.valueOf("user_email_index"));
Put indexPut = new Put(emailValue);
indexPut.addColumn(Bytes.toBytes("idx"), Bytes.toBytes("rowkey"), put.getRow());
indexTable.put(indexPut);
indexTable.close();
}
}
6.4 数据同步 #
java
public class SyncObserver implements RegionObserver {
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit, Durability durability)
throws IOException {
// 同步到其他系统
syncToElasticsearch(put);
}
private void syncToElasticsearch(Put put) {
// 同步逻辑
}
}
七、协处理器最佳实践 #
7.1 性能考虑 #
text
性能建议
├── 避免阻塞操作
│ └── 不要在协处理器中执行耗时操作
│
├── 控制资源使用
│ └── 避免占用过多内存
│
├── 异常处理
│ └── 合理处理异常
│
└── 日志记录
└── 记录关键操作日志
7.2 安全考虑 #
text
安全建议
├── 权限控制
│ └── 限制协处理器权限
│
├── 输入验证
│ └── 验证所有输入数据
│
├── 异常处理
│ └── 不要暴露敏感信息
│
└── 审计日志
└── 记录操作审计
7.3 调试技巧 #
text
调试建议
├── 日志输出
│ └── 使用LOG记录调试信息
│
├── 单元测试
│ └── 编写单元测试验证逻辑
│
├── 环境隔离
│ └── 先在测试环境验证
│
└── 版本控制
└── 管理协处理器版本
八、常见问题 #
8.1 类找不到 #
bash
# 问题:ClassNotFoundException
# 解决:确保jar包在HDFS上且路径正确
hdfs dfs -ls /hbase/lib/coprocessor.jar
8.2 协处理器不生效 #
ruby
# 检查协处理器是否加载
describe 'user'
# 查看输出中的 TABLE_ATTRIBUTES
# 应包含 coprocessor 信息
8.3 性能问题 #
text
问题:协处理器执行慢
解决:
├── 优化协处理器代码
├── 减少阻塞操作
├── 使用异步处理
└── 增加超时时间
九、总结 #
本节介绍了HBase协处理器:
| 类型 | 说明 |
|---|---|
| Observer | 观察者模式,拦截操作 |
| Endpoint | 类似存储过程,服务端计算 |
| RegionObserver | Region级别事件 |
| MasterObserver | Master级别事件 |
下一步,让我们学习Phoenix集成!
最后更新:2026-03-27