RocksDB迭代器 #

一、迭代器基础 #

1.1 什么是迭代器 #

迭代器(Iterator)是RocksDB提供的遍历数据的接口,支持有序访问键值对。

text
迭代器特点:
├── 有序遍历 - 按比较器(Comparator)定义的顺序排列,默认为按字节的字典序
├── 双向遍历 - 支持正向和反向
├── 范围查询 - 支持指定范围遍历
└── 快照隔离 - 迭代器看到的是创建时刻的一致性视图,创建之后的写入对它不可见

1.2 创建迭代器 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>

// Minimal end-to-end example: opens (or creates) the database at
// /tmp/testdb, writes ten key/value pairs, prints them with a forward
// iterator, and releases the iterator and the database.
//
// Returns 0 on success and 1 if opening, writing or iterating fails.
int main() {
    // Initialised so the pointer is never indeterminate if Open() fails.
    rocksdb::DB* db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    // Write test data. Put() reports failure through its Status, so it is
    // checked instead of being silently dropped.
    for (int i = 0; i < 10; i++) {
        status = db->Put(rocksdb::WriteOptions(),
                         "key_" + std::to_string(i),
                         "value_" + std::to_string(i));
        if (!status.ok()) {
            std::cerr << "Put failed: " << status.ToString() << std::endl;
            delete db;
            return 1;
        }
    }

    // Create the iterator.
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());

    // Walk every entry from the first key to the last.
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        std::cout << it->key().ToString() << " => "
                  << it->value().ToString() << std::endl;
    }

    // The loop also ends when the iterator hits an error, so the status
    // has to be inspected once iteration stops.
    int exit_code = 0;
    if (!it->status().ok()) {
        std::cerr << "Iterator error: " << it->status().ToString() << std::endl;
        exit_code = 1;
    }

    // Release the iterator before the database it was created from.
    delete it;
    delete db;
    return exit_code;
}

1.3 迭代器基本操作 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>

// Demonstrates each basic iterator operation in turn: the four positioning
// calls (SeekToFirst, SeekToLast, Seek, SeekForPrev), stepping with
// Next()/Prev(), reading the current entry, and checking the status.
//
// db: an open database; the iterator created here is deleted before return.
void BasicIteratorOperations(rocksdb::DB* db) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());

    // Position at the first entry.
    it->SeekToFirst();
    if (it->Valid()) {
        std::cout << "First: " << it->key().ToString() << std::endl;
    }

    // Position at the last entry.
    it->SeekToLast();
    if (it->Valid()) {
        std::cout << "Last: " << it->key().ToString() << std::endl;
    }

    // Position at the first entry whose key is >= "key_5".
    it->Seek("key_5");
    if (it->Valid()) {
        std::cout << "Seek key_5: " << it->key().ToString() << std::endl;
    }

    // Position at the last entry whose key is <= "key_5".
    it->SeekForPrev("key_5");
    if (it->Valid()) {
        std::cout << "SeekForPrev key_5: " << it->key().ToString() << std::endl;
    }

    // Next(), Prev(), key() and value() all require a valid iterator, so
    // each step is guarded: a seek may have found nothing, and Next() may
    // step past the last entry.
    if (it->Valid()) {
        it->Next();  // advance to the following entry
    }
    if (it->Valid()) {
        it->Prev();  // step back to the preceding entry
    }

    // Read the current key and value. The returned Slices point into
    // iterator-owned storage, so they are only used while the iterator
    // stays at this position.
    if (it->Valid()) {
        rocksdb::Slice key = it->key();
        rocksdb::Slice value = it->value();
        (void)key;
        (void)value;
    }

    // An invalid iterator means either "no more entries" or "an error
    // occurred"; status() tells the two apart.
    rocksdb::Status status = it->status();
    if (!status.ok()) {
        std::cerr << "Iterator error: " << status.ToString() << std::endl;
    }

    delete it;
}

二、遍历方式 #

