DSPy 高级主题 #
异步支持 #
DSPy 支持异步操作,可以提高并发性能。
异步模块 #
python
import dspy
import asyncio

# Configure the default language model once, before any predictions run.
lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)


class AsyncQA(dspy.Signature):
    """回答问题"""

    question = dspy.InputField()
    answer = dspy.OutputField()


async def async_predict(question):
    # acall() is the awaitable counterpart of calling the predictor directly.
    predictor = dspy.Predict(AsyncQA)
    return await predictor.acall(question=question)


async def main():
    result = await async_predict("什么是 Python?")
    print(result.answer)


asyncio.run(main())
并发处理 #
python
import dspy
import asyncio


async def process_batch(questions):
    """Answer all questions concurrently; results come back in input order."""
    predictor = dspy.Predict("question -> answer")
    # Launch every prediction at once; asyncio.gather preserves ordering.
    pending = [predictor.acall(question=item) for item in questions]
    completed = await asyncio.gather(*pending)
    return [prediction.answer for prediction in completed]


async def main():
    questions = [
        "什么是 Python?",
        "什么是 JavaScript?",
        "什么是 Go?"
    ]
    answers = await process_batch(questions)
    for q, a in zip(questions, answers):
        print(f"Q: {q}\nA: {a}\n")


asyncio.run(main())
异步 RAG #
python
import dspy
import asyncio


class AsyncRAG(dspy.Module):
    """Retrieval-augmented generation with async retrieve and generate steps."""

    def __init__(self, k=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    async def aforward(self, question):
        # Fetch supporting passages first, then condition generation on them.
        retrieved = await self.retrieve.acall(question)
        return await self.generate.acall(
            context=retrieved.passages,
            question=question,
        )


async def main():
    rag = AsyncRAG()
    result = await rag.aforward("什么是机器学习?")
    print(result.answer)


asyncio.run(main())
流式响应 #
DSPy 支持流式响应,可以实时获取生成内容。
基本流式 #
python
import dspy

lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)

qa = dspy.Predict("question -> answer")
# Print each streamed chunk the moment it arrives, without buffering.
for chunk in qa.stream(question="请详细解释量子计算"):
    print(chunk, end="", flush=True)
流式 RAG #
python
import dspy


