批量加载 #

一、批量加载概述 #

1.1 什么是批量加载 #

批量加载是将大量数据高效导入Neptune的方法。

text
批量加载特点:
├── 高性能导入
├── 支持多种数据格式
├── 支持大规模数据
├── 支持增量加载
└── 错误处理机制

1.2 支持的数据格式 #

text
数据格式:
├── Gremlin格式
│   ├── CSV
│   └── JSON
├── SPARQL格式
│   ├── N-Triples
│   ├── N-Quads
│   ├── RDF/XML
│   └── Turtle
└── OpenCypher格式
    └── CSV

二、数据准备 #

2.1 Gremlin CSV格式 #

顶点文件(vertices.csv):

csv
~id,~label,name:String,age:Int,email:String
user_001,user,Tom,30,tom@example.com
user_002,user,Jerry,25,jerry@example.com
user_003,user,Mike,35,mike@example.com

边文件(edges.csv):

csv
~id,~label,~from,~to,since:Int,weight:Double
e1,knows,user_001,user_002,2020,0.8
e2,knows,user_001,user_003,2021,0.6
e3,follows,user_002,user_001,2022,0.9

2.2 SPARQL N-Quads格式 #

nquads
<http://example.org/user_001> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/User> .
<http://example.org/user_001> <http://xmlns.com/foaf/0.1/name> "Tom" .
<http://example.org/user_001> <http://example.org/age> "30"^^<http://www.w3.org/2001/XMLSchema#integer> .
<http://example.org/user_001> <http://example.org/knows> <http://example.org/user_002> .

2.3 数据上传到S3 #

bash
# 上传数据到S3
aws s3 cp vertices.csv s3://my-bucket/neptune-data/
aws s3 cp edges.csv s3://my-bucket/neptune-data/

# 设置权限
aws s3api put-object-acl --bucket my-bucket --key neptune-data/vertices.csv --acl bucket-owner-full-control

三、加载命令 #

3.1 使用curl加载 #

bash
curl -X POST \
  -H 'Content-Type: application/json' \
  https://your-cluster-endpoint:8182/loader \
  -d '{
    "source": "s3://my-bucket/neptune-data/",
    "format": "csv",
    "region": "us-east-1",
    "failOnError": "FALSE",
    "parallelism": "MEDIUM",
    "updateSingleCardinalityProperties": "FALSE",
    "queueRequest": "TRUE"
  }'

3.2 使用AWS CLI加载 #

bash
aws neptune start-loader-job \
  --source s3://my-bucket/neptune-data/ \
  --format csv \
  --region us-east-1 \
  --db-cluster-identifier my-neptune-cluster \
  --loader-job-name my-load-job

3.3 加载参数说明 #

json
{
  "source": "s3://bucket/path/",
  "format": "csv|nquads|rdfxml|turtle",
  "region": "us-east-1",
  "failOnError": "TRUE|FALSE",
  "parallelism": "LOW|MEDIUM|HIGH|OVERSUBSCRIBE",
  "updateSingleCardinalityProperties": "TRUE|FALSE",
  "queueRequest": "TRUE|FALSE",
  "dependencies": ["job-id-1", "job-id-2"]
}

四、监控加载任务 #

4.1 查看加载状态 #

bash
# 查看所有加载任务
curl -X GET https://your-cluster-endpoint:8182/loader

# 查看特定任务
curl -X GET https://your-cluster-endpoint:8182/loader/job-id

4.2 状态说明 #

text
加载状态:
├── LOAD_IN_PROGRESS:正在加载
├── LOAD_COMPLETED:加载完成
├── LOAD_CANCELLED:已取消
├── LOAD_FAILED:加载失败
├── LOAD_UNEXPECTED_ERROR:意外错误
└── LOAD_DATA_ERROR:数据错误

4.3 查看错误日志 #

bash
# 获取错误详情
curl -X GET https://your-cluster-endpoint:8182/loader/job-id?details=true

五、数据格式详解 #

5.1 Gremlin CSV格式 #