2.1 正向遍历 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key/value pair in the database, walking from the first
// entry to the last, then reports any error recorded by the iterator.
void ForwardScan(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());

    std::cout << "Forward scan:" << std::endl;

    cursor->SeekToFirst();
    while (cursor->Valid()) {
        std::cout << "  " << cursor->key().ToString()
                  << " => " << cursor->value().ToString() << std::endl;
        cursor->Next();
    }

    // Distinguish "reached the end" from "stopped on an error".
    if (!cursor->status().ok()) {
        std::cerr << "Error: " << cursor->status().ToString() << std::endl;
    }

    delete cursor;
}

2.2 反向遍历 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key/value pair in the database, walking from the last
// entry back to the first, then reports any error recorded by the iterator.
void BackwardScan(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());

    std::cout << "Backward scan:" << std::endl;

    cursor->SeekToLast();
    while (cursor->Valid()) {
        std::cout << "  " << cursor->key().ToString()
                  << " => " << cursor->value().ToString() << std::endl;
        cursor->Prev();
    }

    // Distinguish "reached the beginning" from "stopped on an error".
    if (!cursor->status().ok()) {
        std::cerr << "Error: " << cursor->status().ToString() << std::endl;
    }

    delete cursor;
}

2.3 从指定位置开始遍历 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key/value pair starting at the first key >= start_key and
// continuing to the end of the database.
//
// db:        an open database.
// start_key: key to seek to; it does not need to exist in the database.
void ScanFromKey(rocksdb::DB* db, const std::string& start_key) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());

    std::cout << "Scan from " << start_key << ":" << std::endl;

    for (it->Seek(start_key); it->Valid(); it->Next()) {
        std::cout << "  " << it->key().ToString()
                  << " => " << it->value().ToString() << std::endl;
    }

    // The loop also ends on an iterator error, so report it the same way
    // the other scan functions do instead of treating it as end-of-data.
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }

    delete it;
}

三、范围查询 #

3.1 基本范围查询 #

cpp
#include <rocksdb/db.h>
#include <iostream>
#include <vector>

// Collects every key/value pair whose key lies in [start_key, end_key).
//
// db:        an open database.
// start_key: inclusive lower bound; the scan starts at the first key >= it.
// end_key:   exclusive upper bound.
// Returns the matching pairs in iteration order. If the iterator reports
// an error, it is logged to std::cerr and the pairs read so far are returned.
//
// The bound check compares raw bytes, which matches iteration order only
// when the database uses the default bytewise comparator.
std::vector<std::pair<std::string, std::string>> 
RangeQuery(rocksdb::DB* db, 
           const std::string& start_key, 
           const std::string& end_key) {
    
    std::vector<std::pair<std::string, std::string>> result;
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    
    // Compare Slices directly: copying every key into a std::string just
    // to test it against the bound costs one allocation per entry.
    const rocksdb::Slice upper(end_key);
    for (it->Seek(start_key);
         it->Valid() && it->key().compare(upper) < 0;
         it->Next()) {
        result.emplace_back(it->key().ToString(), it->value().ToString());
    }
    
    // An invalid iterator can mean an error rather than end-of-range.
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    
    delete it;
    return result;
}

int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 写入测试数据
    for (int i = 0; i < 100; i++) {
        char key[20];
        sprintf(key, "key_%03d", i);
        db->Put(rocksdb::WriteOptions(), key, std::to_string(i * 10));
    }
    
    // 范围查询
    auto result = RangeQuery(db, "key_020", "key_040");
    
    std::cout << "Range query result:" << std::endl;
    for (const auto& [key, value] : result) {
        std::cout << "  " << key << " => " << value << std::endl;
    }
    
    delete db;
    return 0;
}

