HBase协处理器 #

一、协处理器概述 #

协处理器允许在RegionServer上执行用户代码,将计算推送到数据端,减少网络传输。

1.1 协处理器优势 #

text
协处理器优势
├── 计算下推
│   └── 减少网络传输
│
├── 服务端处理
│   └── 提高处理效率
│
├── 自动分布
│   └── 随Region自动分布
│
└── 扩展功能
    └── 实现复杂业务逻辑

1.2 协处理器类型 #

text
协处理器类型
├── Observer(观察者)
│   ├── RegionObserver
│   ├── RegionServerObserver
│   ├── MasterObserver
│   └── WALObserver
│
└── Endpoint(端点)
    └── 类似存储过程

二、Observer协处理器 #

2.1 Observer类型 #

类型 说明
RegionObserver Region级别事件观察
RegionServerObserver RegionServer级别事件观察
MasterObserver Master级别事件观察
WALObserver WAL级别事件观察

2.2 RegionObserver #

text
RegionObserver钩子
├── 数据操作
│   ├── prePut / postPut
│   ├── preGet / postGet
│   ├── preDelete / postDelete
│   └── preScan / postScan
│
├── Region生命周期
│   ├── preOpen / postOpen
│   ├── preClose / postClose
│   └── preSplit / postSplit
│
└── 其他
    ├── preBatchMutate / postBatchMutate
    └── preIncrement / postIncrement

2.3 RegionObserver示例 #

java
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

public class MyRegionObserver implements RegionObserver {
    
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
                       Put put, WALEdit edit, Durability durability) 
        throws IOException {
        
        // 获取RowKey
        byte[] rowKey = put.getRow();
        
        // 获取列值
        byte[] nameValue = put.get(Bytes.toBytes("info"), Bytes.toBytes("name")).get(0).getValue();
        
        // 添加审计列
        put.addColumn(
            Bytes.toBytes("info"),
            Bytes.toBytes("updated_at"),
            Bytes.toBytes(System.currentTimeMillis())
        );
        
        System.out.println("PrePut: " + Bytes.toString(rowKey));
    }
    
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
                        Put put, WALEdit edit, Durability durability) 
        throws IOException {
        System.out.println("PostPut executed");
    }
}

2.4 MasterObserver示例 #

java
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;

public class MyMasterObserver implements MasterObserver {
    
    @Override
    public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
                               TableDescriptor desc, RegionInfo[] regions) 
        throws IOException {
        System.out.println("PreCreateTable: " + desc.getTableName());
    }
    
    @Override
    public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
                                RegionInfo[] regions) 
        throws IOException {
        System.out.println("PostCreateTable executed");
    }
}

三、Endpoint协处理器 #

3.1 Endpoint概述 #

Endpoint类似于存储过程,允许在服务端执行聚合计算等操作。

text
Endpoint特点
├── 服务端计算
│   └── 减少数据传输
│
├── 分布式执行
│   └── 在所有Region上执行
│
├── 结果聚合
│   └── 客户端聚合结果
│
└── 类似存储过程
    └── 执行复杂计算

3.2 Endpoint示例 #

java
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SumEndpoint extends BaseEndpointCoprocessor implements SumService {
    
    @Override
    public long sum(String tableName, String family, String qualifier) 
        throws IOException {
        
        RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment();
        Region region = env.getRegion();
        
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        
        InternalScanner scanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<>();
        long sum = 0;
        
        boolean hasMore = true;
        while (hasMore) {
            hasMore = scanner.next(results);
            for (Cell cell : results) {
                byte[] value = CellUtil.cloneValue(cell);
                sum += Bytes.toLong(value);
            }
            results.clear();
        }
        
        scanner.close();
        return sum;
    }
}

3.3 调用Endpoint #

java
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class EndpointClient {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactory.createConnection();
        Table table = connection.getTable(TableName.valueOf("user"));
        
        // 调用Endpoint
        SumService service = table.coprocessorService(
            SumService.class,
            Bytes.toBytes("user001"),  // startRow
            Bytes.toBytes("user999"),  // endRow
            new Batch.Call<SumService, Long>() {
                @Override
                public Long call(SumService service) throws IOException {
                    return service.sum("user", "info", "score");
                }
            }
        );
        
        // 聚合结果
        long totalSum = 0;
        for (Long value : service.values()) {
            totalSum += value;
        }
        
        System.out.println("Total sum: " + totalSum);
        
        table.close();
        connection.close();
    }
}

四、协处理器加载方式 #

4.1 静态加载 #

xml
<!-- hbase-site.xml -->
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>com.example.MyRegionObserver</value>
</property>

<property>
    <name>hbase.coprocessor.master.classes</name>
    <value>com.example.MyMasterObserver</value>
</property>

4.2 动态加载 #

ruby
# 加载协处理器到表
alter 'user', 'coprocessor' => 'hdfs:///path/to/coprocessor.jar|com.example.MyRegionObserver|1001|'

