RocksDB迭代器 #
一、迭代器基础 #
1.1 什么是迭代器 #
迭代器(Iterator)是RocksDB提供的遍历数据的接口,支持有序访问键值对。
text
迭代器特点:
├── 有序遍历 - 按比较器定义的键顺序排列(默认按字节字典序)
├── 双向遍历 - 支持正向和反向
├── 范围查询 - 支持指定范围遍历
└── 快照隔离 - 基于创建时的数据状态
1.2 创建迭代器 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
// Opens (creating if missing) the database at /tmp/testdb, writes ten
// key/value pairs and prints every entry the iterator yields.
// Returns 0 on success, 1 if the open or any write fails.
int main() {
    rocksdb::DB* db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }
    // Write test data; a failed Put is reported instead of being ignored.
    for (int i = 0; i < 10; i++) {
        status = db->Put(rocksdb::WriteOptions(),
                         "key_" + std::to_string(i),
                         "value_" + std::to_string(i));
        if (!status.ok()) {
            std::cerr << "Put failed: " << status.ToString() << std::endl;
            delete db;
            return 1;
        }
    }
    // Create the iterator.
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    // Walk every entry from the first key onwards.
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        std::cout << it->key().ToString() << " => "
                  << it->value().ToString() << std::endl;
    }
    // The loop also ends when the iterator hits an error, so check status.
    if (!it->status().ok()) {
        std::cerr << "Iterator error: " << it->status().ToString() << std::endl;
    }
    // Release the iterator before closing the database.
    delete it;
    delete db;
    return 0;
}
1.3 迭代器基本操作 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
// Walks through each positioning and accessor call of rocksdb::Iterator.
// Next(), Prev(), key() and value() are only invoked while Valid() is true,
// because the RocksDB iterator API lists Valid() as their precondition
// (for example, an empty database leaves the iterator invalid after a seek).
void BasicIteratorOperations(rocksdb::DB* db) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    // Move to the first entry.
    it->SeekToFirst();
    if (it->Valid()) {
        std::cout << "First: " << it->key().ToString() << std::endl;
    }
    // Move to the last entry.
    it->SeekToLast();
    if (it->Valid()) {
        std::cout << "Last: " << it->key().ToString() << std::endl;
    }
    // Position at the first key >= "key_5".
    it->Seek("key_5");
    if (it->Valid()) {
        std::cout << "Seek key_5: " << it->key().ToString() << std::endl;
    }
    // Position at the last key <= "key_5".
    it->SeekForPrev("key_5");
    if (it->Valid()) {
        std::cout << "SeekForPrev key_5: " << it->key().ToString() << std::endl;
    }
    // Step to the next key.
    if (it->Valid()) {
        it->Next();
    }
    // Step back to the previous key.
    if (it->Valid()) {
        it->Prev();
    }
    // Read the current key and value only when the position is valid.
    if (it->Valid()) {
        rocksdb::Slice key = it->key();
        rocksdb::Slice value = it->value();
        (void)key;
        (void)value;
    }
    // Report any error the iterator recorded along the way.
    rocksdb::Status status = it->status();
    if (!status.ok()) {
        std::cerr << "Iterator error: " << status.ToString() << std::endl;
    }
    delete it;
}
二、遍历方式 #
2.1 正向遍历 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every entry of `db` from the first key to the last, then reports
// the iterator's status if it is not OK.
void ForwardScan(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    std::cout << "Forward scan:" << std::endl;
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        const std::string k = cursor->key().ToString();
        const std::string v = cursor->value().ToString();
        std::cout << " " << k << " => " << v << std::endl;
        cursor->Next();
    }
    const rocksdb::Status final_status = cursor->status();
    if (!final_status.ok()) {
        std::cerr << "Error: " << final_status.ToString() << std::endl;
    }
    delete cursor;
}
2.2 反向遍历 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every entry of `db` from the last key back to the first, then
// reports the iterator's status if it is not OK.
void BackwardScan(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    std::cout << "Backward scan:" << std::endl;
    cursor->SeekToLast();
    while (cursor->Valid()) {
        const std::string k = cursor->key().ToString();
        const std::string v = cursor->value().ToString();
        std::cout << " " << k << " => " << v << std::endl;
        cursor->Prev();
    }
    const rocksdb::Status final_status = cursor->status();
    if (!final_status.ok()) {
        std::cerr << "Error: " << final_status.ToString() << std::endl;
    }
    delete cursor;
}
2.3 从指定位置开始遍历 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every entry whose key is >= `start_key`, in iteration order.
// An iterator error ends the loop early, so the status is checked afterwards
// and reported on stderr, matching the other scan helpers in this document.
void ScanFromKey(rocksdb::DB* db, const std::string& start_key) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    std::cout << "Scan from " << start_key << ":" << std::endl;
    for (it->Seek(start_key); it->Valid(); it->Next()) {
        std::cout << " " << it->key().ToString()
                  << " => " << it->value().ToString() << std::endl;
    }
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    delete it;
}
三、范围查询 #
3.1 基本范围查询 #
cpp
#include <rocksdb/db.h>
#include <iostream>
#include <vector>
// Collects every entry with start_key <= key < end_key (bytewise comparison).
// Returns the pairs in iteration order. If the iterator reports an error the
// partial result is returned and the error is written to stderr.
std::vector<std::pair<std::string, std::string>>
RangeQuery(rocksdb::DB* db,
           const std::string& start_key,
           const std::string& end_key) {
    std::vector<std::pair<std::string, std::string>> result;
    // Compare as Slices so no std::string is allocated per key just for the
    // bound check; Slice::compare is a bytewise comparison.
    const rocksdb::Slice upper(end_key);
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    for (it->Seek(start_key);
         it->Valid() && it->key().compare(upper) < 0;
         it->Next()) {
        result.emplace_back(it->key().ToString(), it->value().ToString());
    }
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    delete it;
    return result;
}
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 写入测试数据
for (int i = 0; i < 100; i++) {
char key[20];
sprintf(key, "key_%03d", i);
db->Put(rocksdb::WriteOptions(), key, std::to_string(i * 10));
}
// 范围查询
auto result = RangeQuery(db, "key_020", "key_040");
std::cout << "Range query result:" << std::endl;
for (const auto& [key, value] : result) {
std::cout << " " << key << " => " << value << std::endl;
}
delete db;
return 0;
}
3.2 使用ReadOptions设置范围 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every entry with start_key <= key < end_key, letting RocksDB
// enforce the exclusive upper bound through ReadOptions.
void RangeQueryWithOptions(rocksdb::DB* db,
                           const std::string& start_key,
                           const std::string& end_key) {
    rocksdb::ReadOptions read_options;
    // iterate_upper_bound is a `const rocksdb::Slice*`, so a std::string
    // address cannot be assigned to it. The Slice must stay alive for as long
    // as the iterator is in use, hence a local that outlives `it`.
    const rocksdb::Slice upper_bound(end_key);
    read_options.iterate_upper_bound = &upper_bound;
    rocksdb::Iterator* it = db->NewIterator(read_options);
    for (it->Seek(start_key); it->Valid(); it->Next()) {
        std::cout << it->key().ToString()
                  << " => " << it->value().ToString() << std::endl;
    }
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    delete it;
}
3.3 带分页的范围查询 #
cpp
#include <rocksdb/db.h>
#include <string>
#include <vector>
// One page of a paginated scan.
struct PageResult {
    // Key/value pairs contained in this page, in iteration order.
    std::vector<std::pair<std::string, std::string>> items;
    // Key at which the next page should start; meaningful only if has_more.
    std::string next_key;
    // True when further entries exist beyond this page. Defaults to false so
    // a default-constructed PageResult never holds an indeterminate flag.
    bool has_more = false;
};
// Returns up to `page_size` entries with start_key <= key < end_key
// (bytewise comparison).
//
// If more entries remain inside the range, has_more is set and next_key holds
// the first key not included in this page; pass it as start_key to fetch the
// following page. A non-positive page_size yields an empty page with
// has_more == false, so a caller that loops on has_more cannot spin forever
// on a cursor that never advances.
PageResult PaginatedScan(rocksdb::DB* db,
                         const std::string& start_key,
                         const std::string& end_key,
                         int page_size) {
    PageResult result;
    result.has_more = false;
    if (page_size <= 0) {
        return result;
    }
    // Compare as Slices to avoid allocating a std::string per key.
    const rocksdb::Slice upper(end_key);
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    int count = 0;
    it->Seek(start_key);
    while (count < page_size && it->Valid() && it->key().compare(upper) < 0) {
        result.items.emplace_back(it->key().ToString(), it->value().ToString());
        it->Next();
        ++count;
    }
    // The iterator now rests on the first entry that did not fit in the page.
    if (it->Valid() && it->key().compare(upper) < 0) {
        result.has_more = true;
        result.next_key = it->key().ToString();
    }
    delete it;
    return result;
}
// 使用示例
// Usage example: prints the range ["key_000", "key_100") ten entries at a
// time, writing a "---" separator between consecutive pages.
void DemoPagination(rocksdb::DB* db) {
    const std::string upper_limit = "key_100";
    const int rows_per_page = 10;
    std::string cursor = "key_000";
    bool more_pages = true;
    while (more_pages) {
        PageResult page = PaginatedScan(db, cursor, upper_limit, rows_per_page);
        std::cout << "Page:" << std::endl;
        for (const auto& entry : page.items) {
            std::cout << " " << entry.first << " => " << entry.second << std::endl;
        }
        more_pages = page.has_more;
        if (more_pages) {
            // Resume the next request from the key the scan stopped at.
            cursor = page.next_key;
            std::cout << "---" << std::endl;
        }
    }
}
四、前缀扫描 #
4.1 基本前缀扫描 #
cpp
#include <rocksdb/db.h>
#include <iostream>
#include <vector>
// Collects every entry whose key starts with `prefix`, in iteration order.
// If the iterator reports an error the partial result is returned and the
// error is written to stderr.
std::vector<std::pair<std::string, std::string>>
PrefixScan(rocksdb::DB* db, const std::string& prefix) {
    std::vector<std::pair<std::string, std::string>> result;
    const rocksdb::Slice wanted(prefix);
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    // Seek lands on the first key >= prefix; the scan stops at the first key
    // that no longer carries the prefix.
    for (it->Seek(wanted);
         it->Valid() && it->key().starts_with(wanted);
         it->Next()) {
        result.emplace_back(it->key().ToString(), it->value().ToString());
    }
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    delete it;
    return result;
}
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 写入带前缀的数据
db->Put(rocksdb::WriteOptions(), "user:1001:name", "Alice");
db->Put(rocksdb::WriteOptions(), "user:1001:age", "25");
db->Put(rocksdb::WriteOptions(), "user:1001:email", "alice@example.com");
db->Put(rocksdb::WriteOptions(), "user:1002:name", "Bob");
db->Put(rocksdb::WriteOptions(), "user:1002:age", "30");
db->Put(rocksdb::WriteOptions(), "order:1001", "order_data");
// 扫描user前缀
auto user_data = PrefixScan(db, "user:");
std::cout << "User data:" << std::endl;
for (const auto& [key, value] : user_data) {
std::cout << " " << key << " => " << value << std::endl;
}
delete db;
return 0;
}
4.2 使用Bloom Filter优化前缀扫描 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
#include <rocksdb/filter_policy.h>
// Configures `options` for prefix seeks: a fixed-length prefix extractor, a
// Bloom filter built on prefixes, and a hash index in the block-based table.
//
// `prefix_size` is the number of leading key bytes treated as the prefix.
// NewFixedPrefixTransform is declared in <rocksdb/slice_transform.h>.
void SetupPrefixBloomFilter(rocksdb::Options& options, size_t prefix_size) {
    rocksdb::BlockBasedTableOptions table_options;
    // Bloom filter policy (first argument: bits per key).
    table_options.filter_policy.reset(
        rocksdb::NewBloomFilterPolicy(10, false)
    );
    // The prefix extractor is a member of Options, not of
    // BlockBasedTableOptions, so it must be set on `options` directly.
    options.prefix_extractor.reset(
        rocksdb::NewFixedPrefixTransform(prefix_size)
    );
    // Disable whole-key filtering so the filter is built from prefixes only.
    table_options.whole_key_filtering = false;
    // Hash index on prefixes; it relies on the prefix extractor set above.
    table_options.index_type = rocksdb::BlockBasedTableOptions::kHashSearch;
    options.table_factory.reset(
        rocksdb::NewBlockBasedTableFactory(table_options)
    );
}
// 使用示例
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
// 设置前缀布隆过滤器(前缀长度为5)
SetupPrefixBloomFilter(options, 5);
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 写入数据
for (int i = 0; i < 10000; i++) {
std::string key = "user:" + std::to_string(i) + ":data";
db->Put(rocksdb::WriteOptions(), key, "value_" + std::to_string(i));
}
// 前缀扫描
rocksdb::ReadOptions read_options;
read_options.prefix_same_as_start = true;
rocksdb::Iterator* it = db->NewIterator(read_options);
it->Seek("user:100:");
while (it->Valid() && it->key().starts_with("user:100:")) {
std::cout << it->key().ToString() << std::endl;
it->Next();
}
delete it;
delete db;
return 0;
}
五、迭代器选项 #
5.1 ReadOptions配置 #
cpp
// Overview of the ReadOptions fields that affect iterators. This fragment
// expects an open `rocksdb::DB* db` to be in scope.
rocksdb::ReadOptions read_options;
// Whether to verify checksums on data that is read.
read_options.verify_checksums = true;
// Whether blocks read by this iterator are inserted into the block cache.
read_options.fill_cache = true;
// Snapshot to read from (nullptr: no explicit snapshot supplied).
read_options.snapshot = nullptr;
// Iteration bounds are `const rocksdb::Slice*`, so a std::string address
// cannot be assigned directly. Each Slice (and the string backing it) must
// stay alive for as long as the iterator is used.
// Upper bound of the iteration.
std::string upper_bound = "key_zzz";
rocksdb::Slice upper_bound_slice(upper_bound);
read_options.iterate_upper_bound = &upper_bound_slice;
// Lower bound of the iteration.
std::string lower_bound = "key_aaa";
rocksdb::Slice lower_bound_slice(lower_bound);
read_options.iterate_lower_bound = &lower_bound_slice;
// Prefix-scan mode: stay within the prefix of the seek target.
read_options.prefix_same_as_start = true;
// Readahead size.
read_options.readahead_size = 1024 * 1024; // 1MB
// Asynchronous IO.
read_options.async_io = false;
// Create the iterator.
rocksdb::Iterator* it = db->NewIterator(read_options);
5.2 迭代边界优化 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every key in [start, end) using iterator bounds, with block-cache
// filling disabled and a 2 MB readahead for a large sequential scan.
void OptimizedRangeScan(rocksdb::DB* db,
                        const std::string& start,
                        const std::string& end) {
    rocksdb::ReadOptions read_options;
    // The bounds are `const rocksdb::Slice*`, so a std::string address cannot
    // be assigned to them. The Slices must outlive the iterator.
    const rocksdb::Slice lower_bound(start);
    const rocksdb::Slice upper_bound(end);
    // Setting bounds lets the iterator stop early instead of scanning past
    // the range.
    read_options.iterate_lower_bound = &lower_bound;
    read_options.iterate_upper_bound = &upper_bound;
    // Do not fill the block cache (suited to large range scans).
    read_options.fill_cache = false;
    // Readahead size.
    read_options.readahead_size = 2 * 1024 * 1024; // 2MB
    rocksdb::Iterator* it = db->NewIterator(read_options);
    for (it->Seek(start); it->Valid(); it->Next()) {
        // Process the entry.
        std::cout << it->key().ToString() << std::endl;
    }
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    delete it;
}
六、高级用法 #
6.1 统计键数量 #
cpp
#include <rocksdb/db.h>
// Returns the number of entries visited by a full forward scan of `db`.
// NOTE(review): an iterator error ends the scan early and is not reported,
// so the result may be a partial count — confirm callers accept that.
uint64_t CountKeys(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    uint64_t total = 0;
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        ++total;
        cursor->Next();
    }
    delete cursor;
    return total;
}
// Returns the number of entries with start <= key < end (bytewise
// comparison).
// NOTE(review): an iterator error ends the scan early and is not reported,
// so the result may be a partial count — confirm callers accept that.
uint64_t CountKeysInRange(rocksdb::DB* db,
                          const std::string& start,
                          const std::string& end) {
    uint64_t count = 0;
    // Compare as Slices: counting needs no key copy, so no std::string is
    // allocated per entry just for the bound check.
    const rocksdb::Slice upper(end);
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    for (it->Seek(start);
         it->Valid() && it->key().compare(upper) < 0;
         it->Next()) {
        ++count;
    }
    delete it;
    return count;
}
6.2 批量收集数据 #
cpp
#include <rocksdb/db.h>
#include <fstream>
#include <functional>
#include <iostream>
#include <string>
#include <vector>
// Invokes `processor(key, value)` for every entry with start <= key < end,
// in iteration order. Keys and values are handed over as std::string copies.
void ScanAndProcess(rocksdb::DB* db,
                    const std::string& start,
                    const std::string& end,
                    std::function<void(const std::string&, const std::string&)> processor) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    for (cursor->Seek(start); cursor->Valid(); cursor->Next()) {
        // Stop at the first key at or beyond the exclusive upper bound.
        if (!(cursor->key().ToString() < end)) {
            break;
        }
        processor(cursor->key().ToString(), cursor->value().ToString());
    }
    delete cursor;
}
// 使用示例
// Usage example: writes entries of `db` to `filename`, one
// "key<TAB>value" line per entry. Failure to open or write the file is
// reported on stderr instead of passing silently.
void ExportData(rocksdb::DB* db, const std::string& filename) {
    std::ofstream file(filename);
    if (!file) {
        std::cerr << "Export failed: cannot open " << filename << std::endl;
        return;
    }
    // NOTE(review): the exclusive upper bound "~" (0x7E) leaves out any key
    // that sorts at or after "~", e.g. keys starting with non-ASCII bytes —
    // confirm the key space before relying on this as a full export.
    ScanAndProcess(db, "", "~", [&file](const std::string& key, const std::string& value) {
        file << key << "\t" << value << "\n";
    });
    file.close();
    if (!file) {
        std::cerr << "Export failed: write error on " << filename << std::endl;
    }
}
6.3 查找特定模式 #
cpp
#include <rocksdb/db.h>
#include <regex>
#include <vector>
// Scans all of `db` and returns the entries whose entire key matches the
// regular expression `pattern` (std::regex_match semantics).
std::vector<std::pair<std::string, std::string>>
FindByPattern(rocksdb::DB* db, const std::string& pattern) {
    std::vector<std::pair<std::string, std::string>> matches;
    const std::regex matcher(pattern);
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        std::string candidate = cursor->key().ToString();
        const bool is_match = std::regex_match(candidate, matcher);
        if (is_match) {
            matches.push_back({candidate, cursor->value().ToString()});
        }
        cursor->Next();
    }
    delete cursor;
    return matches;
}
// 使用示例
// Usage example: prints every entry whose key is "user:", then one or more
// digits, then ":" and any remainder.
void DemoPatternSearch(rocksdb::DB* db) {
    const auto hits = FindByPattern(db, "user:[0-9]+:.*");
    for (const auto& hit : hits) {
        std::cout << hit.first << " => " << hit.second << std::endl;
    }
}
七、错误处理 #
7.1 检查迭代器状态 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every key of `db`, checking the iterator status after each entry
// and once more after the scan ends.
// NOTE(review): the RocksDB iterator API states that Valid() is false
// whenever status() is not OK, so the in-loop check looks redundant —
// confirm before removing it.
void SafeIterate(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        // Process the entry.
        std::cout << cursor->key().ToString() << std::endl;
        // Per-entry status check; stop without advancing on error.
        const rocksdb::Status step_status = cursor->status();
        if (!step_status.ok()) {
            std::cerr << "Iterator error: " << step_status.ToString() << std::endl;
            break;
        }
        cursor->Next();
    }
    // Final status check once the scan has stopped.
    const rocksdb::Status final_status = cursor->status();
    if (!final_status.ok()) {
        std::cerr << "Final iterator error: " << final_status.ToString() << std::endl;
    }
    delete cursor;
}
7.2 处理数据损坏 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints every entry of `db` and reports why the scan stopped, telling
// corruption apart from other errors.
//
// The RocksDB iterator API states that Valid() is always false when status()
// is not OK. A status check inside a `Valid()` loop therefore never fires,
// and key() may not be read once the iterator is invalid. The status is
// inspected after the loop instead.
void HandleCorruption(rocksdb::DB* db) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        // Normal processing.
        std::cout << it->key().ToString() << " => "
                  << it->value().ToString() << std::endl;
    }
    // The loop ends either at the end of the data or on an error.
    const rocksdb::Status status = it->status();
    if (status.IsCorruption()) {
        std::cerr << "Corruption detected: " << status.ToString() << std::endl;
    } else if (!status.ok()) {
        std::cerr << "Error: " << status.ToString() << std::endl;
    }
    delete it;
}
八、性能优化 #
8.1 迭代器性能建议 #
| 建议 | 说明 |
|---|---|
| 设置边界 | 使用iterate_upper_bound/lower_bound |
| 控制缓存 | 大范围扫描时禁用fill_cache |
| 预读优化 | 设置合适的readahead_size |
| 前缀优化 | 使用prefix_same_as_start |
| 及时释放 | 使用完毕立即delete迭代器 |
8.2 性能对比示例 #
cpp
#include <rocksdb/db.h>
#include <chrono>
#include <iostream>
// Times two full forward scans of `db` and prints each duration in
// milliseconds: first with default ReadOptions, then with fill_cache
// disabled and a 4 MB readahead. `num_keys` is not used by this function.
void BenchmarkIteration(rocksdb::DB* db, int num_keys) {
    using Clock = std::chrono::high_resolution_clock;
    using Millis = std::chrono::milliseconds;

    // Pass 1: default options.
    auto t_begin = Clock::now();
    rocksdb::Iterator* plain_it = db->NewIterator(rocksdb::ReadOptions());
    int plain_seen = 0;
    plain_it->SeekToFirst();
    while (plain_it->Valid()) {
        plain_seen++;
        plain_it->Next();
    }
    delete plain_it;
    auto t_end = Clock::now();
    const auto plain_ms = std::chrono::duration_cast<Millis>(t_end - t_begin).count();
    std::cout << "Basic iteration: " << plain_ms << " ms" << std::endl;

    // Pass 2: no cache fill, larger readahead.
    t_begin = Clock::now();
    rocksdb::ReadOptions tuned;
    tuned.fill_cache = false;
    tuned.readahead_size = 4 * 1024 * 1024;
    rocksdb::Iterator* tuned_it = db->NewIterator(tuned);
    int tuned_seen = 0;
    tuned_it->SeekToFirst();
    while (tuned_it->Valid()) {
        tuned_seen++;
        tuned_it->Next();
    }
    delete tuned_it;
    t_end = Clock::now();
    const auto tuned_ms = std::chrono::duration_cast<Millis>(t_end - t_begin).count();
    std::cout << "Optimized iteration: " << tuned_ms << " ms" << std::endl;
}
九、总结 #
9.1 迭代器操作速查 #
| 操作 | 方法 | 说明 |
|---|---|---|
| 移到开头 | SeekToFirst() | 定位到第一个键 |
| 移到末尾 | SeekToLast() | 定位到最后一个键 |
| 定位键 | Seek(key) | 定位到>=key的位置 |
| 定位前键 | SeekForPrev(key) | 定位到<=key的位置 |
| 下一个 | Next() | 移动到下一个键 |
| 上一个 | Prev() | 移动到上一个键 |
| 检查有效 | Valid() | 检查当前位置是否有效 |
| 获取键 | key() | 获取当前键 |
| 获取值 | value() | 获取当前值 |
| 检查状态 | status() | 获取迭代器状态 |
9.2 最佳实践 #
- 及时释放迭代器:使用完毕后立即delete
- 设置迭代边界:使用边界优化性能
- 检查迭代器状态:处理可能的错误
- 合理使用缓存:大范围扫描时禁用缓存
- 使用前缀优化:启用前缀布隆过滤器
下一步,让我们学习快照机制!
最后更新:2026-03-27