DSPy 高级主题 #

异步支持 #

DSPy 支持异步操作：需要同时发起多个请求时，可以并发执行它们，从而提高吞吐量。

异步模块 #

python
import dspy
import asyncio

# Create the language-model client and register it as DSPy's default LM.
lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)

# Question-answering signature with one input field and one output field.
# NOTE: the docstring is intentionally left untranslated; DSPy uses a
# Signature's docstring as the task instructions, so its text is runtime data.
class AsyncQA(dspy.Signature):
    """回答问题"""
    question = dspy.InputField()  # input: the question to answer
    answer = dspy.OutputField()  # output: the generated answer

async def async_predict(question):
    """Answer ``question`` asynchronously with a freshly built AsyncQA predictor.

    Returns whatever the predictor's ``acall`` coroutine resolves to.
    """
    predictor = dspy.Predict(AsyncQA)
    prediction = await predictor.acall(question=question)
    return prediction

async def main():
    """Ask one sample question and print the answer."""
    prediction = await async_predict("什么是 Python?")
    print(prediction.answer)


# Script entry point: run the coroutine to completion on a new event loop.
asyncio.run(main())

并发处理 #

python
import dspy
import asyncio

async def process_batch(questions):
    """Answer all ``questions`` concurrently and return the answers as a list.

    One predictor is shared by every request; the answers come back in the
    same order as ``questions``.
    """
    predictor = dspy.Predict("question -> answer")
    predictions = await asyncio.gather(
        *(predictor.acall(question=text) for text in questions)
    )
    answers = []
    for prediction in predictions:
        answers.append(prediction.answer)
    return answers

async def main():
    """Run a small batch of questions and print each question with its answer."""
    questions = ["什么是 Python?", "什么是 JavaScript?", "什么是 Go?"]
    answers = await process_batch(questions)
    for question, answer in zip(questions, answers):
        print(f"Q: {question}\nA: {answer}\n")


# Script entry point.
asyncio.run(main())

异步 RAG #

python
import dspy
import asyncio

class AsyncRAG(dspy.Module):
    """Retrieve-then-generate pipeline with an asynchronous entry point.

    Args:
        k: number of passages requested from the retriever.
    """

    def __init__(self, k=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    async def aforward(self, question):
        """Retrieve passages for ``question`` and generate an answer from them.

        Returns the prediction produced by the chain-of-thought generator.
        """
        # dspy.Retrieve only documents a synchronous call interface, so it is
        # run in a worker thread instead of awaiting a coroutine method it may
        # not provide. This also keeps the event loop free while it blocks.
        context = await asyncio.to_thread(self.retrieve, question)
        return await self.generate.acall(
            context=context.passages,
            question=question,
        )

async def main():
    """Build the async RAG pipeline, ask one question and print the answer."""
    pipeline = AsyncRAG()
    prediction = await pipeline.aforward("什么是机器学习?")
    print(prediction.answer)


# Script entry point.
asyncio.run(main())

流式响应 #

DSPy 支持流式响应,可以实时获取生成内容。

基本流式 #

python
import dspy

# Configure the language model used by the streaming example.
lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)

qa = dspy.Predict("question -> answer")

# dspy.Predict has no ``stream`` method; streaming is enabled by wrapping the
# program with dspy.streamify. The StreamListener names the output field whose
# tokens should be surfaced, and async_streaming=False asks for a synchronous
# generator that a plain ``for`` loop can consume.
stream_qa = dspy.streamify(
    qa,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    async_streaming=False,
)

for chunk in stream_qa(question="请详细解释量子计算"):
    # The stream also carries the final prediction object; print text chunks only.
    if isinstance(chunk, dspy.streaming.StreamResponse):
        print(chunk.chunk, end="", flush=True)

流式 RAG #

python
import dspy