3.2 使用ReadOptions设置范围 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key/value pair in [start_key, end_key), letting RocksDB
// enforce the upper bound through ReadOptions instead of comparing keys in
// the loop.
//
// db:        an open database.
// start_key: the scan starts at the first key >= start_key.
// end_key:   exclusive upper bound handed to the iterator.
void RangeQueryWithOptions(rocksdb::DB* db,
                           const std::string& start_key,
                           const std::string& end_key) {
    
    // iterate_upper_bound is a `const Slice*`, so a std::string address
    // cannot be assigned to it. The Slice only references end_key's bytes;
    // both it and end_key must stay alive until the iterator is deleted.
    rocksdb::Slice upper_bound(end_key);

    rocksdb::ReadOptions read_options;
    read_options.iterate_upper_bound = &upper_bound;  // set the upper bound
    
    rocksdb::Iterator* it = db->NewIterator(read_options);
    
    // The iterator becomes invalid once it reaches the bound, so no
    // explicit key comparison is needed here.
    for (it->Seek(start_key); it->Valid(); it->Next()) {
        std::cout << it->key().ToString() 
                  << " => " << it->value().ToString() << std::endl;
    }
    
    // An invalid iterator can mean an error rather than end-of-range.
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    
    delete it;
}

3.3 带分页的范围查询 #

cpp
#include <rocksdb/db.h>
#include <string>
#include <vector>

// One page of a paginated scan.
struct PageResult {
    // Key/value pairs belonging to this page, in iteration order.
    std::vector<std::pair<std::string, std::string>> items;
    // Key at which the next page starts; only meaningful when has_more is true.
    std::string next_key;
    // True when more entries remain after this page. Default-initialised so
    // that a freshly constructed PageResult never holds an indeterminate flag.
    bool has_more = false;
};

// Reads at most page_size entries whose keys lie in [start_key, end_key).
//
// db:        an open database.
// start_key: the page starts at the first key >= start_key.
// end_key:   exclusive upper bound of the whole scan.
// page_size: maximum number of entries to return.
// Returns the page. has_more is true when at least one more in-range entry
// follows, and next_key is then the key to pass as start_key for the next
// page.
//
// A non-positive page_size yields an empty page with has_more == false:
// such a page can never advance the cursor, so reporting "more" would make
// a caller that follows next_key loop forever.
//
// The bound check compares raw bytes, which matches iteration order only
// when the database uses the default bytewise comparator.
PageResult PaginatedScan(rocksdb::DB* db,
                         const std::string& start_key,
                         const std::string& end_key,
                         int page_size) {
    
    PageResult result;
    result.has_more = false;
    
    if (page_size <= 0) {
        return result;
    }
    
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    
    // Compare Slices directly instead of copying each key into a
    // std::string just for the bound check.
    const rocksdb::Slice upper(end_key);
    int count = 0;
    
    for (it->Seek(start_key);
         it->Valid() && it->key().compare(upper) < 0 && count < page_size;
         it->Next()) {
        result.items.emplace_back(it->key().ToString(), it->value().ToString());
        ++count;
    }
    
    // The iterator now rests on the first entry that was not returned; if
    // it is still inside the range, that entry starts the next page.
    if (it->Valid() && it->key().compare(upper) < 0) {
        result.has_more = true;
        result.next_key = it->key().ToString();
    }
    
    delete it;
    return result;
}

// Usage example: pages through [key_000, key_100) ten entries at a time,
// printing each page and a "---" separator between consecutive pages.
void DemoPagination(rocksdb::DB* db) {
    const std::string range_end = "key_100";
    const int entries_per_page = 10;
    std::string cursor = "key_000";
    
    bool more_pages = true;
    while (more_pages) {
        PageResult page = PaginatedScan(db, cursor, range_end, entries_per_page);
        
        std::cout << "Page:" << std::endl;
        for (const auto& item : page.items) {
            std::cout << "  " << item.first << " => " << item.second << std::endl;
        }
        
        more_pages = page.has_more;
        if (more_pages) {
            // Resume the next page where this one stopped.
            cursor = page.next_key;
            std::cout << "---" << std::endl;
        }
    }
}