text
列说明:
├── ~id:顶点或边的ID(必需)
├── ~label:顶点或边的标签(必需)
├── ~from:边的起始顶点ID(边必需)
├── ~to:边的结束顶点ID(边必需)
└── 其他列:属性(格式:name:Type)

支持的类型:

text
数据类型:
├── String(默认)
├── Int
├── Long
├── Float
├── Double
├── Boolean
└── Date

多值属性:

csv
~id,~label,name:String,tags:String[]
user_001,user,Tom,"tag1;tag2;tag3"

5.2 SPARQL格式 #

N-Triples:

ntriples
<subject> <predicate> <object> .

N-Quads(带图):

nquads
<subject> <predicate> <object> <graph> .

Turtle:

turtle
@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .

ex:user_001 a ex:User ;
  foaf:name "Tom" ;
  ex:age 30 .

六、增量加载 #

6.1 增量更新 #

json
{
  "source": "s3://my-bucket/incremental-data/",
  "format": "csv",
  "updateSingleCardinalityProperties": "TRUE"
}

6.2 处理重复数据 #

text
重复数据处理:
├── 忽略:跳过重复数据
├── 更新:更新现有数据
└── 报错:加载失败

七、性能优化 #

7.1 并行度设置 #

text
并行度选项:
├── LOW:少量并行,适合小数据集
├── MEDIUM:中等并行,适合中等数据集
├── HIGH:高并行,适合大数据集
└── OVERSUBSCRIBE:超高并行,适合超大集群

7.2 文件大小建议 #

text
文件大小建议:
├── 单文件:1MB-1GB
├── 避免过小文件
├── 避免过大文件
└── 合理分片

7.3 批量加载最佳实践 #

text
最佳实践:
├── 使用合适的数据格式
├── 合理设置并行度
├── 监控加载进度
├── 处理加载错误
└── 验证加载结果

八、错误处理 #

8.1 常见错误 #

text
常见错误:
├── 格式错误:数据格式不正确
├── 类型错误:数据类型不匹配
├── 引用错误:引用不存在的顶点
├── 权限错误:S3访问权限问题
└── 网络错误:连接问题

8.2 错误恢复 #

bash
# 取消加载任务
curl -X DELETE https://your-cluster-endpoint:8182/loader/job-id

# 重新加载
curl -X POST https://your-cluster-endpoint:8182/loader -d '...'

九、实际应用示例 #

9.1 加载社交网络数据 #

bash
# 准备数据文件
# users.csv
~id,~label,name:String,email:String
user_001,user,Tom,tom@example.com
user_002,user,Jerry,jerry@example.com

# follows.csv
~id,~label,~from,~to,since:Int
e1,follows,user_001,user_002,2020

# 加载数据
curl -X POST \
  -H 'Content-Type: application/json' \
  https://endpoint:8182/loader \
  -d '{
    "source": "s3://my-bucket/social-data/",
    "format": "csv",
    "region": "us-east-1"
  }'

9.2 加载知识图谱数据 #

bash
# 准备RDF数据
# knowledge.ttl
@prefix ex: <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

ex:Database a ex:Concept ;
  rdfs:label "Database" .

ex:GraphDatabase a ex:Concept ;
  rdfs:label "Graph Database" ;
  rdfs:subClassOf ex:Database .

# 加载数据
curl -X POST \
  -H 'Content-Type: application/json' \
  https://endpoint:8182/loader \
  -d '{
    "source": "s3://my-bucket/knowledge-data/",
    "format": "turtle",
    "region": "us-east-1"
  }'

十、总结 #

批量加载要点:

项目 说明
数据格式 CSV, N-Quads, Turtle等
数据源 S3存储桶
加载方式 Loader API
监控 查看加载状态
错误处理 日志和重试

最佳实践:

  1. 使用合适的数据格式
  2. 合理设置并行度
  3. 监控加载进度
  4. 处理加载错误
  5. 验证加载结果

下一步,让我们学习流式处理!

最后更新:2026-03-27