class StreamRAG(dspy.Module):
    """Retrieve-then-generate pipeline whose answer is streamed chunk by chunk.

    Args:
        k: number of passages requested from the retriever.
    """

    def __init__(self, k=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def stream_forward(self, question):
        """Yield the text chunks of the answer to ``question`` as they arrive.

        Retrieval runs first; the generator is then wrapped with
        dspy.streamify because DSPy modules have no ``stream`` method.
        """
        context = self.retrieve(question).passages
        stream_generate = dspy.streamify(
            self.generate,
            stream_listeners=[
                dspy.streaming.StreamListener(signature_field_name="answer")
            ],
            async_streaming=False,  # synchronous generator for plain ``for`` loops
        )
        for item in stream_generate(context=context, question=question):
            # Skip non-text items (e.g. the final prediction) so callers can
            # print every yielded value directly.
            if isinstance(item, dspy.streaming.StreamResponse):
                yield item.chunk

# Build the pipeline and print every streamed chunk as soon as it is produced.
rag = StreamRAG()

for chunk in rag.stream_forward("什么是深度学习?"):
    print(chunk, end="", flush=True)

异步流式 #

python
import dspy
import asyncio

async def stream_response():
    """Stream the answer to a sample question and print it incrementally."""
    qa = dspy.Predict("question -> answer")
    # DSPy modules have no ``astream`` method; dspy.streamify wraps the program
    # and (by default) returns an async generator of stream items.
    stream_qa = dspy.streamify(
        qa,
        stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    )
    async for chunk in stream_qa(question="请解释神经网络"):
        # The stream also carries the final prediction; print text chunks only.
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk.chunk, end="", flush=True)


# Script entry point.
asyncio.run(stream_response())

工具调用 #

DSPy 支持工具调用,可以让 LLM 执行外部操作。

定义工具 #

python
import dspy