四、前缀扫描 #

4.1 基本前缀扫描 #

cpp
#include <rocksdb/db.h>
#include <iostream>
#include <vector>

// Collects every key/value pair whose key begins with `prefix`.
// The scan seeks to the prefix and stops at the first key that no longer
// starts with it.
std::vector<std::pair<std::string, std::string>>
PrefixScan(rocksdb::DB* db, const std::string& prefix) {
    
    std::vector<std::pair<std::string, std::string>> matches;
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    
    for (cursor->Seek(prefix); cursor->Valid(); cursor->Next()) {
        if (!cursor->key().starts_with(prefix)) {
            break;  // walked past the last key carrying the prefix
        }
        matches.push_back({
            cursor->key().ToString(),
            cursor->value().ToString()
        });
    }
    
    delete cursor;
    return matches;
}

int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 写入带前缀的数据
    db->Put(rocksdb::WriteOptions(), "user:1001:name", "Alice");
    db->Put(rocksdb::WriteOptions(), "user:1001:age", "25");
    db->Put(rocksdb::WriteOptions(), "user:1001:email", "alice@example.com");
    db->Put(rocksdb::WriteOptions(), "user:1002:name", "Bob");
    db->Put(rocksdb::WriteOptions(), "user:1002:age", "30");
    db->Put(rocksdb::WriteOptions(), "order:1001", "order_data");
    
    // 扫描user前缀
    auto user_data = PrefixScan(db, "user:");
    
    std::cout << "User data:" << std::endl;
    for (const auto& [key, value] : user_data) {
        std::cout << "  " << key << " => " << value << std::endl;
    }
    
    delete db;
    return 0;
}

4.2 使用Bloom Filter优化前缀扫描 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
#include <rocksdb/filter_policy.h>

// Configures `options` for prefix-based Bloom filtering with block-based
// tables.
//
// options:     the Options object to modify in place.
// prefix_size: number of leading key bytes that form the prefix.
//
// NewFixedPrefixTransform is declared in <rocksdb/slice_transform.h>;
// include that header if it is not pulled in transitively.
void SetupPrefixBloomFilter(rocksdb::Options& options, size_t prefix_size) {
    rocksdb::BlockBasedTableOptions table_options;
    
    // Bloom filter with 10 bits per key.
    table_options.filter_policy.reset(
        rocksdb::NewBloomFilterPolicy(10, false)
    );
    
    // The prefix extractor is a member of Options (column-family options),
    // not of BlockBasedTableOptions, so it is set on `options` directly.
    options.prefix_extractor.reset(
        rocksdb::NewFixedPrefixTransform(prefix_size)
    );
    
    // Disable whole-key filtering so the filter is built from prefixes
    // only, and use the hash index for prefix lookups.
    table_options.whole_key_filtering = false;
    table_options.index_type = rocksdb::BlockBasedTableOptions::kHashSearch;
    
    options.table_factory.reset(
        rocksdb::NewBlockBasedTableFactory(table_options)
    );
}

// 使用示例
int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    
    // 设置前缀布隆过滤器(前缀长度为5)
    SetupPrefixBloomFilter(options, 5);
    
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 写入数据
    for (int i = 0; i < 10000; i++) {
        std::string key = "user:" + std::to_string(i) + ":data";
        db->Put(rocksdb::WriteOptions(), key, "value_" + std::to_string(i));
    }
    
    // 前缀扫描
    rocksdb::ReadOptions read_options;
    read_options.prefix_same_as_start = true;
    
    rocksdb::Iterator* it = db->NewIterator(read_options);
    it->Seek("user:100:");
    
    while (it->Valid() && it->key().starts_with("user:100:")) {
        std::cout << it->key().ToString() << std::endl;
        it->Next();
    }
    
    delete it;
    delete db;
    return 0;
}

五、迭代器选项 #

5.1 ReadOptions配置 #

