工作流自动化 #

项目概述 #

本节将构建一个智能工作流自动化系统,具备以下功能:

  • 可视化流程定义
  • 条件分支与循环
  • 任务调度与执行
  • 错误处理与重试
  • 执行状态监控

工作流架构 #

text
┌─────────────────────────────────────────────────────────────┐
│                    工作流架构                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐   │
│  │  流程定义    │────▶│  流程解析    │────▶│  流程执行    │   │
│  └─────────────┘     └─────────────┘     └─────────────┘   │
│                                                 │            │
│         ┌───────────────────────────────────────┤            │
│         │                   │                   │            │
│         ▼                   ▼                   ▼            │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐    │
│  │  任务执行    │     │  条件判断    │     │  状态管理    │    │
│  └─────────────┘     └─────────────┘     └─────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

项目结构 #

text
WorkflowEngine/
├── Models/
│   ├── Workflow.cs
│   ├── WorkflowStep.cs
│   ├── WorkflowContext.cs
│   └── ExecutionResult.cs
├── Services/
│   ├── IWorkflowEngine.cs
│   ├── WorkflowEngine.cs
│   ├── IStepExecutor.cs
│   └── StepExecutor.cs
├── Activities/
│   ├── IActivity.cs
│   ├── LLMActivity.cs
│   ├── HttpActivity.cs
│   ├── EmailActivity.cs
│   └── DelayActivity.cs
├── Controllers/
│   └── WorkflowController.cs
└── Program.cs

核心代码实现 #

1. 工作流模型 #

csharp
public class Workflow
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string Name { get; set; } = "";
    public string Description { get; set; } = "";
    public List<WorkflowStep> Steps { get; set; } = new();
    public WorkflowTrigger Trigger { get; set; } = new();
    public WorkflowSettings Settings { get; set; } = new();
}

public class WorkflowStep
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string Name { get; set; } = "";
    public string Type { get; set; } = "";
    public Dictionary<string, object> Inputs { get; set; } = new();
    public List<WorkflowCondition> Conditions { get; set; } = new();
    public List<string> NextSteps { get; set; } = new();
    public RetryPolicy? RetryPolicy { get; set; }
}

public class WorkflowCondition
{
    public string Expression { get; set; } = "";
    public string TargetStep { get; set; } = "";
}

public class WorkflowContext
{
    public string ExecutionId { get; set; } = Guid.NewGuid().ToString();
    public string WorkflowId { get; set; } = "";
    public Dictionary<string, object> Variables { get; set; } = new();
    public Dictionary<string, object> Inputs { get; set; } = new();
    public Dictionary<string, object> Outputs { get; set; } = new();
    public List<StepExecutionLog> ExecutionLog { get; set; } = new();
    public DateTime StartTime { get; set; }
    public DateTime? EndTime { get; set; }
    public WorkflowStatus Status { get; set; }
}

public class StepExecutionLog
{
    public string StepId { get; set; } = "";
    public string StepName { get; set; } = "";
    public DateTime StartTime { get; set; }
    public DateTime? EndTime { get; set; }
    public bool Success { get; set; }
    public string? Error { get; set; }
    public Dictionary<string, object> Outputs { get; set; } = new();
}

2. 活动定义 #

csharp
public interface IActivity
{
    string Type { get; }
    Task<ActivityResult> ExecuteAsync(
        WorkflowStep step,
        WorkflowContext context,
        CancellationToken cancellationToken = default);
}

public class LLMActivity : IActivity
{
    private readonly Kernel _kernel;

    public string Type => "LLM";

    public LLMActivity(Kernel kernel)
    {
        _kernel = kernel;
    }

    public async Task<ActivityResult> ExecuteAsync(
        WorkflowStep step,
        WorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        var promptTemplate = step.Inputs["prompt"]?.ToString() ?? "";
        var prompt = ResolveVariables(promptTemplate, context);

        var result = await _kernel.InvokePromptAsync(
            prompt,
            cancellationToken: cancellationToken
        );

        return new ActivityResult
        {
            Success = true,
            Outputs = new Dictionary<string, object>
            {
                ["result"] = result.ToString()
            }
        };
    }

    private string ResolveVariables(string template, WorkflowContext context)
    {
        foreach (var variable in context.Variables)
        {
            template = template.Replace($"{{{{{variable.Key}}}}}", variable.Value?.ToString());
        }
        foreach (var input in context.Inputs)
        {
            template = template.Replace($"{{{{{input.Key}}}}}", input.Value?.ToString());
        }
        return template;
    }
}

public class HttpActivity : IActivity
{
    private readonly HttpClient _httpClient;

    public string Type => "Http";