def calculator(expression: str) -> float:
    """计算数学表达式"""
    # The docstring above is kept verbatim: tool frameworks expose a tool's
    # docstring to the model as its description, so it is runtime text.
    #
    # SECURITY: this function used to call eval() on the expression. The
    # expression is written by the language model, i.e. untrusted input, and
    # eval() would execute arbitrary Python. It now accepts only numeric
    # literals combined with + - * / // % ** and unary +/-, evaluated by
    # walking the parsed syntax tree.
    #
    # Returns the numeric result, or the string "Error: <reason>" when the
    # expression is invalid, unsupported, or fails to evaluate.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def evaluate(node):
        """Recursively evaluate one whitelisted syntax-tree node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            return binary_ops[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    try:
        return evaluate(ast.parse(expression, mode="eval"))
    except Exception as e:
        # Report failures as text so the calling agent can read and recover.
        return f"Error: {e}"

def search(query: str) -> str:
    """搜索信息"""
    # Docstring kept verbatim (tool frameworks may surface it to the model).
    # Placeholder tool: echoes the query behind a fixed prefix instead of
    # performing a real search.
    prefix = "搜索结果: "
    return prefix + format(query)

def get_weather(city: str) -> str:
    """获取天气"""
    # Docstring kept verbatim (tool frameworks may surface it to the model).
    # Placeholder tool: always reports the same canned forecast for any city.
    return "{} 天气: 晴,25°C".format(city)

# Tool callables made available to the agent examples.
tools = [calculator, search, get_weather]

使用 ReAct #

python
import dspy

# Signature for tool-assisted question answering.
# NOTE: the docstring is intentionally left untranslated; DSPy uses a
# Signature's docstring as the task instructions, so its text is runtime data.
class ToolQA(dspy.Signature):
    """使用工具回答问题"""
    question = dspy.InputField()  # input: the user's question
    answer = dspy.OutputField()  # output: the final answer

# Build a ReAct agent over the ToolQA signature. `tools` is expected to be a
# list of tool callables already in scope.
react = dspy.ReAct(ToolQA, tools=tools)

# Ask one question and print the agent's final answer.
result = react(question="北京今天天气怎么样?")
print(result.answer)

自定义工具调用 #

python
import dspy
import json

class ToolUsingModule(dspy.Module):
    """Plan tool calls with an LM, run them, then answer from the results.

    Args:
        tools: iterable of callables; each is registered under its ``__name__``.
    """

    def __init__(self, tools):
        super().__init__()
        self.tools = {t.__name__: t for t in tools}
        self.plan = dspy.ChainOfThought("question -> tool_calls")
        self.execute = dspy.Predict("tool_calls, results -> answer")

    def forward(self, question):
        """Answer ``question`` by planning, executing and summarising tool calls.

        The planner's ``tool_calls`` output must be a JSON array of objects
        shaped like ``{"tool": <name>, "args": {...}}``.

        Raises:
            json.JSONDecodeError: if the planner output is not valid JSON.
        """
        plan = self.plan(question=question)
        tool_calls = json.loads(plan.tool_calls)

        # Results are collected per call, in order. Keying them by tool name
        # would overwrite earlier results when the same tool is called twice.
        results = []
        for call in tool_calls:
            tool_name = call['tool']
            args = call.get('args') or {}
            tool = self.tools.get(tool_name)
            if tool is None:
                # The planner can invent tools; report that to the answer step
                # instead of crashing with a bare KeyError.
                outcome = f"Error: unknown tool '{tool_name}'"
            else:
                outcome = tool(**args)
            results.append({'tool': tool_name, 'args': args, 'result': outcome})

        return self.execute(
            tool_calls=plan.tool_calls,
            # ensure_ascii=False keeps non-ASCII tool output readable for the
            # model instead of turning it into \uXXXX escapes.
            results=json.dumps(results, ensure_ascii=False),
        )

多模态支持 #

DSPy 支持多模态输入,包括图像。

图像输入 #

python
import dspy
from PIL import Image

# Use a vision-capable model for the image examples.
lm = dspy.LM('openai/gpt-4o')
dspy.configure(lm=lm)

# NOTE: the Signature docstring and field descriptions are prompt text and are
# therefore left in their original language.
class ImageAnalysis(dspy.Signature):
    """分析图像内容"""
    # The field must be typed as dspy.Image so the value is sent to the model
    # as image content rather than being rendered as plain text.
    image: dspy.Image = dspy.InputField(desc="图像")
    description = dspy.OutputField(desc="图像描述")
    objects = dspy.OutputField(desc="识别的对象列表")

analyzer = dspy.Predict(ImageAnalysis)

# Wrap the PIL image in dspy.Image; the ``with`` block closes the file handle
# once the image has been converted.
with Image.open("example.jpg") as pil_image:
    image = dspy.Image.from_PIL(pil_image)
result = analyzer(image=image)
print(result.description)
print(result.objects)

图像问答 #

python
import dspy

# NOTE: the Signature docstring is prompt text and is left in its original
# language.
class ImageQA(dspy.Signature):
    """根据图像回答问题"""
    # Typed as dspy.Image so the value is sent to the model as image content.
    image: dspy.Image = dspy.InputField()
    question = dspy.InputField()
    answer = dspy.OutputField()

qa = dspy.ChainOfThought(ImageQA)

# Convert the PIL image to dspy.Image and close the file handle afterwards.
with Image.open("chart.png") as pil_image:
    chart = dspy.Image.from_PIL(pil_image)

result = qa(
    image=chart,
    question="图表中哪个季度销售额最高?"
)
print(result.answer)

多图像处理 #

python
import dspy

# NOTE: the Signature docstring and field description are prompt text and are
# left in their original language.
class CompareImages(dspy.Signature):
    """比较两张图像"""
    # Both inputs are typed as dspy.Image so they are sent as image content.
    image1: dspy.Image = dspy.InputField()
    image2: dspy.Image = dspy.InputField()
    differences = dspy.OutputField(desc="两张图像的差异")

comparator = dspy.Predict(CompareImages)

# Convert both PIL images to dspy.Image; the ``with`` block closes the files.
with Image.open("before.png") as before_pil, Image.open("after.png") as after_pil:
    before = dspy.Image.from_PIL(before_pil)
    after = dspy.Image.from_PIL(after_pil)

result = comparator(
    image1=before,
    image2=after
)
print(result.differences)

自定义语言模型 #

自定义 LM 类 #

python
import dspy
import requests

class CustomLM(dspy.LM):
    """Language-model client that posts prompts to a custom HTTP endpoint.

    Args:
        model: model name sent to the endpoint.
        api_base: base URL; requests go to ``<api_base>/generate``.
        api_key: optional bearer token. No Authorization header is sent when
            it is missing.
        timeout: seconds to wait for the HTTP response.
    """

    def __init__(self, model, api_base, api_key=None, timeout=30):
        super().__init__(model)
        self.api_base = api_base
        self.api_key = api_key
        self.timeout = timeout

    # NOTE(review): DSPy's adapters may invoke the LM with ``messages=...`` and
    # expect a list of completions; confirm against the dspy.BaseLM docs before
    # using this class inside DSPy modules.
    def __call__(self, prompt, **kwargs):
        """Send ``prompt`` to the endpoint and return the generated text.

        Extra keyword arguments are forwarded in the JSON payload.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status.
            requests.Timeout: if no response arrives within ``self.timeout``.
        """
        headers = {}
        if self.api_key:
            # Avoid sending the literal string "Bearer None" without a key.
            headers["Authorization"] = f"Bearer {self.api_key}"
        response = requests.post(
            f"{self.api_base}/generate",
            json={
                "prompt": prompt,
                "model": self.model,
                **kwargs
            },
            headers=headers,
            timeout=self.timeout,  # requests waits forever without a timeout
        )
        # Surface HTTP errors directly instead of failing later on a missing
        # "text" key in an error body.
        response.raise_for_status()
        return response.json()["text"]

    def inspect_history(self, n=1):
        """Return the last ``n`` history entries (an empty list when n <= 0)."""
        # ``history[-0:]`` would return the whole list, so guard n <= 0.
        return self.history[-n:] if n > 0 else []

# Point the custom client at a local endpoint (no API key is passed) and
# register it as DSPy's default language model.
lm = CustomLM("my-model", "http://localhost:8000")
dspy.configure(lm=lm)

本地模型集成 #

python
import dspy
from transformers import AutoModelForCausalLM, AutoTokenizer

class LocalLM(dspy.LM):
    """Language model backed by a local Hugging Face causal-LM checkpoint.

    Args:
        model_path: path or name passed to ``from_pretrained`` for both the
            tokenizer and the model.
    """

    def __init__(self, model_path):
        super().__init__(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        # NOTE(review): this rebinds ``self.model``, which the base class
        # constructor was given as the model name. Confirm nothing in dspy.LM
        # still expects a string there.
        self.model = AutoModelForCausalLM.from_pretrained(model_path)

    def __call__(self, prompt, **kwargs):
        """Generate a completion for ``prompt`` and return only the new text.

        ``max_length`` may be passed as a keyword argument (default 512).
        Other keyword arguments are ignored.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt")
        max_length = kwargs.get("max_length", 512)
        outputs = self.model.generate(**inputs, max_length=max_length)
        # A causal LM returns the prompt tokens followed by the continuation.
        # Drop the prompt and special tokens so the caller sees only the
        # completion.
        prompt_length = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(
            outputs[0][prompt_length:],
            skip_special_tokens=True,
        )