class StreamRAG(dspy.Module):
    """RAG pipeline whose generation step is streamed chunk by chunk."""

    def __init__(self, k=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def stream_forward(self, question):
        # Retrieval runs synchronously; only the generation step streams.
        passages = self.retrieve(question).passages
        return self.generate.stream(context=passages, question=question)


rag = StreamRAG()
for chunk in rag.stream_forward("什么是深度学习?"):
    print(chunk, end="", flush=True)
异步流式 #
python
import dspy
import asyncio


async def stream_response():
    """Stream an answer chunk by chunk over an async iterator."""
    qa = dspy.Predict("question -> answer")
    async for chunk in qa.astream(question="请解释神经网络"):
        print(chunk, end="", flush=True)


asyncio.run(stream_response())
工具调用 #
DSPy 支持工具调用,可以让 LLM 执行外部操作。
定义工具 #
python
import dspy
def calculator(expression: str) -> float:
"""计算数学表达式"""
try:
return eval(expression)
except Exception as e:
return f"Error: {e}"
def search(query: str) -> str:
"""搜索信息"""
return f"搜索结果: {query}"
def get_weather(city: str) -> str:
"""获取天气"""
return f"{city} 天气: 晴,25°C"
tools = [calculator, search, get_weather]
使用 ReAct #
python
import dspy


class ToolQA(dspy.Signature):
    """使用工具回答问题"""

    question = dspy.InputField()
    answer = dspy.OutputField()


# ReAct interleaves reasoning steps with calls to the registered tools
# (`tools` is defined in the previous snippet).
react = dspy.ReAct(ToolQA, tools=tools)
result = react(question="北京今天天气怎么样?")
print(result.answer)
自定义工具调用 #
python
import dspy
import json
class ToolUsingModule(dspy.Module):
    """Plan tool calls with an LM, run them locally, then compose an answer."""

    def __init__(self, tools):
        super().__init__()
        # Index tools by function name so planned calls can be dispatched.
        self.tools = {tool.__name__: tool for tool in tools}
        self.plan = dspy.ChainOfThought("question -> tool_calls")
        self.execute = dspy.Predict("tool_calls, results -> answer")

    def forward(self, question):
        plan = self.plan(question=question)
        # The planner is expected to emit a JSON list of {tool, args} objects;
        # results are keyed by tool name (a repeated tool overwrites its entry).
        results = {}
        for call in json.loads(plan.tool_calls):
            name = call['tool']
            results[name] = self.tools[name](**call['args'])
        return self.execute(
            tool_calls=plan.tool_calls,
            results=json.dumps(results),
        )
多模态支持 #
DSPy 支持多模态输入,包括图像。
图像输入 #
python
import dspy
from PIL import Image

# A vision-capable model is required for image inputs.
lm = dspy.LM('openai/gpt-4o')
dspy.configure(lm=lm)


class ImageAnalysis(dspy.Signature):
    """分析图像内容"""

    image = dspy.InputField(desc="图像")
    description = dspy.OutputField(desc="图像描述")
    objects = dspy.OutputField(desc="识别的对象列表")


analyzer = dspy.Predict(ImageAnalysis)
result = analyzer(image=Image.open("example.jpg"))
print(result.description)
print(result.objects)
图像问答 #
python
import dspy
from PIL import Image  # was missing: Image.open is used below


class ImageQA(dspy.Signature):
    """根据图像回答问题"""

    image = dspy.InputField()
    question = dspy.InputField()
    answer = dspy.OutputField()


qa = dspy.ChainOfThought(ImageQA)
result = qa(
    image=Image.open("chart.png"),
    question="图表中哪个季度销售额最高?"
)
print(result.answer)
多图像处理 #
python
import dspy
from PIL import Image  # was missing: Image.open is used below


class CompareImages(dspy.Signature):
    """比较两张图像"""

    image1 = dspy.InputField()
    image2 = dspy.InputField()
    differences = dspy.OutputField(desc="两张图像的差异")


comparator = dspy.Predict(CompareImages)
result = comparator(
    image1=Image.open("before.png"),
    image2=Image.open("after.png")
)
print(result.differences)
自定义语言模型 #
自定义 LM 类 #
python
import dspy
import requests
class CustomLM(dspy.LM):
    """Minimal LM backend that proxies prompts to a custom HTTP endpoint."""

    def __init__(self, model, api_base, api_key=None):
        super().__init__(model)
        self.api_base = api_base
        self.api_key = api_key

    def __call__(self, prompt, **kwargs):
        """POST the prompt to `{api_base}/generate` and return the text.

        Raises requests.HTTPError on non-2xx responses instead of failing
        later with a confusing KeyError on the JSON body.
        """
        response = requests.post(
            f"{self.api_base}/generate",
            json={
                "prompt": prompt,
                "model": self.model,
                **kwargs
            },
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=60,  # never hang forever on an unresponsive server
        )
        response.raise_for_status()
        return response.json()["text"]

    def inspect_history(self, n):
        # Return the n most recent interactions.
        # NOTE(review): assumes the dspy.LM base class maintains
        # self.history — confirm against the installed DSPy version.
        return self.history[-n:]


lm = CustomLM("my-model", "http://localhost:8000")
dspy.configure(lm=lm)
本地模型集成 #
python
import dspy
from transformers import AutoModelForCausalLM, AutoTokenizer
class LocalLM(dspy.LM):
    """Serve a local HuggingFace causal LM as a DSPy language model."""

    def __init__(self, model_path):
        super().__init__(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModelForCausalLM.from_pretrained(model_path)

    def __call__(self, prompt, **kwargs):
        encoded = self.tokenizer(prompt, return_tensors="pt")
        generated = self.model.generate(**encoded, max_length=512)
        # Decode the first (and only) sequence in the batch.
        return self.tokenizer.decode(generated[0])


lm = LocalLM("./local-model")
dspy.configure(lm=lm)
自定义检索器 #
自定义 RM 类 #
python
import dspy
class CustomRM(dspy.Retrieve):
    """Embedding-based retriever over an in-memory document list."""

    def __init__(self, documents, embed_func, k=3):
        super().__init__(k=k)
        self.documents = documents
        self.embed_func = embed_func
        # Embed the whole corpus once up front; queries embed on demand.
        self.embeddings = [embed_func(doc) for doc in documents]

    def forward(self, query):
        query_embedding = self.embed_func(query)
        scored = [
            (self._cosine_similarity(query_embedding, emb), idx)
            for idx, emb in enumerate(self.embeddings)
        ]
        # Stable sort on the score alone keeps ascending-index order on ties,
        # matching a plain index sort keyed by score.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        passages = [self.documents[idx] for _, idx in scored[:self.k]]
        return dspy.Prediction(passages=passages)

    def _cosine_similarity(self, a, b):
        # Local import keeps numpy optional until retrieval is actually used.
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


rm = CustomRM(documents, embed_func, k=3)
dspy.configure(rm=rm)
评估框架 #
自定义评估器 #
python
import dspy
from dspy.evaluate import Evaluate
class DetailedEvaluator:
    """Score a module on a dev set with several metrics at once.

    evaluate() returns {metric_name: mean_score} over the whole dev set.
    """

    def __init__(self, metrics):
        self.metrics = metrics

    def evaluate(self, module, devset):
        per_metric = {metric.__name__: [] for metric in self.metrics}
        for example in devset:
            prediction = module(**example.inputs())
            for metric in self.metrics:
                per_metric[metric.__name__].append(metric(example, prediction))
        # Average each metric's scores across the dev set.
        return {
            name: sum(scores) / len(scores)
            for name, scores in per_metric.items()
        }
def accuracy(example, pred):
    """Case-insensitive exact match between predicted and gold answers."""
    return pred.answer.lower() == example.answer.lower()
def relevance(example, pred):
    """Semantic relevance of the predicted answer to the question.

    NOTE(review): compute_relevance is assumed to be defined elsewhere.
    """
    return compute_relevance(example.question, pred.answer)


# Run both metrics over a held-out test set.
evaluator = DetailedEvaluator([accuracy, relevance])
scores = evaluator.evaluate(module, testset)
交叉验证 #
python
import dspy
from sklearn.model_selection import KFold
def cross_validate(module_class, dataset, n_splits=5):
    """K-fold cross-validation of a DSPy module.

    For each fold: compile a fresh module on the training split with
    BootstrapFewShot, evaluate it on the held-out split, and record the score.

    Returns a dict with the mean score, population standard deviation, and
    the raw per-fold scores.

    NOTE(review): BootstrapFewShot, Evaluate and accuracy are assumed to be
    imported elsewhere in the document.
    """
    kf = KFold(n_splits=n_splits)
    scores = []
    for train_idx, val_idx in kf.split(dataset):
        trainset = [dataset[i] for i in train_idx]
        valset = [dataset[i] for i in val_idx]
        module = module_class()
        optimizer = BootstrapFewShot(metric=accuracy, max_bootstrapped_demos=3)
        optimized = optimizer.compile(module, trainset=trainset)
        evaluator = Evaluate(devset=valset, metric=accuracy)
        scores.append(evaluator(optimized))
    # Compute the mean once instead of re-deriving it per term inside the
    # std formula (the original recomputed sum(scores)/len(scores) n times).
    mean = sum(scores) / len(scores)
    std = (sum((s - mean) ** 2 for s in scores) / len(scores)) ** 0.5
    return {
        'mean': mean,
        'std': std,
        'scores': scores,
    }
高级优化技术 #
渐进式优化 #
python
import dspy
from dspy.teleprompt import BootstrapFewShot, MIPRO
def progressive_optimization(module, trainset, valset):
    """Optimize in stages: few demos, more demos, then instruction search."""
    # Stage 1: light bootstrap pass with a small demo budget.
    stage1 = BootstrapFewShot(
        metric=accuracy,
        max_bootstrapped_demos=2,
    )
    module = stage1.compile(module, trainset=trainset)

    # Stage 2: bootstrap again with a larger demo budget.
    stage2 = BootstrapFewShot(
        metric=accuracy,
        max_bootstrapped_demos=4,
    )
    module = stage2.compile(module, trainset=trainset)

    # Stage 3: instruction/prompt optimization with MIPRO on the val set.
    stage3 = MIPRO(
        metric=accuracy,
        max_rounds=2,
    )
    return stage3.compile(module, trainset=trainset, valset=valset)
集成优化 #
python
import dspy
from dspy.teleprompt import BootstrapFewShot, Ensemble
def ensemble_optimization(module_class, trainset, n_models=5):
    """Compile several independently bootstrapped copies and ensemble them."""
    compiled = []
    for _ in range(n_models):  # index unused; each pass builds a fresh model
        optimizer = BootstrapFewShot(
            metric=accuracy,
            max_bootstrapped_demos=3,
        )
        compiled.append(optimizer.compile(module_class(), trainset=trainset))
    return Ensemble(compiled)
错误处理与重试 #
自动重试 #
python
import dspy
from tenacity import retry, stop_after_attempt, wait_exponential
class RobustModule(dspy.Module):
    """Prediction wrapper that retries transient failures with backoff."""

    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question -> answer")

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def forward(self, question):
        try:
            return self.predict(question=question)
        except Exception as e:
            # Log, then re-raise so tenacity's @retry schedules another attempt.
            print(f"Error: {e}, retrying...")
            raise
降级处理 #
python
import dspy
class FallbackModule(dspy.Module):
    """Try the reasoning-heavy path first; degrade to a plain predictor."""

    def __init__(self):
        super().__init__()
        self.primary = dspy.ChainOfThought("question -> answer")
        self.fallback = dspy.Predict("question -> answer")

    def forward(self, question):
        # Any failure of the primary path falls through to the cheaper one.
        try:
            answer = self.primary(question=question)
        except Exception as e:
            print(f"Primary failed: {e}, using fallback")
            answer = self.fallback(question=question)
        return answer
性能优化 #
缓存策略 #
python
import dspy
from functools import lru_cache
class CachedModule(dspy.Module):
    """Memoize predictions per question with a bounded LRU cache.

    The cache is built per instance in __init__ (wrapping the bound method),
    so instances do not share entries and the cache cannot outlive them.
    """

    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question -> answer")
        # Bound cache: at most 1000 distinct questions are remembered.
        self._cached_predict = lru_cache(maxsize=1000)(self._predict_impl)

    def forward(self, question):
        return self._cached_predict(question)

    def _predict_impl(self, question):
        # Uncached implementation; only ever reached through the LRU wrapper.
        return self.predict(question=question)
批处理 #
python
import dspy
class BatchModule(dspy.Module):
    """Answer questions in fixed-size batches rather than one at a time."""

    def __init__(self, batch_size=10):
        super().__init__()
        self.predict = dspy.Predict("questions -> answers")
        self.batch_size = batch_size

    def forward(self, questions):
        collected = []
        size = self.batch_size
        # Walk the question list one batch at a time and accumulate answers.
        for start in range(0, len(questions), size):
            chunk = questions[start:start + size]
            collected.extend(self.predict(questions=chunk).answers)
        return dspy.Prediction(answers=collected)
调试与监控 #
详细日志 #
python
import dspy
import logging

# DEBUG-level logging plus DSPy tracing gives full prompt/response visibility.
logging.basicConfig(level=logging.DEBUG)
dspy.settings.configure(trace=True)

module = dspy.Predict("question -> answer")
result = module(question="测试")

# Dump the recorded LM interactions for inspection.
dspy.inspect_trace()
性能分析 #
python
import dspy
import time
class ProfiledModule(dspy.Module):
    """Wrap a predictor and record per-call wall-clock latency."""

    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question -> answer")
        self.call_times = []  # wall-clock duration of each forward() call

    def forward(self, question):
        start = time.time()
        result = self.predict(question=question)
        self.call_times.append(time.time() - start)
        return result

    def get_stats(self):
        """Return call-count and latency statistics; all zeros before any call.

        The original divided by len(self.call_times) unconditionally and
        raised ZeroDivisionError (and max()/min() on an empty list raised
        ValueError) when called before the first forward().
        """
        if not self.call_times:
            return {
                'total_calls': 0,
                'avg_time': 0.0,
                'max_time': 0.0,
                'min_time': 0.0
            }
        return {
            'total_calls': len(self.call_times),
            'avg_time': sum(self.call_times) / len(self.call_times),
            'max_time': max(self.call_times),
            'min_time': min(self.call_times)
        }
下一步 #
现在你已经掌握了 DSPy 的高级功能,接下来学习 最佳实践,了解如何在生产环境中使用 DSPy!
最后更新:2026-03-30