# 卸载协处理器
alter 'user', METHOD => 'table_att_unset', NAME => 'coprocessor$1'

4.3 加载参数说明 #

text
参数格式
hdfs:///path/to/jar|ClassName|Priority|Configuration

参数说明
├── jar路径:HDFS上的jar包路径
├── 类名:协处理器全类名
├── 优先级:数字越小优先级越高
└── 配置:可选的配置参数

五、协处理器开发实践 #

5.1 开发步骤 #

text
开发步骤
├── 1. 创建Maven项目
├── 2. 添加HBase依赖
├── 3. 实现协处理器接口
├── 4. 打包上传到HDFS
└── 5. 加载协处理器

5.2 Maven依赖 #

xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.4.11</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.4.11</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

5.3 打包配置 #

xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

5.4 上传和加载 #

bash
# 打包
mvn clean package

# 上传到HDFS
hdfs dfs -put target/coprocessor.jar /hbase/lib/

# 加载协处理器
hbase shell
alter 'user', 'coprocessor' => 'hdfs:///hbase/lib/coprocessor.jar|com.example.MyRegionObserver|1001|'

六、协处理器应用场景 #

6.1 数据审计 #

java
public class AuditObserver implements RegionObserver {
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
                       Put put, WALEdit edit, Durability durability) 
        throws IOException {
        
        // 添加审计信息
        put.addColumn(
            Bytes.toBytes("audit"),
            Bytes.toBytes("created_by"),
            Bytes.toBytes(getCurrentUser())
        );
        put.addColumn(
            Bytes.toBytes("audit"),
            Bytes.toBytes("created_at"),
            Bytes.toBytes(System.currentTimeMillis())
        );
    }
}

6.2 数据校验 #

java
public class ValidationObserver implements RegionObserver {
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
                       Put put, WALEdit edit, Durability durability) 
        throws IOException {
        
        // 校验数据
        byte[] ageValue = put.get(Bytes.toBytes("info"), Bytes.toBytes("age")).get(0).getValue();
        int age = Bytes.toInt(ageValue);
        
        if (age < 0 || age > 150) {
            throw new IOException("Invalid age: " + age);
        }
    }
}

6.3 二级索引 #

java
public class IndexObserver implements RegionObserver {
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
                        Put put, WALEdit edit, Durability durability) 
        throws IOException {
        
        // 获取索引列值
        byte[] emailValue = put.get(Bytes.toBytes("info"), Bytes.toBytes("email")).get(0).getValue();
        
        // 写入索引表
        Connection connection = c.getEnvironment().getConnection();
        Table indexTable = connection.getTable(TableName.valueOf("user_email_index"));
        
        Put indexPut = new Put(emailValue);
        indexPut.addColumn(Bytes.toBytes("idx"), Bytes.toBytes("rowkey"), put.getRow());
        indexTable.put(indexPut);
        
        indexTable.close();
    }
}

6.4 数据同步 #

java
public class SyncObserver implements RegionObserver {
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
                        Put put, WALEdit edit, Durability durability) 
        throws IOException {
        
        // 同步到其他系统
        syncToElasticsearch(put);
    }
    
    private void syncToElasticsearch(Put put) {
        // 同步逻辑
    }
}

七、协处理器最佳实践 #

7.1 性能考虑 #

text
性能建议
├── 避免阻塞操作
│   └── 不要在协处理器中执行耗时操作
│
├── 控制资源使用
│   └── 避免占用过多内存
│
├── 异常处理
│   └── 合理处理异常
│
└── 日志记录
    └── 记录关键操作日志

7.2 安全考虑 #

text
安全建议
├── 权限控制
│   └── 限制协处理器权限
│
├── 输入验证
│   └── 验证所有输入数据
│
├── 异常处理
│   └── 不要暴露敏感信息
│
└── 审计日志
    └── 记录操作审计

7.3 调试技巧 #

text
调试建议
├── 日志输出
│   └── 使用LOG记录调试信息
│
├── 单元测试
│   └── 编写单元测试验证逻辑
│
├── 环境隔离
│   └── 先在测试环境验证
│
└── 版本控制
    └── 管理协处理器版本

八、常见问题 #

8.1 类找不到 #

bash
# 问题:ClassNotFoundException
# 解决:确保jar包在HDFS上且路径正确

hdfs dfs -ls /hbase/lib/coprocessor.jar

8.2 协处理器不生效 #

ruby
# 检查协处理器是否加载
describe 'user'

# 查看输出中的 TABLE_ATTRIBUTES
# 应包含 coprocessor 信息

8.3 性能问题 #

text
问题:协处理器执行慢
解决:
├── 优化协处理器代码
├── 减少阻塞操作
├── 使用异步处理
└── 增加超时时间

九、总结 #

本节介绍了HBase协处理器:

类型 说明
Observer 观察者模式,拦截操作
Endpoint 类似存储过程,服务端计算
RegionObserver Region级别事件
MasterObserver Master级别事件

下一步,让我们学习Phoenix集成!

最后更新:2026-03-27