# Load the checkpoint stored under ./local-model and register it as DSPy's
# default language model.
lm = LocalLM("./local-model")
dspy.configure(lm=lm)

自定义检索器 #

自定义 RM 类 #

python
import dspy

class CustomRM(dspy.Retrieve):
    """In-memory retriever that ranks documents by cosine similarity.

    Args:
        documents: sequence of documents to search.
        embed_func: callable mapping a document or query to a vector.
        k: default number of passages to return.
    """

    def __init__(self, documents, embed_func, k=3):
        super().__init__(k=k)
        self.documents = documents
        self.embed_func = embed_func
        # Embed every document once up front so queries only embed the query.
        self.embeddings = [embed_func(doc) for doc in documents]

    def forward(self, query, k=None):
        """Return the top-scoring documents for ``query``.

        Args:
            query: text to search for.
            k: optional per-call override of the number of passages. Defaults
                to the ``k`` given at construction.

        Returns:
            dspy.Prediction whose ``passages`` lists documents, best first.
        """
        limit = self.k if k is None else k
        query_embedding = self.embed_func(query)
        scores = [
            self._cosine_similarity(query_embedding, doc_emb)
            for doc_emb in self.embeddings
        ]
        top_indices = sorted(
            range(len(scores)),
            key=lambda i: scores[i],
            reverse=True
        )[:limit]

        passages = [self.documents[i] for i in top_indices]
        return dspy.Prediction(passages=passages)

    def _cosine_similarity(self, a, b):
        """Return the cosine similarity of ``a`` and ``b`` (0.0 for a zero vector)."""
        import numpy as np
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0:
            # A zero-length vector would give NaN, which makes the ranking
            # sort unpredictable; treat it as "not similar".
            return 0.0
        return np.dot(a, b) / denominator