cpp
// Overview of the ReadOptions fields that affect iterators. This is a
// fragment: `db` is assumed to be an already opened rocksdb::DB*.
rocksdb::ReadOptions read_options;

// Whether to verify checksums on the data that is read.
read_options.verify_checksums = true;

// Whether blocks read by this iterator are added to the block cache.
read_options.fill_cache = true;

// Snapshot to read from; nullptr means the iterator uses the state of the
// database at the moment the iterator is created.
read_options.snapshot = nullptr;

// Upper bound of the iteration. iterate_upper_bound is a `const Slice*`,
// so the address of a std::string cannot be assigned to it: wrap the string
// in a Slice. Both the string and the Slice must outlive the iterator.
std::string upper_bound = "key_zzz";
rocksdb::Slice upper_bound_slice(upper_bound);
read_options.iterate_upper_bound = &upper_bound_slice;

// Lower bound of the iteration (also a `const Slice*`, same lifetime rule).
std::string lower_bound = "key_aaa";
rocksdb::Slice lower_bound_slice(lower_bound);
read_options.iterate_lower_bound = &lower_bound_slice;

// Prefix-scan mode: only visit keys sharing the prefix of the seek key.
read_options.prefix_same_as_start = true;

// Read-ahead size.
read_options.readahead_size = 1024 * 1024;  // 1MB

// Asynchronous I/O.
read_options.async_io = false;

// Create the iterator with these options.
rocksdb::Iterator* it = db->NewIterator(read_options);

5.2 迭代边界优化 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key in [start, end) using iterator bounds, no block-cache
// fill and a 2MB read-ahead, which suits large sequential scans.
//
// db:    an open database.
// start: lower bound; the scan begins at the first key >= start.
// end:   exclusive upper bound.
void OptimizedRangeScan(rocksdb::DB* db,
                        const std::string& start,
                        const std::string& end) {
    
    // The bound fields are `const Slice*`, so std::string addresses cannot
    // be assigned to them. The Slices only reference the strings' bytes and
    // must stay alive until the iterator is deleted.
    rocksdb::Slice lower_bound(start);
    rocksdb::Slice upper_bound(end);
    
    rocksdb::ReadOptions read_options;
    
    // Setting bounds lets the iterator skip data outside the range.
    read_options.iterate_lower_bound = &lower_bound;
    read_options.iterate_upper_bound = &upper_bound;
    
    // Do not fill the block cache (suits large range scans).
    read_options.fill_cache = false;
    
    // Read-ahead size.
    read_options.readahead_size = 2 * 1024 * 1024;  // 2MB
    
    rocksdb::Iterator* it = db->NewIterator(read_options);
    
    for (it->Seek(start); it->Valid(); it->Next()) {
        // Process the entry.
        std::cout << it->key().ToString() << std::endl;
    }
    
    // An invalid iterator can mean an error rather than end-of-range.
    if (!it->status().ok()) {
        std::cerr << "Error: " << it->status().ToString() << std::endl;
    }
    
    delete it;
}

六、高级用法 #

6.1 统计键数量 #

cpp
#include <rocksdb/db.h>

// Returns the number of entries visited by a full forward scan of the
// database.
uint64_t CountKeys(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    uint64_t total = 0;
    
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        ++total;
        cursor->Next();
    }
    
    delete cursor;
    return total;
}

// Returns the number of entries whose keys lie in [start, end).
//
// db:    an open database.
// start: inclusive lower bound; counting begins at the first key >= start.
// end:   exclusive upper bound.
//
// The bound check compares raw bytes, which matches iteration order only
// when the database uses the default bytewise comparator.
uint64_t CountKeysInRange(rocksdb::DB* db,
                          const std::string& start,
                          const std::string& end) {
    uint64_t count = 0;
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    
    // Compare Slices directly: counting does not need the key contents, so
    // copying every key into a std::string is pure allocation overhead.
    const rocksdb::Slice upper(end);
    for (it->Seek(start);
         it->Valid() && it->key().compare(upper) < 0;
         it->Next()) {
        ++count;
    }
    
    delete it;
    return count;
}

