流式输出 #

概述 #

流式输出允许 LLM 逐步返回生成的内容,而不是等待完整响应,提供更好的用户体验。

基本使用 #

流式提示词 #

csharp
// Stream the prompt result: each chunk is printed as soon as the model emits it,
// instead of waiting for the complete response.
await foreach (var chunk in kernel.InvokePromptStreamingAsync("写一首诗"))
{
    Console.Write(chunk);
}

流式函数调用 #

csharp
// Resolve a registered plugin function by plugin name ("Text") and
// function name ("WriteStory"), then stream its output chunk by chunk.
var function = kernel.Plugins["Text"]["WriteStory"];

await foreach (var chunk in kernel.InvokeStreamingAsync(function, arguments))
{
    Console.Write(chunk);
}

处理流式内容 #

获取完整内容 #

csharp
// Accumulate the streamed chunks so the complete text is available afterwards.
var fullContent = new StringBuilder();

await foreach (var chunk in kernel.InvokePromptStreamingAsync("写一篇文章"))
{
    Console.Write(chunk);                 // echo incrementally to the console
    fullContent.Append(chunk.ToString()); // and keep a full copy
}

Console.WriteLine("\n--- 完整内容 ---");
Console.WriteLine(fullContent.ToString());

类型化流式内容 #

csharp
// Request typed streaming chunks instead of the default loosely-typed content.
await foreach (var content in kernel.InvokePromptStreamingAsync<StreamingChatMessageContent>("写故事"))
{
    // Text portion of the update, if any.
    if (content.Content != null)
    {
        Console.Write(content.Content);
    }
    
    // NOTE(review): depending on the Semantic Kernel version, function-call
    // updates may be surfaced through the Items collection rather than a
    // FunctionCall property — verify against the SK version in use.
    if (content.FunctionCall != null)
    {
        Console.WriteLine($"\n函数调用: {content.FunctionCall.Name}");
    }
}

流式聊天 #

流式对话 #

csharp
using Microsoft.SemanticKernel.ChatCompletion;

// Stream a chat completion directly from the chat service rather than
// through a kernel prompt, keeping an explicit conversation history.
var chatService = kernel.GetRequiredService<IChatCompletionService>();
var chatHistory = new ChatHistory();

chatHistory.AddUserMessage("讲一个故事");

await foreach (var chunk in chatService.GetStreamingChatMessageContentsAsync(chatHistory))
{
    Console.Write(chunk.Content);
}

完整流式聊天示例 #

csharp
/// <summary>
/// Maintains a chat session and streams assistant replies to the console,
/// recording the assembled reply in the history once streaming completes.
/// </summary>
public class StreamingChatService
{
    private readonly IChatCompletionService _chat;
    private readonly ChatHistory _messages;

    public StreamingChatService(Kernel kernel)
    {
        _chat = kernel.GetRequiredService<IChatCompletionService>();
        _messages = new ChatHistory();
        _messages.AddSystemMessage("你是一个友好的助手。");
    }

    /// <summary>
    /// Sends <paramref name="userMessage"/>, prints the streamed reply as it
    /// arrives, and appends the full reply to the conversation history.
    /// </summary>
    public async Task ChatAsync(string userMessage)
    {
        _messages.AddUserMessage(userMessage);

        Console.Write("助手: ");
        var buffer = new StringBuilder();

        await foreach (var chunk in _chat.GetStreamingChatMessageContentsAsync(_messages))
        {
            if (chunk.Content is null)
            {
                continue; // metadata-only update, nothing to print
            }

            Console.Write(chunk.Content);
            buffer.Append(chunk.Content);
        }

        Console.WriteLine();
        _messages.AddAssistantMessage(buffer.ToString());
    }
}

ASP.NET Core 集成 #

SSE 端点 #