# Build the retriever and register it as DSPy's default retrieval model.
# NOTE: `documents` and `embed_func` are not defined in this example; the
# reader has to supply them.
rm = CustomRM(documents, embed_func, k=3)
dspy.configure(rm=rm)

评估框架 #

自定义评估器 #

python
import dspy
from dspy.evaluate import Evaluate

class DetailedEvaluator:
    """Evaluate a module with several metrics and report each metric's mean.

    Args:
        metrics: iterable of callables ``metric(example, pred)`` returning a
            number or bool. Each must have a ``__name__``, used as its key in
            the report.
    """

    def __init__(self, metrics):
        # Materialise once so a one-shot iterable (e.g. a generator) is not
        # exhausted by the first pass over it.
        self.metrics = list(metrics)

    def evaluate(self, module, devset):
        """Run ``module`` on every example and average each metric's scores.

        Args:
            module: callable invoked as ``module(**example.inputs())``.
            devset: iterable of examples exposing ``inputs()``.

        Returns:
            dict mapping metric name to its mean score. The mean is 0.0 when
            ``devset`` is empty.
        """
        results = {m.__name__: [] for m in self.metrics}

        for example in devset:
            # One prediction per example, shared by all metrics.
            pred = module(**example.inputs())
            for metric in self.metrics:
                results[metric.__name__].append(metric(example, pred))

        return {
            # Guard the division so an empty devset does not raise
            # ZeroDivisionError.
            name: (sum(scores) / len(scores)) if scores else 0.0
            for name, scores in results.items()
        }

def accuracy(example, pred, trace=None):
    """Return True when the predicted answer equals the gold answer, ignoring case.

    Args:
        example: object with the gold ``answer`` string.
        pred: object with the predicted ``answer`` string.
        trace: unused. Accepted because DSPy optimizers pass a third ``trace``
            argument to the metric, which a two-parameter metric would reject.
    """
    return example.answer.lower() == pred.answer.lower()

def relevance(example, pred, trace=None):
    """Score how relevant the predicted answer is to the example's question.

    Delegates to ``compute_relevance``, which is not defined in this example
    and must be supplied by the reader.

    Args:
        example: object with a ``question`` attribute.
        pred: object with an ``answer`` attribute.
        trace: unused. Accepted so the metric can also be handed to DSPy
            optimizers, which pass a third ``trace`` argument.
    """
    return compute_relevance(example.question, pred.answer)

# Evaluate with both metrics; `scores` maps each metric name to its mean.
# NOTE: `module` and `testset` are not defined in this example; the reader
# has to supply them.
evaluator = DetailedEvaluator([accuracy, relevance])
scores = evaluator.evaluate(module, testset)

交叉验证 #

python
import dspy
from sklearn.model_selection import KFold

