时间旅行 #

一、时间旅行概述 #

1.1 什么是时间旅行 #

时间旅行(Time Travel)允许查询某个历史时间点的数据状态。

注意:时间旅行功能已在 Milvus 2.3 中移除,`travel_timestamp` 参数不再受支持;本文内容仅适用于 Milvus 2.2 及更早版本。

text
时间旅行概念:

┌─────────────────────────────────────────┐
│           时间线                         │
├─────────────────────────────────────────┤
│                                         │
│  T1        T2        T3        T4       │
│  │         │         │         │        │
│  ▼         ▼         ▼         ▼        │
│  插入A     插入B     删除A     插入C     │
│                                         │
│  查询T2时刻 → 返回 {A, B}               │
│  查询T4时刻 → 返回 {B, C}               │
│                                         │
└─────────────────────────────────────────┘

1.2 应用场景 #

text
应用场景:

┌─────────────────────────────────────────┐
│  数据审计                               │
│  - 查看历史数据状态                     │
│  - 追踪数据变更                         │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│  数据恢复                               │
│  - 恢复误删数据                         │
│  - 回滚到历史版本                       │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│  时间范围查询                           │
│  - 查看特定时间段数据                   │
│  - 分析数据变化趋势                     │
└─────────────────────────────────────────┘

二、时间戳概念 #

2.1 时间戳类型 #

text
时间戳类型:

┌─────────────────────────────────────────┐
│      混合时间戳(Hybrid Timestamp)       │
├─────────────────────────────────────────┤
│  - Milvus内部使用                       │
│  - 物理时间(毫秒) << 18 + 逻辑计数      │
│  - 单调递增                             │
│  - 用于数据版本控制                     │
│  - travel_timestamp 参数需使用此类型    │
└─────────────────────────────────────────┘

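┌─────────────────────────────────────────┐
│           物理时间戳                     │
├─────────────────────────────────────────┤
│  - Unix时间戳                           │
│  - 可读性强                             │
│  - 用于业务逻辑                         │
│  - 需先转换为混合时间戳才能用于时间旅行 │
└─────────────────────────────────────────┘

转换方法:混合时间戳 = int(Unix秒 × 1000) << 18(与 pymilvus 的 utility.mkts_from_unixtime 等价)。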

2.2 获取时间戳 #

python
from pymilvus import Collection
import time

collection = Collection("documents")

result = collection.insert(data)
# The timestamp Milvus assigns to a mutation is a hybrid timestamp:
# physical time in milliseconds shifted left by 18 bits, plus a logical counter.
# This is the kind of value travel_timestamp expects.
print(f"插入时间戳: {result.timestamp}")
print(f"插入时间(Unix 秒): {(result.timestamp >> 18) / 1000}")

# A Unix timestamp is fine for business logic, but it must be converted to a
# hybrid timestamp before it can be used as travel_timestamp.
now = time.time()
current_timestamp = int(now)
print(f"当前时间戳: {current_timestamp}")
current_hybrid_timestamp = int(now * 1000) << 18
print(f"当前混合时间戳: {current_hybrid_timestamp}")

三、时间旅行查询 #

3.1 基本时间旅行 #

python
import time

insert_result = collection.insert(data)
# Timestamp Milvus assigned to this insert; reused below as the travel point.
insert_timestamp = insert_result.timestamp

time.sleep(1)

collection.delete('id in [1, 2, 3]')

# Read the collection as it was at insert_timestamp, i.e. before the delete above.
rows_before_delete = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    travel_timestamp=insert_timestamp,
)

print("删除前的数据:")
for row in rows_before_delete:
    print(row)

3.2 搜索时时间旅行 #

python
# Vector search against the snapshot at insert_timestamp rather than the latest data.
hits = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=10,
    travel_timestamp=insert_timestamp,
)

3.3 时间范围查询 #

python
now = time.time()
end_timestamp = int(now)
start_timestamp = end_timestamp - 86400  # 24 hours earlier, in Unix seconds

# Time travel only returns a point-in-time snapshot; the time *range* itself has
# to be expressed as a filter on a scalar field.
# NOTE(review): assumes created_at stores Unix seconds — confirm against the schema.
# travel_timestamp expects a Milvus hybrid timestamp (physical milliseconds << 18),
# not Unix seconds, so convert the same clock reading.
results = collection.query(
    expr=f"id > 0 and created_at >= {start_timestamp} and created_at <= {end_timestamp}",
    output_fields=["id", "title", "created_at"],
    travel_timestamp=int(now * 1000) << 18,
)

四、时间旅行应用 #

4.1 数据审计 #

python
class DataAuditor:
    """Audit a collection by reading it back at earlier travel timestamps.

    Every timestamp argument is forwarded unchanged as ``travel_timestamp``,
    so it must be a Milvus hybrid timestamp (e.g. ``MutationResult.timestamp``),
    not Unix seconds.
    """

    def __init__(self, collection):
        self.collection = collection

    def get_history_at_time(self, timestamp):
        """Return all entities with ``id > 0`` (all fields) as of ``timestamp``."""
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=timestamp
        )

    def compare_snapshots(self, timestamp1, timestamp2):
        """Diff the snapshots at ``timestamp1`` and ``timestamp2`` by primary key.

        Returns:
            A dict of sorted id lists:
            - ``added``: ids present only at ``timestamp2``;
            - ``removed``: ids present only at ``timestamp1``;
            - ``unchanged``: ids present at both with identical records;
            - ``modified``: ids present at both whose records differ.
        """
        before = {row["id"]: row for row in self.get_history_at_time(timestamp1)}
        after = {row["id"]: row for row in self.get_history_at_time(timestamp2)}

        common = before.keys() & after.keys()
        # An id that survives in both snapshots may still have been rewritten
        # (e.g. by upsert), so compare whole records before calling it unchanged.
        modified = {pk for pk in common if before[pk] != after[pk]}

        return {
            "added": sorted(after.keys() - before.keys()),
            "removed": sorted(before.keys() - after.keys()),
            "unchanged": sorted(common - modified),
            "modified": sorted(modified),
        }

    def track_entity_history(self, entity_id, timestamps):
        """Look up ``entity_id`` at each timestamp in ``timestamps``, in order.

        Returns:
            One dict per timestamp with keys ``timestamp``, ``exists`` and
            ``data`` (the first matching record, or None when absent).
        """
        history = []
        for ts in timestamps:
            results = self.collection.query(
                expr=f'id == {entity_id}',
                output_fields=["*"],
                travel_timestamp=ts
            )
            history.append({
                "timestamp": ts,
                "exists": len(results) > 0,
                "data": results[0] if results else None
            })
        return history

auditor = DataAuditor(collection)

# Follow entity 1 across three previously recorded travel timestamps.
checkpoints = [ts1, ts2, ts3]
history = auditor.track_entity_history(entity_id=1, timestamps=checkpoints)

4.2 数据恢复 #

python
class DataRecovery:
    """Restore entities from an earlier snapshot of a collection.

    ``timestamp`` / ``before_timestamp`` arguments are forwarded as
    ``travel_timestamp`` and must be Milvus hybrid timestamps.
    """

    def __init__(self, collection):
        self.collection = collection

    def find_deleted_entities(self, before_timestamp):
        """Return ids (``id > 0``) present at ``before_timestamp`` but absent now."""
        current_data = self.collection.query(
            expr='id > 0',
            output_fields=["id"]
        )
        current_ids = {d["id"] for d in current_data}

        historical_data = self.collection.query(
            expr='id > 0',
            output_fields=["id"],
            travel_timestamp=before_timestamp
        )
        historical_ids = {d["id"] for d in historical_data}

        return list(historical_ids - current_ids)

    def recover_entity(self, entity_id, timestamp):
        """Re-write ``entity_id`` as it was at ``timestamp``.

        Returns:
            True if the entity existed at ``timestamp`` and was upserted, else False.
        """
        # NOTE(review): Collection.upsert and travel_timestamp may not both be
        # available on the same Milvus/pymilvus version — confirm on the target.
        historical = self.collection.query(
            expr=f'id == {entity_id}',
            output_fields=["*"],
            travel_timestamp=timestamp
        )

        if historical:
            entity = historical[0]
            self.collection.upsert([entity])
            return True
        return False

    def recover_batch(self, entity_ids, timestamp, batch_size=1000):
        """Re-write several entities as they were at ``timestamp``.

        Instead of one query and one upsert per id, the ids are fetched with an
        ``id in [...]`` query per chunk of ``batch_size`` ids and upserted together.

        Returns:
            The ids from ``entity_ids`` (input order, duplicates kept) that
            existed at ``timestamp`` and were re-written.

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")

        # Materialize once: entity_ids is iterated again when building the result.
        entity_ids = list(entity_ids)
        unique_ids = list(dict.fromkeys(entity_ids))
        recovered = set()

        for start in range(0, len(unique_ids), batch_size):
            chunk = unique_ids[start:start + batch_size]
            historical = self.collection.query(
                expr=f'id in {chunk}',
                output_fields=["*"],
                travel_timestamp=timestamp
            )
            # Keep the first row per id, matching recover_entity's historical[0].
            rows = {}
            for row in historical:
                rows.setdefault(row["id"], row)
            if rows:
                self.collection.upsert(list(rows.values()))
                recovered.update(rows)

        return [entity_id for entity_id in entity_ids if entity_id in recovered]

recovery = DataRecovery(collection)

# Everything that existed at before_timestamp but is gone now gets restored
# from that same snapshot.
deleted_ids = recovery.find_deleted_entities(before_timestamp)
recovered_ids = recovery.recover_batch(deleted_ids, before_timestamp)

4.3 版本对比 #

python
class VersionComparator:
    """Diff full entity records between two travel timestamps.

    Timestamps are forwarded as ``travel_timestamp`` and must be Milvus
    hybrid timestamps.
    """

    def __init__(self, collection):
        self.collection = collection

    def get_version_data(self, timestamp):
        """Return all entities with ``id > 0`` (all fields) as of ``timestamp``."""
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=timestamp
        )

    def compare_versions(self, ts1, ts2):
        """Compare the records visible at ``ts1`` with those visible at ``ts2``.

        Returns:
            ``{"added": [...], "removed": [...], "modified": [...]}`` where
            ``added``/``removed`` hold whole records and ``modified`` holds
            ``{"id", "before", "after"}`` dicts. Order follows the query results.
        """
        before = {row["id"]: row for row in self.get_version_data(ts1)}
        after = {row["id"]: row for row in self.get_version_data(ts2)}

        changes = {
            "added": [],
            "removed": [],
            "modified": []
        }

        # Walk the older snapshot, then the newer one, so the output order is
        # reproducible instead of depending on set iteration order.
        for pk, old_row in before.items():
            if pk not in after:
                changes["removed"].append(old_row)
            elif old_row != after[pk]:
                changes["modified"].append({
                    "id": pk,
                    "before": old_row,
                    "after": after[pk]
                })
        changes["added"].extend(
            new_row for pk, new_row in after.items() if pk not in before
        )

        return changes

comparator = VersionComparator(collection)
# Full-record diff between the two travel timestamps.
changes = comparator.compare_versions(ts1=ts1, ts2=ts2)

五、时间旅行限制 #

5.1 时间范围限制 #

text
时间旅行限制:

┌─────────────────────────────────────────┐
│           时间范围                       │
├─────────────────────────────────────────┤
│  - 受数据保留期限制                     │
│  - 默认保留期: 5天                      │
│  - 可通过配置调整                       │
└─────────────────────────────────────────┘

5.2 配置保留期 #

yaml
common:
  retentionDuration: 432000  # 单位:秒,432000 秒 = 5 天

5.3 性能考虑 #

python
def efficient_time_travel_query(collection, timestamp, batch_size=1000):
    """Page through all ``id > 0`` ids visible at ``timestamp``.

    Args:
        collection: the collection to query (pymilvus ``Collection``-like).
        timestamp: hybrid timestamp forwarded as ``travel_timestamp``.
        batch_size: rows requested per ``query`` call (its ``limit``).

    Returns:
        A list of ``{"id": ...}`` rows, concatenated in page order.

    Raises:
        ValueError: if ``batch_size`` is not positive (paging would never advance).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    all_results = []
    offset = 0

    while True:
        page = collection.query(
            expr='id > 0',
            output_fields=["id"],
            travel_timestamp=timestamp,
            limit=batch_size,
            offset=offset
        )
        all_results.extend(page)

        # A short page is the last one; stopping here avoids an extra empty query.
        if len(page) < batch_size:
            break
        offset += batch_size

    # NOTE(review): Milvus caps offset + limit for query results; confirm the
    # server's limit before paging through very large collections this way.
    return all_results

六、最佳实践 #

6.1 时间戳管理 #

python
class TimestampManager:
    """Remember named travel timestamps so later queries can refer to events.

    Timestamps recorded without an explicit value are Milvus hybrid timestamps
    (physical milliseconds shifted left by 18 bits), so they can be passed
    directly as ``travel_timestamp``. When an operation returns a
    ``MutationResult``, prefer passing its ``timestamp`` explicitly.
    """

    # Width of the logical-counter part of a Milvus hybrid timestamp.
    _LOGICAL_BITS = 18

    def __init__(self):
        self.timestamps = {}

    def record_timestamp(self, event_name, timestamp=None):
        """Store ``timestamp`` under ``event_name`` and return it.

        When ``timestamp`` is None, the current client time is converted to a
        hybrid timestamp; plain Unix seconds would not be a valid travel point.
        """
        if timestamp is None:
            timestamp = int(time.time() * 1000) << self._LOGICAL_BITS
        self.timestamps[event_name] = timestamp
        return timestamp

    def get_timestamp(self, event_name):
        """Return the timestamp stored for ``event_name``, or None if absent."""
        return self.timestamps.get(event_name)

    def list_timestamps(self):
        """Return the internal ``{event_name: timestamp}`` mapping (not a copy)."""
        return self.timestamps

manager = TimestampManager()

# Record each event right after the operation it names.
ts_insert = manager.record_timestamp(event_name="data_insert")
ts_delete = manager.record_timestamp(event_name="data_delete")

6.2 定期快照 #

python
class SnapshotManager:
    """Record named points in time and read the collection back at them.

    A snapshot stores only a travel timestamp plus metadata; no data is
    copied, so it can only be read back while that history is still retained.
    """

    # Width of the logical-counter part of a Milvus hybrid timestamp.
    _LOGICAL_BITS = 18

    def __init__(self, collection):
        self.collection = collection
        self.snapshots = {}

    def create_snapshot(self, name):
        """Record snapshot ``name`` at the current time and return its timestamp.

        The returned value is a hybrid timestamp usable as ``travel_timestamp``.
        """
        now = time.time()
        # travel_timestamp expects a hybrid timestamp (physical ms << 18), not
        # Unix seconds; derive it from the same clock reading as created_at.
        # NOTE(review): this uses the client clock, so skew against the Milvus
        # server shifts the snapshot point.
        timestamp = int(now * 1000) << self._LOGICAL_BITS

        self.snapshots[name] = {
            "timestamp": timestamp,
            "entity_count": self.collection.num_entities,
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
        }

        return timestamp

    def restore_to_snapshot(self, name):
        """Return all ``id > 0`` entities (all fields) as of snapshot ``name``.

        Raises:
            ValueError: if no snapshot called ``name`` exists.
        """
        if name not in self.snapshots:
            raise ValueError(f"快照 {name} 不存在")

        timestamp = self.snapshots[name]["timestamp"]
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=timestamp
        )

    def list_snapshots(self):
        """Return the internal ``{name: metadata}`` mapping."""
        return self.snapshots

snapshot_mgr = SnapshotManager(collection)
# Mark the state right before the migration so it can be read back later.
snapshot_mgr.create_snapshot(name="before_migration")

七、完整示例 #

7.1 数据版本控制系统 #

python
from pymilvus import Collection
import time
import json

class DataVersionControl:
    """Named data versions backed by time travel and persisted to a JSON file.

    Each version stores a hybrid timestamp. Reading a version re-queries the
    collection at that timestamp, so it only works while that history is
    still retained.
    """

    # Width of the logical-counter part of a Milvus hybrid timestamp.
    _LOGICAL_BITS = 18

    def __init__(self, collection, version_file="versions.json"):
        self.collection = collection
        self.version_file = version_file
        self.versions = self._load_versions()

    def _load_versions(self):
        """Load the version registry, or return {} if the file does not exist yet."""
        try:
            with open(self.version_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save_versions(self):
        """Write the whole version registry back to ``version_file``."""
        # ensure_ascii=False keeps non-ASCII descriptions readable in the file.
        with open(self.version_file, 'w', encoding='utf-8') as f:
            json.dump(self.versions, f, indent=2, ensure_ascii=False)

    def _ids_at(self, timestamp):
        """Return the set of ``id > 0`` ids visible at ``timestamp``."""
        rows = self.collection.query(
            expr='id > 0',
            output_fields=["id"],
            travel_timestamp=timestamp
        )
        return {row["id"] for row in rows}

    def create_version(self, name, description=""):
        """Record version ``name`` at the current time, persist it, return its timestamp.

        The returned value is a hybrid timestamp usable as ``travel_timestamp``.
        """
        now = time.time()
        # travel_timestamp expects a hybrid timestamp (physical ms << 18), not
        # Unix seconds. NOTE(review): based on the client clock; server clock
        # skew shifts the version point.
        timestamp = int(now * 1000) << self._LOGICAL_BITS
        count = self.collection.num_entities

        self.versions[name] = {
            "timestamp": timestamp,
            "description": description,
            "entity_count": count,
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
        }

        self._save_versions()
        return timestamp

    def get_version_data(self, name):
        """Return all ``id > 0`` entities (all fields) as of version ``name``.

        Raises:
            ValueError: if no version called ``name`` exists.
        """
        if name not in self.versions:
            raise ValueError(f"版本 {name} 不存在")

        timestamp = self.versions[name]["timestamp"]
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=timestamp
        )

    def compare_versions(self, name1, name2):
        """Compare the ids visible in version ``name1`` and version ``name2``.

        Only ids are fetched, since that is all the comparison uses.

        Returns:
            ``{"added", "removed", "common"}`` sorted id lists.

        Raises:
            KeyError: if either version name is unknown.
        """
        ts1 = self.versions[name1]["timestamp"]
        ts2 = self.versions[name2]["timestamp"]

        ids1 = self._ids_at(ts1)
        ids2 = self._ids_at(ts2)

        return {
            "added": sorted(ids2 - ids1),
            "removed": sorted(ids1 - ids2),
            "common": sorted(ids1 & ids2)
        }

    def list_versions(self):
        """Return the internal ``{name: metadata}`` registry."""
        return self.versions

dvc = DataVersionControl(collection)

dvc.create_version(name="v1.0", description="初始版本")
# ... insert the new data here, then tag the next version ...
dvc.create_version(name="v1.1", description="添加新数据")

changes = dvc.compare_versions(name1="v1.0", name2="v1.1")

八、总结 #

时间旅行操作速查表:

| 操作 | 方法 |
|------|------|
| 查询历史数据 | query() + travel_timestamp 参数 |
| 搜索历史数据 | search() + travel_timestamp 参数 |
| 数据审计 | 对比不同时间点的数据 |
| 数据恢复 | 从历史时间点恢复 |
| 版本控制 | 记录时间戳快照 |

下一步,让我们学习集群架构!

最后更新:2026-04-04