csharp
/// <summary>
/// Streams LLM output to the client as Server-Sent Events over a POST request.
/// Note: browser-side EventSource only supports GET, so this endpoint must be
/// consumed with fetch() and a streaming reader.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class ChatController : ControllerBase
{
    private readonly Kernel _kernel;

    public ChatController(Kernel kernel)
    {
        _kernel = kernel;
    }

    [HttpPost("stream")]
    public async Task StreamChat([FromBody] ChatRequest request)
    {
        // Set the content type via the dedicated property; the remaining SSE
        // headers go through the header collection.
        Response.ContentType = "text/event-stream";
        Response.Headers.Append("Cache-Control", "no-cache");
        Response.Headers.Append("Connection", "keep-alive");

        // Stop generating as soon as the client disconnects.
        var cancellationToken = HttpContext.RequestAborted;

        await foreach (var chunk in _kernel.InvokePromptStreamingAsync(
            request.Message,
            cancellationToken: cancellationToken))
        {
            // BUG FIX: a chunk may itself contain newlines; the SSE protocol
            // requires every payload line to carry its own "data: " prefix,
            // otherwise the event framing breaks mid-message.
            var text = chunk.ToString() ?? string.Empty;
            foreach (var line in text.Split('\n'))
            {
                await Response.WriteAsync($"data: {line}\n", cancellationToken);
            }
            await Response.WriteAsync("\n", cancellationToken); // end of event
            await Response.Body.FlushAsync(cancellationToken);
        }

        await Response.WriteAsync("data: [DONE]\n\n", cancellationToken);
    }
}

前端调用 #

javascript
// BUG FIX: EventSource only supports GET and cannot send a JSON body, so it
// cannot call the POST [FromBody] endpoint above. Use fetch() with a
// streaming reader to consume the SSE response instead.
const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message: '你好' })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let finished = false;

while (!finished) {
    const { done, value } = await reader.read();
    if (done) break;

    // Each SSE payload line is prefixed with "data: "; [DONE] ends the stream.
    for (const line of decoder.decode(value, { stream: true }).split('\n')) {
        if (!line.startsWith('data: ')) continue;
        const data = line.slice('data: '.length);
        if (data === '[DONE]') {
            finished = true;
            break;
        }
        document.getElementById('response').textContent += data;
    }
}

SignalR 集成 #

Hub 实现 #

csharp
/// <summary>
/// SignalR hub that exposes LLM output to callers as a server-to-client stream.
/// </summary>
public class ChatHub : Hub
{
    private readonly Kernel _kernel;

    public ChatHub(Kernel kernel)
    {
        _kernel = kernel;
    }

    /// <summary>
    /// Streams the model's reply to <paramref name="message"/> chunk by chunk;
    /// SignalR completes the client stream when this iterator finishes.
    /// </summary>
    /// <param name="message">User prompt forwarded to the kernel.</param>
    /// <param name="cancellationToken">
    /// Signalled by SignalR when the client disposes its subscription;
    /// [EnumeratorCancellation] lets SignalR inject it into the async iterator.
    /// </param>
    public async IAsyncEnumerable<string> StreamChat(
        string message,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var chunk in _kernel.InvokePromptStreamingAsync(
            message,
            cancellationToken: cancellationToken))
        {
            yield return chunk.ToString();
        }
    }
}

客户端调用 #

javascript
const connection = new signalR.HubConnectionBuilder()
    .withUrl("/chathub")
    .build();

await connection.start();

// BUG FIX: the hub streams via IAsyncEnumerable and never calls
// Clients.SendAsync("ReceiveMessage", ...), so an on("ReceiveMessage")
// handler would never fire — render the chunks in next() instead.
// Also, stream() returns an IStreamResult and subscribe() synchronously
// returns a subscription object, not a Promise, so it must not be awaited.
connection.stream("StreamChat", "你好").subscribe({
    next: (item) => {
        document.getElementById('response').textContent += item;
    },
    complete: () => console.log("完成"),
    error: (err) => console.error(err)
});

高级用法 #

带进度的流式输出 #