def cross_validate(module_class, dataset, n_splits=5):
    """Run k-fold cross-validation of a few-shot-optimized module.

    For every fold a fresh ``module_class()`` is compiled on the training
    split and scored on the validation split with the ``accuracy`` metric.

    Args:
        module_class: zero-argument callable building the module to optimize.
        dataset: indexable sequence of examples.
        n_splits: number of folds.

    Returns:
        dict with the per-fold ``scores``, their ``mean`` and their population
        standard deviation ``std``.
    """
    # Imported locally so the example also runs when copied on its own; the
    # surrounding snippet imports only dspy and KFold.
    from dspy.evaluate import Evaluate
    from dspy.teleprompt import BootstrapFewShot

    kf = KFold(n_splits=n_splits)
    scores = []

    for train_idx, val_idx in kf.split(dataset):
        trainset = [dataset[i] for i in train_idx]
        valset = [dataset[i] for i in val_idx]

        module = module_class()
        optimizer = BootstrapFewShot(metric=accuracy, max_bootstrapped_demos=3)
        optimized = optimizer.compile(module, trainset=trainset)

        evaluator = Evaluate(devset=valset, metric=accuracy)
        result = evaluator(optimized)
        # Depending on the DSPy version, Evaluate returns either a plain number
        # or a result object carrying ``.score``; accept both.
        scores.append(getattr(result, "score", result))

    # Compute the mean once rather than once per element inside the variance.
    mean = sum(scores) / len(scores)
    variance = sum((s - mean) ** 2 for s in scores) / len(scores)
    return {
        'mean': mean,
        'std': variance ** 0.5,
        'scores': scores
    }

高级优化技术 #

渐进式优化 #

python
import dspy
from dspy.teleprompt import BootstrapFewShot, MIPRO

def progressive_optimization(module, trainset, valset):
    """Optimize ``module`` in three successive stages and return the result.

    Stages one and two run BootstrapFewShot with 2 and then 4 bootstrapped
    demos on ``trainset``. Stage three runs MIPRO on ``trainset`` and
    ``valset``. Every stage uses the ``accuracy`` metric.
    """
    for demo_budget in (2, 4):
        bootstrap = BootstrapFewShot(
            metric=accuracy,
            max_bootstrapped_demos=demo_budget,
        )
        module = bootstrap.compile(module, trainset=trainset)

    # NOTE(review): whether ``MIPRO`` and its ``max_rounds`` argument exist
    # depends on the installed DSPy version; confirm.
    final_stage = MIPRO(metric=accuracy, max_rounds=2)
    return final_stage.compile(module, trainset=trainset, valset=valset)

集成优化 #

python
import dspy
from dspy.teleprompt import BootstrapFewShot, Ensemble

def ensemble_optimization(module_class, trainset, n_models=5):
    """Compile ``n_models`` few-shot-optimized modules and combine them.

    Args:
        module_class: zero-argument callable building a fresh module.
        trainset: examples used to bootstrap demonstrations for every member.
        n_models: number of ensemble members.

    Returns:
        A single program that runs the members and reduces their outputs by
        majority vote.
    """
    models = []

    for _ in range(n_models):
        module = module_class()
        optimizer = BootstrapFewShot(
            metric=accuracy,
            max_bootstrapped_demos=3
        )
        optimized = optimizer.compile(module, trainset=trainset)
        models.append(optimized)

    # Ensemble is a teleprompter: it takes keyword options and builds the
    # combined program in compile(), so Ensemble(models) is not a valid call.
    # NOTE(review): every member is compiled from the same trainset, so the
    # members may turn out identical; consider a different sample per member.
    return Ensemble(reduce_fn=dspy.majority).compile(models)

错误处理与重试 #

自动重试 #

python
import dspy
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustModule(dspy.Module):
    """Question-answering module whose forward pass is retried on failure."""

    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question -> answer")

    # Up to 3 attempts, with exponential back-off bounded to 2-10 seconds.
    @retry(
        wait=wait_exponential(multiplier=1, min=2, max=10),
        stop=stop_after_attempt(3),
    )
    def forward(self, question):
        """Predict an answer; log and re-raise so the decorator can retry."""
        try:
            prediction = self.predict(question=question)
        except Exception as exc:
            print(f"Error: {exc}, retrying...")
            raise
        return prediction

