时间旅行 #
一、时间旅行概述 #
1.1 什么是时间旅行 #
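时间旅行(Time Travel)允许查询历史时间点的数据状态。注意:`travel_timestamp` 参数需要的是 Milvus 内部的混合时间戳(例如插入操作返回的 `result.timestamp`),而不是 Unix 时间戳;Unix 时间需先用 `utility.mkts_from_unixtime()` 转换。另外,该功能仅在 Milvus 2.3 之前的版本中提供,2.3 起已被移除,使用前请确认所用版本。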
text
时间旅行概念:
┌─────────────────────────────────────────┐
│ 时间线 │
├─────────────────────────────────────────┤
│ │
│ T1 T2 T3 T4 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 插入A 插入B 删除A 插入C │
│ │
│ 查询T2时刻 → 返回 {A, B} │
│ 查询T4时刻 → 返回 {B, C} │
│ │
└─────────────────────────────────────────┘
1.2 应用场景 #
text
应用场景:
┌─────────────────────────────────────────┐
│ 数据审计 │
│ - 查看历史数据状态 │
│ - 追踪数据变更 │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 数据恢复 │
│ - 恢复误删数据 │
│ - 回滚到历史版本 │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 时间范围查询 │
│ - 查看特定时间段数据 │
│ - 分析数据变化趋势 │
└─────────────────────────────────────────┘
二、时间戳概念 #
2.1 时间戳类型 #
text
时间戳类型:
┌─────────────────────────────────────────┐
│ 逻辑时间戳 │
├─────────────────────────────────────────┤
│ - Milvus内部使用 │
│ - 单调递增 │
│ - 用于数据版本控制 │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 物理时间戳 │
├─────────────────────────────────────────┤
│ - Unix时间戳 │
│ - 可读性强 │
│ - 用于业务逻辑 │
└─────────────────────────────────────────┘
2.2 获取时间戳 #
python
from pymilvus import Collection, utility
import time

collection = Collection("documents")
result = collection.insert(data)
# MutationResult.timestamp is a Milvus hybrid timestamp (physical ms + logical
# counter), not a Unix time; it can be passed to travel_timestamp as-is.
print(f"插入时间戳: {result.timestamp}")
# Decode it back to wall-clock seconds for display.
print(f"插入时间(Unix秒): {utility.hybridts_to_unixtime(result.timestamp)}")

current_timestamp = int(time.time())
print(f"当前时间戳: {current_timestamp}")
# A plain Unix time must be converted before it is usable as travel_timestamp.
current_travel_timestamp = utility.mkts_from_unixtime(current_timestamp)
print(f"当前时间对应的Milvus时间戳: {current_travel_timestamp}")
三、时间旅行查询 #
3.1 基本时间旅行 #
python
import time

# Keep the timestamp Milvus assigned to this insert; it marks the snapshot point.
result = collection.insert(data)
insert_timestamp = result.timestamp

time.sleep(1)

# Remove a few entities *after* the snapshot point.
collection.delete('id in [1, 2, 3]')

# Read the collection as it looked at insert time, i.e. before the delete.
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    travel_timestamp=insert_timestamp,
)

print("删除前的数据:")
for row in results:
    print(row)
3.2 搜索时时间旅行 #
python
# L2 distance with nprobe = 10.
search_params = dict(metric_type="L2", params=dict(nprobe=10))

# Same vector search, but evaluated against the data visible at insert_timestamp.
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    travel_timestamp=insert_timestamp,
)
3.3 时间范围查询 #
python
from pymilvus import utility

# Time Travel only yields a point-in-time snapshot; a time *range* has to be
# expressed as a filter on a business field.
# NOTE(review): assumes created_at stores Unix seconds — confirm against the schema.
end_timestamp = int(time.time())
start_timestamp = end_timestamp - 86400  # last 24 hours, derived from one clock read
results = collection.query(
    expr=f'id > 0 and created_at >= {start_timestamp} and created_at <= {end_timestamp}',
    output_fields=["id", "title", "created_at"],
    # travel_timestamp expects a Milvus hybrid timestamp, not Unix seconds.
    travel_timestamp=utility.mkts_from_unixtime(end_timestamp),
)
四、时间旅行应用 #
4.1 数据审计 #
python
class DataAuditor:
    """Audit helpers that read a collection at past points in time.

    Every query forwards ``timestamp`` unchanged as ``travel_timestamp``, so
    callers must pass timestamps in the form Milvus expects (for example
    ``MutationResult.timestamp``).
    """

    def __init__(self, collection):
        self.collection = collection

    def get_history_at_time(self, timestamp):
        """Return all fields of every entity with ``id > 0`` as of *timestamp*."""
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=timestamp,
        )

    def _ids_at_time(self, timestamp):
        """Return the set of ``id`` values visible at *timestamp* (ids only)."""
        rows = self.collection.query(
            expr='id > 0',
            output_fields=["id"],
            travel_timestamp=timestamp,
        )
        return {row["id"] for row in rows}

    def compare_snapshots(self, timestamp1, timestamp2):
        """Compare which ids exist at two timestamps.

        Returns a dict with ``added`` (ids only at *timestamp2*), ``removed``
        (ids only at *timestamp1*) and ``unchanged`` (ids present at both —
        their field values are not compared).
        """
        # Only ids are needed here, so avoid pulling every field (vectors included).
        ids1 = self._ids_at_time(timestamp1)
        ids2 = self._ids_at_time(timestamp2)
        return {
            "added": list(ids2 - ids1),
            "removed": list(ids1 - ids2),
            "unchanged": list(ids1 & ids2),
        }

    def track_entity_history(self, entity_id, timestamps):
        """Look up one entity at each timestamp, in the order given.

        Each history item holds ``timestamp``, ``exists`` and ``data`` (the
        first matching row, or ``None`` when the entity was absent).
        """
        history = []
        for ts in timestamps:
            results = self.collection.query(
                expr=f'id == {entity_id}',
                output_fields=["*"],
                travel_timestamp=ts,
            )
            history.append({
                "timestamp": ts,
                "exists": len(results) > 0,
                "data": results[0] if results else None,
            })
        return history
# Trace entity 1 across three previously captured checkpoints.
auditor = DataAuditor(collection)
checkpoints = [ts1, ts2, ts3]
history = auditor.track_entity_history(entity_id=1, timestamps=checkpoints)
4.2 数据恢复 #
python
class DataRecovery:
    """Locate entities that vanished since a past timestamp and write them back."""

    def __init__(self, collection):
        self.collection = collection

    def find_deleted_entities(self, before_timestamp):
        """Return ids visible at *before_timestamp* that are missing from the current data."""
        now_rows = self.collection.query(expr='id > 0', output_fields=["id"])
        live_ids = set(row["id"] for row in now_rows)

        past_rows = self.collection.query(
            expr='id > 0',
            output_fields=["id"],
            travel_timestamp=before_timestamp,
        )
        past_ids = set(row["id"] for row in past_rows)

        return list(past_ids - live_ids)

    def recover_entity(self, entity_id, timestamp):
        """Upsert the version of *entity_id* seen at *timestamp*.

        Returns True when a historical row was found and written back,
        False when the entity did not exist at that time.
        """
        snapshot = self.collection.query(
            expr=f'id == {entity_id}',
            output_fields=["*"],
            travel_timestamp=timestamp,
        )
        if not snapshot:
            return False
        self.collection.upsert([snapshot[0]])
        return True

    def recover_batch(self, entity_ids, timestamp):
        """Recover each id in turn; return the ids that were actually restored."""
        return [eid for eid in entity_ids if self.recover_entity(eid, timestamp)]
# Find everything deleted since before_timestamp, then restore it from that point.
recovery = DataRecovery(collection)
deleted = recovery.find_deleted_entities(before_timestamp=before_timestamp)
recovery.recover_batch(deleted, timestamp=before_timestamp)
4.3 版本对比 #
python
class VersionComparator:
    """Entity-level diff between two point-in-time views of a collection."""

    def __init__(self, collection):
        self.collection = collection

    def get_version_data(self, timestamp):
        """Return all fields of every entity with ``id > 0`` as of *timestamp*."""
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=timestamp,
        )

    def compare_versions(self, ts1, ts2):
        """Diff the data at *ts1* (before) against the data at *ts2* (after).

        Returns a dict with ``added`` (rows only at *ts2*), ``removed``
        (rows only at *ts1*) and ``modified`` (``{"id", "before", "after"}``
        for ids whose rows differ). Entries are ordered by id.
        """
        before = {row["id"]: row for row in self.get_version_data(ts1)}
        after = {row["id"]: row for row in self.get_version_data(ts2)}

        changes = {
            "added": [],
            "removed": [],
            "modified": [],
        }
        # Sorted so report order is deterministic instead of following set
        # iteration order; primary keys within a collection share one type.
        for entity_id in sorted(before.keys() | after.keys()):
            if entity_id not in before:
                changes["added"].append(after[entity_id])
            elif entity_id not in after:
                changes["removed"].append(before[entity_id])
            elif before[entity_id] != after[entity_id]:
                changes["modified"].append({
                    "id": entity_id,
                    "before": before[entity_id],
                    "after": after[entity_id],
                })
        return changes
# Diff the collection between two captured timestamps.
comparator = VersionComparator(collection)
changes = comparator.compare_versions(ts1=ts1, ts2=ts2)
五、时间旅行限制 #
5.1 时间范围限制 #
text
时间旅行限制:
┌─────────────────────────────────────────┐
│ 时间范围 │
├─────────────────────────────────────────┤
│ - 受数据保留期限制 │
│ - 默认保留期: 5天 │
│ - 可通过配置调整 │
└─────────────────────────────────────────┘
5.2 配置保留期 #
yaml
common:
  # Time Travel retention window, in seconds (432000 s = 5 days).
  retentionDuration: 432000
5.3 性能考虑 #
python
def efficient_time_travel_query(collection, timestamp, batch_size=1000,
                                expr='id > 0', output_fields=None):
    """Fetch all matching rows at *timestamp* page by page.

    Args:
        collection: object exposing ``query(expr, output_fields,
            travel_timestamp, limit, offset)`` (e.g. a pymilvus Collection).
        timestamp: value forwarded as ``travel_timestamp``.
        batch_size: rows requested per page; must be positive.
        expr: filter expression (defaults to the original ``'id > 0'``).
        output_fields: fields to return (defaults to ``["id"]``).

    Returns:
        A list with the rows of every page, in page order.

    Raises:
        ValueError: if *batch_size* is not positive.

    NOTE(review): Milvus caps ``offset + limit`` per query (16384 by default in
    2.x, as far as known) and does not promise a stable order across pages —
    confirm for your deployment before relying on this for large collections.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    fields = ["id"] if output_fields is None else list(output_fields)

    all_results = []
    offset = 0
    while True:
        page = collection.query(
            expr=expr,
            output_fields=fields,
            travel_timestamp=timestamp,
            limit=batch_size,
            offset=offset,
        )
        all_results.extend(page)
        # A short (or empty) page means the data is exhausted, so skip the
        # extra round-trip that would only return an empty page.
        if len(page) < batch_size:
            break
        offset += batch_size
    return all_results
六、最佳实践 #
6.1 时间戳管理 #
python
class TimestampManager:
    """In-memory registry of named event timestamps.

    Timestamps recorded with the default are Unix seconds (``int(time.time())``),
    which Milvus does NOT accept as ``travel_timestamp``; use
    ``get_travel_timestamp`` to obtain a value suitable for Time Travel.
    Events recorded within the same second get identical values.
    """

    # Milvus hybrid timestamps keep physical milliseconds above 18 logical
    # bits — the same layout pymilvus.utility.mkts_from_unixtime produces.
    _LOGICAL_BITS = 18

    def __init__(self):
        self.timestamps = {}

    def record_timestamp(self, event_name, timestamp=None):
        """Store *timestamp* (default: current Unix seconds) under *event_name* and return it."""
        if timestamp is None:
            timestamp = int(time.time())
        self.timestamps[event_name] = timestamp
        return timestamp

    def get_timestamp(self, event_name):
        """Return the stored timestamp for *event_name*, or None if unknown."""
        return self.timestamps.get(event_name)

    def get_travel_timestamp(self, event_name):
        """Return *event_name*'s time as a Milvus hybrid timestamp, or None if unknown.

        Only meaningful for values recorded as Unix seconds (the default);
        a value that already is a Milvus timestamp must not be converted again.
        """
        unix_ts = self.timestamps.get(event_name)
        if unix_ts is None:
            return None
        return int(unix_ts * 1000) << self._LOGICAL_BITS

    def list_timestamps(self):
        """Return a copy of all recorded ``{event_name: timestamp}`` pairs."""
        # Copy so callers cannot mutate the registry by accident.
        return dict(self.timestamps)
manager = TimestampManager()
# Both calls run back-to-back, so they will usually land in the same second.
ts_insert = manager.record_timestamp(event_name="data_insert")
ts_delete = manager.record_timestamp(event_name="data_delete")
6.2 定期快照 #
python
class SnapshotManager:
    """Named, in-memory checkpoints that are read back via Time Travel.

    No data is copied: a snapshot only records when it was taken, and reading
    it queries the collection at that time — so it only works while that time
    is still inside the server's retention window.
    """

    # Milvus hybrid timestamps keep physical milliseconds above 18 logical
    # bits — the same layout pymilvus.utility.mkts_from_unixtime produces.
    _LOGICAL_BITS = 18

    def __init__(self, collection):
        self.collection = collection
        self.snapshots = {}

    def create_snapshot(self, name):
        """Record checkpoint *name* at the current time; return its Unix-second timestamp."""
        now = time.time()
        count = self.collection.num_entities
        self.snapshots[name] = {
            "timestamp": int(now),
            "entity_count": count,
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            # Millisecond-precision Milvus timestamp used for reads; truncating
            # to whole seconds would hide writes made earlier in the same second.
            # NOTE(review): assumes client and Milvus server clocks roughly agree.
            "travel_timestamp": int(now * 1000) << self._LOGICAL_BITS,
        }
        return int(now)

    def restore_to_snapshot(self, name):
        """Return all fields of entities with ``id > 0`` as they were at snapshot *name*.

        Despite the name this only reads; nothing is written back.

        Raises:
            ValueError: if no snapshot called *name* exists.
        """
        if name not in self.snapshots:
            raise ValueError(f"快照 {name} 不存在")
        entry = self.snapshots[name]
        travel_ts = entry.get("travel_timestamp")
        if travel_ts is None:
            # Entries without a stored Milvus timestamp hold Unix seconds only;
            # travel_timestamp requires the hybrid form, so convert here.
            travel_ts = int(entry["timestamp"] * 1000) << self._LOGICAL_BITS
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=travel_ts,
        )

    def list_snapshots(self):
        """Return the snapshot registry (name -> metadata dict)."""
        return self.snapshots
# Mark the state of the collection right before a migration.
snapshot_mgr = SnapshotManager(collection)
snapshot_mgr.create_snapshot(name="before_migration")
七、完整示例 #
7.1 数据版本控制系统 #
python
from pymilvus import Collection
import time
import json
class DataVersionControl:
    """Named version markers persisted to a JSON file, read back via Time Travel.

    Only metadata is saved in ``version_file``; the data itself is never copied.
    Reading a version queries the collection at the recorded time, so it only
    works while that time is still inside the server's retention window.
    """

    # Milvus hybrid timestamps keep physical milliseconds above 18 logical
    # bits — the same layout pymilvus.utility.mkts_from_unixtime produces.
    _LOGICAL_BITS = 18

    def __init__(self, collection, version_file="versions.json"):
        self.collection = collection
        self.version_file = version_file
        self.versions = self._load_versions()

    def _load_versions(self):
        """Return the saved version map, or an empty dict if the file does not exist yet."""
        try:
            with open(self.version_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save_versions(self):
        """Overwrite ``version_file`` with the current version map as UTF-8 JSON."""
        with open(self.version_file, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII descriptions human-readable.
            json.dump(self.versions, f, indent=2, ensure_ascii=False)

    def create_version(self, name, description=""):
        """Record version *name* at the current time, persist it, and return its Unix-second timestamp."""
        now = time.time()
        count = self.collection.num_entities
        self.versions[name] = {
            "timestamp": int(now),
            "description": description,
            "entity_count": count,
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            # Millisecond-precision Milvus timestamp used for reads.
            # NOTE(review): assumes client and Milvus server clocks roughly agree.
            "travel_timestamp": int(now * 1000) << self._LOGICAL_BITS,
        }
        self._save_versions()
        return int(now)

    def _travel_timestamp(self, name):
        """Return the Milvus hybrid timestamp for version *name*."""
        entry = self.versions[name]
        travel_ts = entry.get("travel_timestamp")
        if travel_ts is None:
            # Files written without a stored Milvus timestamp hold Unix seconds
            # only; travel_timestamp requires the hybrid form, so convert.
            travel_ts = int(entry["timestamp"] * 1000) << self._LOGICAL_BITS
        return travel_ts

    def get_version_data(self, name):
        """Return all fields of entities with ``id > 0`` as of version *name*.

        Raises:
            ValueError: if version *name* does not exist.
        """
        if name not in self.versions:
            raise ValueError(f"版本 {name} 不存在")
        return self.collection.query(
            expr='id > 0',
            output_fields=["*"],
            travel_timestamp=self._travel_timestamp(name),
        )

    def compare_versions(self, name1, name2):
        """Compare the id sets of two versions.

        Returns ``added`` (ids only in *name2*), ``removed`` (ids only in
        *name1*) and ``common`` (ids in both).

        Raises:
            ValueError: if either version does not exist.
        """
        ids1 = {d["id"] for d in self.get_version_data(name1)}
        ids2 = {d["id"] for d in self.get_version_data(name2)}
        return {
            "added": list(ids2 - ids1),
            "removed": list(ids1 - ids2),
            "common": list(ids1 & ids2),
        }

    def list_versions(self):
        """Return the version registry (name -> metadata dict)."""
        return self.versions
dvc = DataVersionControl(collection)
# In real use, data changes happen between these two markers; created
# back-to-back they describe (almost) the same moment.
dvc.create_version("v1.0", description="初始版本")
dvc.create_version("v1.1", description="添加新数据")
changes = dvc.compare_versions(name1="v1.0", name2="v1.1")
八、总结 #
时间旅行操作速查表:
| 操作 | 方法 |
|---|---|
| 查询历史数据 | travel_timestamp参数 |
| 搜索历史数据 | search() + travel_timestamp |
| 数据审计 | 对比不同时间点数据 |
| 数据恢复 | 从历史时间点恢复 |
| 版本控制 | 记录时间戳快照 |
下一步,让我们学习集群架构!
最后更新:2026-04-04