6.2 批量收集数据 #

cpp
#include <rocksdb/db.h>
#include <fstream>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Calls `processor(key, value)` for every entry whose key lies in
// [start, end), in iteration order.
//
// db:        an open database.
// start:     inclusive lower bound; the scan begins at the first key >= start.
// end:       exclusive upper bound.
// processor: callback receiving a copy of each key and value.
//
// The bound check compares raw bytes, which matches iteration order only
// when the database uses the default bytewise comparator.
void ScanAndProcess(rocksdb::DB* db,
                    const std::string& start,
                    const std::string& end,
                    std::function<void(const std::string&, const std::string&)> processor) {
    
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    
    // Test the bound on the Slice itself so each key is copied once (for
    // the callback) rather than twice.
    const rocksdb::Slice upper(end);
    for (it->Seek(start);
         it->Valid() && it->key().compare(upper) < 0;
         it->Next()) {
        processor(it->key().ToString(), it->value().ToString());
    }
    
    delete it;
}

// Usage example: writes entries to `filename`, one "key<TAB>value" line
// per entry.
//
// NOTE: the scan range is ["", "~"), so keys that sort at or after "~"
// (for example keys starting with bytes >= 0x7E) are not exported.
void ExportData(rocksdb::DB* db, const std::string& filename) {
    std::ofstream file(filename);
    // Without this check a failed open would silently export nothing.
    if (!file.is_open()) {
        std::cerr << "Failed to open output file: " << filename << std::endl;
        return;
    }
    
    ScanAndProcess(db, "", "~", [&file](const std::string& key, const std::string& value) {
        file << key << "\t" << value << "\n";
    });
    
    file.close();
}

6.3 查找特定模式 #

cpp
#include <rocksdb/db.h>
#include <regex>
#include <vector>

// Scans the whole database and returns every key/value pair whose complete
// key matches the regular expression `pattern`.
std::vector<std::pair<std::string, std::string>>
FindByPattern(rocksdb::DB* db, const std::string& pattern) {
    
    std::vector<std::pair<std::string, std::string>> hits;
    std::regex matcher(pattern);
    
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        // regex_match requires the entire key to match, not a substring.
        const std::string candidate = cursor->key().ToString();
        const bool is_match = std::regex_match(candidate, matcher);
        if (is_match) {
            hits.push_back({candidate, cursor->value().ToString()});
        }
        cursor->Next();
    }
    
    delete cursor;
    return hits;
}

// Usage example: prints every entry whose key is "user:", then one or more
// digits, then ":" and any remainder.
void DemoPatternSearch(rocksdb::DB* db) {
    const auto matches = FindByPattern(db, "user:[0-9]+:.*");
    
    for (const auto& entry : matches) {
        std::cout << entry.first << " => " << entry.second << std::endl;
    }
}

七、错误处理 #

7.1 检查迭代器状态 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key in the database and reports an iterator error, if any,
// once iteration stops.
//
// Valid() always returns false when status() is not OK, so a status check
// inside the loop body can never fire. An error shows up as the loop
// ending early, which is why the check belongs after the loop.
void SafeIterate(rocksdb::DB* db) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        // Process the entry.
        std::cout << it->key().ToString() << std::endl;
    }
    
    // Tell "reached the end" apart from "stopped on an error".
    if (!it->status().ok()) {
        std::cerr << "Final iterator error: " << it->status().ToString() << std::endl;
    }
    
    delete it;
}

7.2 处理数据损坏 #

cpp
#include <rocksdb/db.h>
#include <iostream>