降级处理 #

python
import dspy

class FallbackModule(dspy.Module):
    """Try chain-of-thought first, falling back to a plain predictor on error."""

    def __init__(self):
        super().__init__()
        self.primary = dspy.ChainOfThought("question -> answer")
        self.fallback = dspy.Predict("question -> answer")

    def forward(self, question):
        """Return the primary prediction, or the fallback one if the primary raises."""
        try:
            answer = self.primary(question=question)
        except Exception as exc:
            print(f"Primary failed: {exc}, using fallback")
            answer = self.fallback(question=question)
        return answer

性能优化 #

缓存策略 #

python
import dspy
from functools import lru_cache

class CachedModule(dspy.Module):
    """Question-answering module that memoises predictions per question."""

    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question -> answer")
        # Wrap the bound method so each instance gets its own LRU cache,
        # holding up to 1000 entries.
        memoise = lru_cache(maxsize=1000)
        self._cached_predict = memoise(self._predict_impl)

    def _predict_impl(self, question):
        """Uncached prediction; only reached on a cache miss."""
        return self.predict(question=question)

    def forward(self, question):
        """Return the prediction for ``question``, reusing a cached one if present."""
        return self._cached_predict(question)

批处理 #

python
import dspy

class BatchModule(dspy.Module):
    """Answer a list of questions by sending them to the predictor in batches.

    Args:
        batch_size: maximum number of questions per predictor call.
    """

    def __init__(self, batch_size=10):
        super().__init__()
        self.predict = dspy.Predict("questions -> answers")
        self.batch_size = batch_size

    def forward(self, questions):
        """Return a prediction whose ``answers`` concatenates every batch's answers."""
        collected = []
        for start in range(0, len(questions), self.batch_size):
            stop = start + self.batch_size
            prediction = self.predict(questions=questions[start:stop])
            collected += prediction.answers
        return dspy.Prediction(answers=collected)

调试与监控 #

详细日志 #

python
import dspy
import logging

# Show DEBUG-level log records from every library, DSPy included.
logging.basicConfig(level=logging.DEBUG)
# DSPy appends call records to the configured trace, so it has to be a list;
# a boolean such as True is not a valid trace value.
dspy.settings.configure(trace=[])

module = dspy.Predict("question -> answer")
result = module(question="测试")

# Print the most recent LM interaction (prompt and completion). DSPy has no
# ``inspect_trace`` function; ``inspect_history`` is the inspection helper.
dspy.inspect_history(n=1)

性能分析 #

python
import dspy
import time

class ProfiledModule(dspy.Module):
    """Question-answering module that records the duration of each call."""

    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question -> answer")
        self.call_times = []  # seconds taken by each successful forward() call

    def forward(self, question):
        """Predict an answer and record how long the call took."""
        # perf_counter is monotonic and high-resolution. time.time can jump
        # when the system clock is adjusted, which would skew durations.
        start = time.perf_counter()
        result = self.predict(question=question)
        self.call_times.append(time.perf_counter() - start)
        return result

    def get_stats(self):
        """Return call count and average / max / min duration in seconds.

        All timing values are 0.0 when no call has been recorded yet.
        """
        if not self.call_times:
            # Avoid ZeroDivisionError and max()/min() on an empty list.
            return {
                'total_calls': 0,
                'avg_time': 0.0,
                'max_time': 0.0,
                'min_time': 0.0
            }
        return {
            'total_calls': len(self.call_times),
            'avg_time': sum(self.call_times) / len(self.call_times),
            'max_time': max(self.call_times),
            'min_time': min(self.call_times)
        }

下一步 #

现在你已经掌握了 DSPy 的高级功能，接下来请学习「最佳实践」，了解如何在生产环境中使用 DSPy！

最后更新:2026-03-30