    public async Task<ActivityResult> ExecuteAsync(
        WorkflowStep step,
        WorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        var url = step.Inputs["url"]?.ToString() ?? "";
        var method = step.Inputs.GetValueOrDefault("method", "GET")?.ToString() ?? "GET";

        HttpResponseMessage response;
        
        if (method.Equals("GET", StringComparison.OrdinalIgnoreCase))
        {
            response = await _httpClient.GetAsync(url, cancellationToken);
        }
        else
        {
            var body = step.Inputs.GetValueOrDefault("body", "")?.ToString();
            var content = new StringContent(body ?? "", Encoding.UTF8, "application/json");
            response = await _httpClient.PostAsync(url, content, cancellationToken);
        }

        var result = await response.Content.ReadAsStringAsync(cancellationToken);

        return new ActivityResult
        {
            Success = response.IsSuccessStatusCode,
            Outputs = new Dictionary<string, object>
            {
                ["statusCode"] = (int)response.StatusCode,
                ["body"] = result
            }
        };
    }
}

public class EmailActivity : IActivity
{
    private readonly IEmailService _emailService;

    public string Type => "Email";

    public EmailActivity(IEmailService emailService)
    {
        _emailService = emailService;
    }

    public async Task<ActivityResult> ExecuteAsync(
        WorkflowStep step,
        WorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        var to = step.Inputs["to"]?.ToString() ?? "";
        var subject = step.Inputs["subject"]?.ToString() ?? "";
        var body = step.Inputs["body"]?.ToString() ?? "";

        await _emailService.SendAsync(to, subject, body);

        return new ActivityResult
        {
            Success = true,
            Outputs = new Dictionary<string, object>
            {
                ["sent"] = true
            }
        };
    }
}

public class DelayActivity : IActivity
{
    public string Type => "Delay";

    public async Task<ActivityResult> ExecuteAsync(
        WorkflowStep step,
        WorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        var delayMs = Convert.ToInt32(step.Inputs.GetValueOrDefault("milliseconds", 1000));
        
        await Task.Delay(delayMs, cancellationToken);

        return new ActivityResult
        {
            Success = true,
            Outputs = new Dictionary<string, object>
            {
                ["delayed"] = delayMs
            }
        };
    }
}

3. 工作流引擎 #

csharp
public class WorkflowEngine : IWorkflowEngine
{
    private readonly Kernel _kernel;
    private readonly Dictionary<string, IActivity> _activities;
    private readonly ILogger<WorkflowEngine> _logger;

    public WorkflowEngine(
        Kernel kernel,
        IEnumerable<IActivity> activities,
        ILogger<WorkflowEngine> logger)
    {
        _kernel = kernel;
        _activities = activities.ToDictionary(a => a.Type);
        _logger = logger;
    }

    public async Task<WorkflowContext> ExecuteAsync(
        Workflow workflow,
        Dictionary<string, object>? inputs = null,
        CancellationToken cancellationToken = default)
    {
        var context = new WorkflowContext
        {
            WorkflowId = workflow.Id,
            Inputs = inputs ?? new Dictionary<string, object>(),
            StartTime = DateTime.UtcNow,
            Status = WorkflowStatus.Running
        };

        try
        {
            var executedSteps = new HashSet<string>();
            var currentSteps = GetStartSteps(workflow);

            while (currentSteps.Any())
            {
                var nextSteps = new List<WorkflowStep>();

                foreach (var step in currentSteps)
                {
                    if (executedSteps.Contains(step.Id))
                        continue;

                    var result = await ExecuteStepAsync(step, context, cancellationToken);
                    
                    if (result.Success)
                    {
                        executedSteps.Add(step.Id);
                        
                        var nextStepIds = EvaluateConditions(step, context);
                        foreach (var nextId in nextStepIds)
                        {
                            var nextStep = workflow.Steps.FirstOrDefault(s => s.Id == nextId);
                            if (nextStep != null)
                            {
                                nextSteps.Add(nextStep);
                            }
                        }
                    }
                    else if (step.RetryPolicy != null)
                    {
                        var retryResult = await RetryStepAsync(
                            step, context, step.RetryPolicy, cancellationToken);
                        
                        if (retryResult.Success)
                        {
                            executedSteps.Add(step.Id);
                        }
                    }
                    else
                    {
                        context.Status = WorkflowStatus.Failed;
                        context.EndTime = DateTime.UtcNow;
                        return context;
                    }
                }

                currentSteps = nextSteps;
            }

            context.Status = WorkflowStatus.Completed;
            context.EndTime = DateTime.UtcNow;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "工作流执行失败");
            context.Status = WorkflowStatus.Failed;
            context.EndTime = DateTime.UtcNow;
        }