// Prints every key/value pair and, once iteration stops, reports whether
// it stopped because of data corruption or another error.
//
// While Valid() is true the iterator status is OK, so corruption can never
// be observed inside the loop body. It surfaces as Valid() turning false
// with a non-OK status, which is inspected after the loop. key() is not
// available at that point, so the status text is printed instead.
void HandleCorruption(rocksdb::DB* db) {
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        // Normal processing.
        std::cout << it->key().ToString() << " => " 
                  << it->value().ToString() << std::endl;
    }
    
    rocksdb::Status status = it->status();
    if (status.IsCorruption()) {
        // The caller can decide here whether to abort or to resume the
        // scan with a new iterator positioned past the damaged region.
        std::cerr << "Corruption detected: " << status.ToString() << std::endl;
    } else if (!status.ok()) {
        std::cerr << "Error: " << status.ToString() << std::endl;
    }
    
    delete it;
}

八、性能优化 #

8.1 迭代器性能建议 #

建议 说明
设置边界 使用iterate_upper_bound/lower_bound
控制缓存 大范围扫描时禁用fill_cache
预读优化 设置合适的readahead_size
前缀优化 使用prefix_same_as_start
及时释放 使用完毕立即delete迭代器(未释放的迭代器会一直引用创建时的内存表和SST文件,使这些资源无法被回收)

8.2 性能对比示例 #

cpp
#include <rocksdb/db.h>
#include <chrono>
#include <iostream>

// Times a full forward scan twice: once with default ReadOptions and once
// with fill_cache disabled and a 4MB read-ahead. Prints both durations in
// milliseconds.
//
// db:       an open database.
// num_keys: not used by the measurement; kept so existing callers compile.
//
// NOTE: the second scan runs after the first has already read the data, so
// it may benefit from warm caches; treat the comparison as indicative only.
void BenchmarkIteration(rocksdb::DB* db, int num_keys) {
    (void)num_keys;
    
    // Runs one full scan with the given options and returns the elapsed
    // milliseconds. steady_clock is monotonic, which makes it the right
    // clock for measuring intervals.
    auto time_full_scan = [db](const rocksdb::ReadOptions& read_options) {
        const auto begin = std::chrono::steady_clock::now();
        
        rocksdb::Iterator* it = db->NewIterator(read_options);
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
        }
        const rocksdb::Status scan_status = it->status();
        delete it;
        
        const auto finish = std::chrono::steady_clock::now();
        
        // A scan cut short by an error would report a misleadingly small time.
        if (!scan_status.ok()) {
            std::cerr << "Error: " << scan_status.ToString() << std::endl;
        }
        return std::chrono::duration_cast<std::chrono::milliseconds>(finish - begin).count();
    };
    
    // Method 1: basic iteration.
    std::cout << "Basic iteration: " 
              << time_full_scan(rocksdb::ReadOptions())
              << " ms" << std::endl;
    
    // Method 2: tuned iteration.
    rocksdb::ReadOptions optimized_options;
    optimized_options.fill_cache = false;
    optimized_options.readahead_size = 4 * 1024 * 1024;
    
    std::cout << "Optimized iteration: " 
              << time_full_scan(optimized_options)
              << " ms" << std::endl;
}

九、总结 #

9.1 迭代器操作速查 #

操作 方法 说明
移到开头 SeekToFirst() 定位到第一个键
移到末尾 SeekToLast() 定位到最后一个键
定位键 Seek(key) 定位到第一个>=key的键
定位前键 SeekForPrev(key) 定位到最后一个<=key的键
下一个 Next() 移动到下一个键
上一个 Prev() 移动到上一个键
检查有效 Valid() 检查当前位置是否有效
获取键 key() 获取当前键
获取值 value() 获取当前值
检查状态 status() 获取迭代器状态

9.2 最佳实践 #

  1. 及时释放迭代器:使用完毕后立即delete
  2. 设置迭代边界:使用边界优化性能
  3. 检查迭代器状态:处理可能的错误
  4. 合理使用缓存:大范围扫描时禁用缓存
  5. 使用前缀优化:启用前缀布隆过滤器

下一步,让我们学习快照机制!

最后更新:2026-03-27