批量加载 #
一、批量加载概述 #
1.1 什么是批量加载 #
批量加载是将大量数据高效导入Neptune的方法。
text
批量加载特点:
├── 高性能导入
├── 支持多种数据格式
├── 支持大规模数据
├── 支持增量加载
└── 错误处理机制
1.2 支持的数据格式 #
text
数据格式:
├── Gremlin格式
│ ├── CSV
│ └── JSON
├── SPARQL格式
│ ├── N-Triples
│ ├── N-Quads
│ ├── RDF/XML
│ └── Turtle
└── OpenCypher格式
└── CSV
二、数据准备 #
2.1 Gremlin CSV格式 #
顶点文件(vertices.csv):
csv
~id,~label,name:String,age:Int,email:String
user_001,user,Tom,30,tom@example.com
user_002,user,Jerry,25,jerry@example.com
user_003,user,Mike,35,mike@example.com
边文件(edges.csv):
csv
~id,~label,~from,~to,since:Int,weight:Double
e1,knows,user_001,user_002,2020,0.8
e2,knows,user_001,user_003,2021,0.6
e3,follows,user_002,user_001,2022,0.9
2.2 SPARQL N-Quads格式 #
nquads
<http://example.org/user_001> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/User> .
<http://example.org/user_001> <http://xmlns.com/foaf/0.1/name> "Tom" .
<http://example.org/user_001> <http://example.org/age> "30"^^<http://www.w3.org/2001/XMLSchema#integer> .
<http://example.org/user_001> <http://example.org/knows> <http://example.org/user_002> .
2.3 数据上传到S3 #
bash
# 上传数据到S3
aws s3 cp vertices.csv s3://my-bucket/neptune-data/
aws s3 cp edges.csv s3://my-bucket/neptune-data/
# 设置权限
aws s3api put-object-acl --bucket my-bucket --key neptune-data/vertices.csv --acl bucket-owner-full-control
三、加载命令 #
3.1 使用curl加载 #
bash
curl -X POST \
-H 'Content-Type: application/json' \
https://your-cluster-endpoint:8182/loader \
-d '{
"source": "s3://my-bucket/neptune-data/",
"format": "csv",
"region": "us-east-1",
"failOnError": "FALSE",
"parallelism": "MEDIUM",
"updateSingleCardinalityProperties": "FALSE",
"queueRequest": "TRUE"
}'
3.2 使用AWS CLI加载 #
bash
aws neptune start-loader-job \
--source s3://my-bucket/neptune-data/ \
--format csv \
--region us-east-1 \
--db-cluster-identifier my-neptune-cluster \
--loader-job-name my-load-job
3.3 加载参数说明 #
json
{
"source": "s3://bucket/path/",
"format": "csv|nquads|rdfxml|turtle",
"region": "us-east-1",
"failOnError": "TRUE|FALSE",
"parallelism": "LOW|MEDIUM|HIGH|OVERSUBSCRIBE",
"updateSingleCardinalityProperties": "TRUE|FALSE",
"queueRequest": "TRUE|FALSE",
"dependencies": ["job-id-1", "job-id-2"]
}
四、监控加载任务 #
4.1 查看加载状态 #
bash
# 查看所有加载任务
curl -X GET https://your-cluster-endpoint:8182/loader
# 查看特定任务
curl -X GET https://your-cluster-endpoint:8182/loader/job-id
4.2 状态说明 #
text
加载状态:
├── LOAD_IN_PROGRESS:正在加载
├── LOAD_COMPLETED:加载完成
├── LOAD_CANCELLED:已取消
├── LOAD_FAILED:加载失败
├── LOAD_UNEXPECTED_ERROR:意外错误
└── LOAD_DATA_ERROR:数据错误
4.3 查看错误日志 #
bash
# 获取错误详情
curl -X GET https://your-cluster-endpoint:8182/loader/job-id?details=true
五、数据格式详解 #
5.1 Gremlin CSV格式 #
text
列说明:
├── ~id:顶点或边的ID(必需)
├── ~label:顶点或边的标签(必需)
├── ~from:边的起始顶点ID(边必需)
├── ~to:边的结束顶点ID(边必需)
└── 其他列:属性(格式:name:Type)
支持的类型:
text
数据类型:
├── String(默认)
├── Int
├── Long
├── Float
├── Double
├── Boolean
└── Date
多值属性:
csv
~id,~label,name:String,tags:String[]
user_001,user,Tom,"tag1;tag2;tag3"
5.2 SPARQL格式 #
N-Triples:
ntriples
<subject> <predicate> <object> .
N-Quads(带图):
nquads
<subject> <predicate> <object> <graph> .
Turtle:
turtle
@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
ex:user_001 a ex:User ;
foaf:name "Tom" ;
ex:age 30 .
六、增量加载 #
6.1 增量更新 #
json
{
"source": "s3://my-bucket/incremental-data/",
"format": "csv",
"updateSingleCardinalityProperties": "TRUE"
}
6.2 处理重复数据 #
text
重复数据处理:
├── 忽略:跳过重复数据
├── 更新:更新现有数据
└── 报错:加载失败
七、性能优化 #
7.1 并行度设置 #
text
并行度选项:
├── LOW:少量并行,适合小数据集
├── MEDIUM:中等并行,适合中等数据集
├── HIGH:高并行,适合大数据集
└── OVERSUBSCRIBE:超高并行,适合超大集群
7.2 文件大小建议 #
text
文件大小建议:
├── 单文件:1MB-1GB
├── 避免过小文件
├── 避免过大文件
└── 合理分片
7.3 批量加载最佳实践 #
text
最佳实践:
├── 使用合适的数据格式
├── 合理设置并行度
├── 监控加载进度
├── 处理加载错误
└── 验证加载结果
八、错误处理 #
8.1 常见错误 #
text
常见错误:
├── 格式错误:数据格式不正确
├── 类型错误:数据类型不匹配
├── 引用错误:引用不存在的顶点
├── 权限错误:S3访问权限问题
└── 网络错误:连接问题
8.2 错误恢复 #
bash
# 取消加载任务
curl -X DELETE https://your-cluster-endpoint:8182/loader/job-id
# 重新加载
curl -X POST https://your-cluster-endpoint:8182/loader -d '...'
九、实际应用示例 #
9.1 加载社交网络数据 #
bash
# 准备数据文件
# users.csv
~id,~label,name:String,email:String
user_001,user,Tom,tom@example.com
user_002,user,Jerry,jerry@example.com
# follows.csv
~id,~label,~from,~to,since:Int
e1,follows,user_001,user_002,2020
# 加载数据
curl -X POST \
-H 'Content-Type: application/json' \
https://endpoint:8182/loader \
-d '{
"source": "s3://my-bucket/social-data/",
"format": "csv",
"region": "us-east-1"
}'
9.2 加载知识图谱数据 #
bash
# 准备RDF数据
# knowledge.ttl
@prefix ex: <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
ex:Database a ex:Concept ;
rdfs:label "Database" .
ex:GraphDatabase a ex:Concept ;
rdfs:label "Graph Database" ;
rdfs:subClassOf ex:Database .
# 加载数据
curl -X POST \
-H 'Content-Type: application/json' \
https://endpoint:8182/loader \
-d '{
"source": "s3://my-bucket/knowledge-data/",
"format": "turtle",
"region": "us-east-1"
}'
十、总结 #
批量加载要点:
| 项目 | 说明 |
|---|---|
| 数据格式 | CSV, N-Quads, Turtle等 |
| 数据源 | S3存储桶 |
| 加载方式 | Loader API |
| 监控 | 查看加载状态 |
| 错误处理 | 日志和重试 |
最佳实践:
- 使用合适的数据格式
- 合理设置并行度
- 监控加载进度
- 处理加载错误
- 验证加载结果
下一步,让我们学习流式处理!
最后更新:2026-03-27