        return context;
    }

    private async Task<StepExecutionLog> ExecuteStepAsync(
        WorkflowStep step,
        WorkflowContext context,
        CancellationToken cancellationToken)
    {
        var log = new StepExecutionLog
        {
            StepId = step.Id,
            StepName = step.Name,
            StartTime = DateTime.UtcNow
        };

        try
        {
            if (!_activities.TryGetValue(step.Type, out var activity))
            {
                throw new InvalidOperationException($"未找到活动类型: {step.Type}");
            }

            var result = await activity.ExecuteAsync(step, context, cancellationToken);
            
            log.Success = result.Success;
            log.Outputs = result.Outputs;
            log.EndTime = DateTime.UtcNow;

            foreach (var output in result.Outputs)
            {
                context.Variables[$"{step.Name}.{output.Key}"] = output.Value;
            }
        }
        catch (Exception ex)
        {
            log.Success = false;
            log.Error = ex.Message;
            log.EndTime = DateTime.UtcNow;
        }

        context.ExecutionLog.Add(log);
        return log;
    }

    private List<string> EvaluateConditions(WorkflowStep step, WorkflowContext context)
    {
        if (!step.Conditions.Any())
        {
            return step.NextSteps;
        }

        var results = new List<string>();

        foreach (var condition in step.Conditions)
        {
            if (EvaluateExpression(condition.Expression, context))
            {
                results.Add(condition.TargetStep);
            }
        }

        return results.Any() ? results : step.NextSteps;
    }

    private bool EvaluateExpression(string expression, WorkflowContext context)
    {
        return expression switch
        {
            var e when e.StartsWith("variables.") => EvaluateVariableCondition(e, context),
            var e when e.StartsWith("outputs.") => EvaluateOutputCondition(e, context),
            _ => true
        };
    }

    private List<WorkflowStep> GetStartSteps(Workflow workflow)
    {
        var allNextStepIds = workflow.Steps
            .SelectMany(s => s.NextSteps)
            .ToHashSet();

        return workflow.Steps
            .Where(s => !allNextStepIds.Contains(s.Id))
            .ToList();
    }

    private async Task<StepExecutionLog> RetryStepAsync(
        WorkflowStep step,
        WorkflowContext context,
        RetryPolicy policy,
        CancellationToken cancellationToken)
    {
        for (int i = 0; i < policy.MaxRetries; i++)
        {
            await Task.Delay(policy.DelayMs * (i + 1), cancellationToken);
            
            var result = await ExecuteStepAsync(step, context, cancellationToken);
            
            if (result.Success)
            {
                return result;
            }
        }

        return new StepExecutionLog
        {
            StepId = step.Id,
            Success = false,
            Error = "重试次数已用尽"
        };
    }
}

4. 工作流定义示例 #

json
{
    "name": "客户反馈处理流程",
    "description": "自动处理客户反馈并发送响应",
    "steps": [
        {
            "id": "step-1",
            "name": "分析反馈",
            "type": "LLM",
            "inputs": {
                "prompt": "分析以下客户反馈的情感和主题:{{feedback}}"
            },
            "nextSteps": ["step-2"]
        },
        {
            "id": "step-2",
            "name": "判断优先级",
            "type": "LLM",
            "inputs": {
                "prompt": "根据分析结果判断优先级(高/中/低):{{分析反馈.result}}"
            },
            "conditions": [
                {
                    "expression": "outputs.判断优先级.result == '高'",
                    "targetStep": "step-3"
                }
            ],
            "nextSteps": ["step-4"]
        },
        {
            "id": "step-3",
            "name": "通知管理员",
            "type": "Email",
            "inputs": {
                "to": "admin@company.com",
                "subject": "高优先级客户反馈",
                "body": "收到高优先级反馈:{{feedback}}"
            },
            "nextSteps": ["step-4"]
        },
        {
            "id": "step-4",
            "name": "生成回复",
            "type": "LLM",
            "inputs": {
                "prompt": "为以下客户反馈生成友好的回复:{{feedback}}"
            },
            "nextSteps": ["step-5"]
        },
        {
            "id": "step-5",
            "name": "发送回复",
            "type": "Email",
            "inputs": {
                "to": "{{customerEmail}}",
                "subject": "感谢您的反馈",
                "body": "{{生成回复.result}}"
            }
        }
    ]
}

5. API 控制器 #

csharp
[ApiController]
[Route("api/[controller]")]
public class WorkflowController : ControllerBase
{
    private readonly IWorkflowEngine _engine;
    private readonly IWorkflowRepository _repository;

    [HttpPost]
    public async Task<ActionResult<Workflow>> Create([FromBody] Workflow workflow)
    {
        await _repository.SaveAsync(workflow);
        return Ok(workflow);
    }

    [HttpPost("{id}/execute")]
    public async Task<ActionResult<WorkflowContext>> Execute(
        string id,
        [FromBody] Dictionary<string, object> inputs)
    {
        var workflow = await _repository.GetAsync(id);
        
        if (workflow == null)
        {
            return NotFound();
        }

        var context = await _engine.ExecuteAsync(workflow, inputs);
        return Ok(context);
    }
}

最佳实践 #

1. 模块化活动 #

csharp
// 每个活动只做一件事
public class SendEmailActivity : IActivity { }
public class SaveToDatabaseActivity : IActivity { }

2. 错误处理 #

csharp
public class WorkflowStep
{
    public RetryPolicy? RetryPolicy { get; set; }
    public string? FallbackStep { get; set; }
}

3. 监控 #

csharp
public class WorkflowMonitor
{
    public void TrackExecution(WorkflowContext context)
    {
        _metrics.RecordDuration("workflow", context.WorkflowId, 
            context.EndTime!.Value - context.StartTime);
    }
}

总结 #

恭喜你完成了 Semantic Kernel 文档的学习!现在你已经掌握了从基础到高级的所有知识,可以开始构建自己的 AI 应用了。

最后更新:2026-04-04