csharp
/// <summary>
/// Streams a prompt result while reporting each received chunk through an
/// <see cref="IProgress{T}"/> callback.
/// </summary>
public class ProgressStreamingService
{
    // BUG FIX: the original example referenced a `kernel` variable that was
    // never declared, so it did not compile; the kernel must be injected
    // and stored on the class.
    private readonly Kernel _kernel;

    public ProgressStreamingService(Kernel kernel)
    {
        _kernel = kernel;
    }

    /// <summary>
    /// Streams the result of <paramref name="prompt"/>, reporting every chunk
    /// via <paramref name="progress"/> and logging a running character count.
    /// </summary>
    public async Task StreamWithProgressAsync(
        string prompt,
        IProgress<string> progress,
        CancellationToken cancellationToken = default)
    {
        var totalLength = 0;

        await foreach (var chunk in _kernel.InvokePromptStreamingAsync(
            prompt,
            cancellationToken: cancellationToken))
        {
            var content = chunk.ToString();
            totalLength += content.Length;

            progress.Report(content);

            Console.WriteLine($"已接收: {totalLength} 字符");
        }
    }
}

并行流式处理 #

csharp
/// <summary>
/// Runs several prompts concurrently, collecting each streamed result into a
/// string, then prints every prompt/result pair in the original order.
/// </summary>
public async Task StreamMultipleAsync(string[] prompts)
{
    // One consumer task per prompt; each drains its own stream into a buffer.
    var streamJobs = prompts.Select(async p =>
    {
        var buffer = new StringBuilder();
        await foreach (var piece in kernel.InvokePromptStreamingAsync(p))
        {
            buffer.Append(piece);
        }
        return buffer.ToString();
    });

    var completed = await Task.WhenAll(streamJobs);

    for (var i = 0; i < prompts.Length; i++)
    {
        Console.WriteLine($"提示: {prompts[i]}");
        Console.WriteLine($"结果: {completed[i]}\n");
    }
}

最佳实践 #

1. 处理取消 #

csharp
using var cts = new CancellationTokenSource();

// When the user cancels:
cts.Cancel();

// NOTE(review): in this linear example the token is already cancelled before
// the loop starts, so streaming aborts immediately; in real code Cancel()
// would be invoked from another thread or a UI event while streaming runs.
try
{
    await foreach (var chunk in kernel.InvokePromptStreamingAsync(
        prompt,
        cancellationToken: cts.Token))
    {
        Console.Write(chunk);
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("\n已取消");
}

2. 超时处理 #

csharp
// The token fires automatically 30 seconds after the CTS is created,
// aborting the stream mid-flight if generation takes too long.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

try
{
    await foreach (var chunk in kernel.InvokePromptStreamingAsync(
        prompt,
        cancellationToken: cts.Token))
    {
        Console.Write(chunk);
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("\n超时");
}

3. 错误恢复 #

csharp
/// <summary>
/// Streams a prompt to the console, retrying on failure with linear backoff
/// (1s, 2s, ...). Rethrows the last exception when every attempt fails.
/// </summary>
// NOTE(review): if a stream fails part-way through, earlier chunks have
// already been printed; a retry restarts from scratch and may duplicate
// previously shown output.
public async Task StreamWithRetryAsync(string prompt, int maxRetries = 3)
{
    for (int i = 0; i < maxRetries; i++)
    {
        try
        {
            await foreach (var chunk in kernel.InvokePromptStreamingAsync(prompt))
            {
                Console.Write(chunk);
            }
            return; // completed without error
        }
        catch (Exception ex)
        {
            Console.WriteLine($"\n尝试 {i + 1} 失败: {ex.Message}");
            if (i == maxRetries - 1) throw; // `throw;` preserves the stack trace
            await Task.Delay(1000 * (i + 1)); // linear backoff: 1s, 2s, ...
        }
    }
}

下一步 #

现在你已经掌握了流式输出,接下来学习 函数调用,了解如何让 LLM 调用你的函数!

最后更新:2026-04-04