工作流自动化 #
项目概述 #
本节将构建一个智能工作流自动化系统,具备以下功能:
- 可视化流程定义
- 条件分支与循环
- 任务调度与执行
- 错误处理与重试
- 执行状态监控
工作流架构 #
text
┌─────────────────────────────────────────────────────────────┐
│ 工作流架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 流程定义 │────▶│ 流程解析 │────▶│ 流程执行 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ┌───────────────────────────────────────┤ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 任务执行 │ │ 条件判断 │ │ 状态管理 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
项目结构 #
text
WorkflowEngine/
├── Models/
│ ├── Workflow.cs
│ ├── WorkflowStep.cs
│ ├── WorkflowContext.cs
│ └── ExecutionResult.cs
├── Services/
│ ├── IWorkflowEngine.cs
│ ├── WorkflowEngine.cs
│ ├── IStepExecutor.cs
│ └── StepExecutor.cs
├── Activities/
│ ├── IActivity.cs
│ ├── LLMActivity.cs
│ ├── HttpActivity.cs
│ ├── EmailActivity.cs
│ └── DelayActivity.cs
├── Controllers/
│ └── WorkflowController.cs
└── Program.cs
核心代码实现 #
1. 工作流模型 #
csharp
public class Workflow
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Name { get; set; } = "";
public string Description { get; set; } = "";
public List<WorkflowStep> Steps { get; set; } = new();
public WorkflowTrigger Trigger { get; set; } = new();
public WorkflowSettings Settings { get; set; } = new();
}
public class WorkflowStep
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Name { get; set; } = "";
public string Type { get; set; } = "";
public Dictionary<string, object> Inputs { get; set; } = new();
public List<WorkflowCondition> Conditions { get; set; } = new();
public List<string> NextSteps { get; set; } = new();
public RetryPolicy? RetryPolicy { get; set; }
}
public class WorkflowCondition
{
public string Expression { get; set; } = "";
public string TargetStep { get; set; } = "";
}
public class WorkflowContext
{
public string ExecutionId { get; set; } = Guid.NewGuid().ToString();
public string WorkflowId { get; set; } = "";
public Dictionary<string, object> Variables { get; set; } = new();
public Dictionary<string, object> Inputs { get; set; } = new();
public Dictionary<string, object> Outputs { get; set; } = new();
public List<StepExecutionLog> ExecutionLog { get; set; } = new();
public DateTime StartTime { get; set; }
public DateTime? EndTime { get; set; }
public WorkflowStatus Status { get; set; }
}
public class StepExecutionLog
{
public string StepId { get; set; } = "";
public string StepName { get; set; } = "";
public DateTime StartTime { get; set; }
public DateTime? EndTime { get; set; }
public bool Success { get; set; }
public string? Error { get; set; }
public Dictionary<string, object> Outputs { get; set; } = new();
}
2. 活动定义 #
csharp
public interface IActivity
{
string Type { get; }
Task<ActivityResult> ExecuteAsync(
WorkflowStep step,
WorkflowContext context,
CancellationToken cancellationToken = default);
}
public class LLMActivity : IActivity
{
private readonly Kernel _kernel;
public string Type => "LLM";
public LLMActivity(Kernel kernel)
{
_kernel = kernel;
}
public async Task<ActivityResult> ExecuteAsync(
WorkflowStep step,
WorkflowContext context,
CancellationToken cancellationToken = default)
{
var promptTemplate = step.Inputs["prompt"]?.ToString() ?? "";
var prompt = ResolveVariables(promptTemplate, context);
var result = await _kernel.InvokePromptAsync(
prompt,
cancellationToken: cancellationToken
);
return new ActivityResult
{
Success = true,
Outputs = new Dictionary<string, object>
{
["result"] = result.ToString()
}
};
}
private string ResolveVariables(string template, WorkflowContext context)
{
foreach (var variable in context.Variables)
{
template = template.Replace($"{{{{{variable.Key}}}}}", variable.Value?.ToString());
}
foreach (var input in context.Inputs)
{
template = template.Replace($"{{{{{input.Key}}}}}", input.Value?.ToString());
}
return template;
}
}
public class HttpActivity : IActivity
{
private readonly HttpClient _httpClient;
public string Type => "Http";
public async Task<ActivityResult> ExecuteAsync(
WorkflowStep step,
WorkflowContext context,
CancellationToken cancellationToken = default)
{
var url = step.Inputs["url"]?.ToString() ?? "";
var method = step.Inputs.GetValueOrDefault("method", "GET")?.ToString() ?? "GET";
HttpResponseMessage response;
if (method.Equals("GET", StringComparison.OrdinalIgnoreCase))
{
response = await _httpClient.GetAsync(url, cancellationToken);
}
else
{
var body = step.Inputs.GetValueOrDefault("body", "")?.ToString();
var content = new StringContent(body ?? "", Encoding.UTF8, "application/json");
response = await _httpClient.PostAsync(url, content, cancellationToken);
}
var result = await response.Content.ReadAsStringAsync(cancellationToken);
return new ActivityResult
{
Success = response.IsSuccessStatusCode,
Outputs = new Dictionary<string, object>
{
["statusCode"] = (int)response.StatusCode,
["body"] = result
}
};
}
}
public class EmailActivity : IActivity
{
private readonly IEmailService _emailService;
public string Type => "Email";
public EmailActivity(IEmailService emailService)
{
_emailService = emailService;
}
public async Task<ActivityResult> ExecuteAsync(
WorkflowStep step,
WorkflowContext context,
CancellationToken cancellationToken = default)
{
var to = step.Inputs["to"]?.ToString() ?? "";
var subject = step.Inputs["subject"]?.ToString() ?? "";
var body = step.Inputs["body"]?.ToString() ?? "";
await _emailService.SendAsync(to, subject, body);
return new ActivityResult
{
Success = true,
Outputs = new Dictionary<string, object>
{
["sent"] = true
}
};
}
}
public class DelayActivity : IActivity
{
public string Type => "Delay";
public async Task<ActivityResult> ExecuteAsync(
WorkflowStep step,
WorkflowContext context,
CancellationToken cancellationToken = default)
{
var delayMs = Convert.ToInt32(step.Inputs.GetValueOrDefault("milliseconds", 1000));
await Task.Delay(delayMs, cancellationToken);
return new ActivityResult
{
Success = true,
Outputs = new Dictionary<string, object>
{
["delayed"] = delayMs
}
};
}
}
3. 工作流引擎 #
csharp
public class WorkflowEngine : IWorkflowEngine
{
private readonly Kernel _kernel;
private readonly Dictionary<string, IActivity> _activities;
private readonly ILogger<WorkflowEngine> _logger;
public WorkflowEngine(
Kernel kernel,
IEnumerable<IActivity> activities,
ILogger<WorkflowEngine> logger)
{
_kernel = kernel;
_activities = activities.ToDictionary(a => a.Type);
_logger = logger;
}
public async Task<WorkflowContext> ExecuteAsync(
Workflow workflow,
Dictionary<string, object>? inputs = null,
CancellationToken cancellationToken = default)
{
var context = new WorkflowContext
{
WorkflowId = workflow.Id,
Inputs = inputs ?? new Dictionary<string, object>(),
StartTime = DateTime.UtcNow,
Status = WorkflowStatus.Running
};
try
{
var executedSteps = new HashSet<string>();
var currentSteps = GetStartSteps(workflow);
while (currentSteps.Any())
{
var nextSteps = new List<WorkflowStep>();
foreach (var step in currentSteps)
{
if (executedSteps.Contains(step.Id))
continue;
var result = await ExecuteStepAsync(step, context, cancellationToken);
if (result.Success)
{
executedSteps.Add(step.Id);
var nextStepIds = EvaluateConditions(step, context);
foreach (var nextId in nextStepIds)
{
var nextStep = workflow.Steps.FirstOrDefault(s => s.Id == nextId);
if (nextStep != null)
{
nextSteps.Add(nextStep);
}
}
}
else if (step.RetryPolicy != null)
{
var retryResult = await RetryStepAsync(
step, context, step.RetryPolicy, cancellationToken);
if (retryResult.Success)
{
executedSteps.Add(step.Id);
}
}
else
{
context.Status = WorkflowStatus.Failed;
context.EndTime = DateTime.UtcNow;
return context;
}
}
currentSteps = nextSteps;
}
context.Status = WorkflowStatus.Completed;
context.EndTime = DateTime.UtcNow;
}
catch (Exception ex)
{
_logger.LogError(ex, "工作流执行失败");
context.Status = WorkflowStatus.Failed;
context.EndTime = DateTime.UtcNow;
}
return context;
}
private async Task<StepExecutionLog> ExecuteStepAsync(
WorkflowStep step,
WorkflowContext context,
CancellationToken cancellationToken)
{
var log = new StepExecutionLog
{
StepId = step.Id,
StepName = step.Name,
StartTime = DateTime.UtcNow
};
try
{
if (!_activities.TryGetValue(step.Type, out var activity))
{
throw new InvalidOperationException($"未找到活动类型: {step.Type}");
}
var result = await activity.ExecuteAsync(step, context, cancellationToken);
log.Success = result.Success;
log.Outputs = result.Outputs;
log.EndTime = DateTime.UtcNow;
foreach (var output in result.Outputs)
{
context.Variables[$"{step.Name}.{output.Key}"] = output.Value;
}
}
catch (Exception ex)
{
log.Success = false;
log.Error = ex.Message;
log.EndTime = DateTime.UtcNow;
}
context.ExecutionLog.Add(log);
return log;
}
private List<string> EvaluateConditions(WorkflowStep step, WorkflowContext context)
{
if (!step.Conditions.Any())
{
return step.NextSteps;
}
var results = new List<string>();
foreach (var condition in step.Conditions)
{
if (EvaluateExpression(condition.Expression, context))
{
results.Add(condition.TargetStep);
}
}
return results.Any() ? results : step.NextSteps;
}
private bool EvaluateExpression(string expression, WorkflowContext context)
{
return expression switch
{
var e when e.StartsWith("variables.") => EvaluateVariableCondition(e, context),
var e when e.StartsWith("outputs.") => EvaluateOutputCondition(e, context),
_ => true
};
}
private List<WorkflowStep> GetStartSteps(Workflow workflow)
{
var allNextStepIds = workflow.Steps
.SelectMany(s => s.NextSteps)
.ToHashSet();
return workflow.Steps
.Where(s => !allNextStepIds.Contains(s.Id))
.ToList();
}
private async Task<StepExecutionLog> RetryStepAsync(
WorkflowStep step,
WorkflowContext context,
RetryPolicy policy,
CancellationToken cancellationToken)
{
for (int i = 0; i < policy.MaxRetries; i++)
{
await Task.Delay(policy.DelayMs * (i + 1), cancellationToken);
var result = await ExecuteStepAsync(step, context, cancellationToken);
if (result.Success)
{
return result;
}
}
return new StepExecutionLog
{
StepId = step.Id,
Success = false,
Error = "重试次数已用尽"
};
}
}
4. 工作流定义示例 #
json
{
"name": "客户反馈处理流程",
"description": "自动处理客户反馈并发送响应",
"steps": [
{
"id": "step-1",
"name": "分析反馈",
"type": "LLM",
"inputs": {
"prompt": "分析以下客户反馈的情感和主题:{{feedback}}"
},
"nextSteps": ["step-2"]
},
{
"id": "step-2",
"name": "判断优先级",
"type": "LLM",
"inputs": {
"prompt": "根据分析结果判断优先级(高/中/低):{{分析反馈.result}}"
},
"conditions": [
{
"expression": "outputs.判断优先级.result == '高'",
"targetStep": "step-3"
}
],
"nextSteps": ["step-4"]
},
{
"id": "step-3",
"name": "通知管理员",
"type": "Email",
"inputs": {
"to": "admin@company.com",
"subject": "高优先级客户反馈",
"body": "收到高优先级反馈:{{feedback}}"
},
"nextSteps": ["step-4"]
},
{
"id": "step-4",
"name": "生成回复",
"type": "LLM",
"inputs": {
"prompt": "为以下客户反馈生成友好的回复:{{feedback}}"
},
"nextSteps": ["step-5"]
},
{
"id": "step-5",
"name": "发送回复",
"type": "Email",
"inputs": {
"to": "{{customerEmail}}",
"subject": "感谢您的反馈",
"body": "{{生成回复.result}}"
}
}
]
}
5. API 控制器 #
csharp
[ApiController]
[Route("api/[controller]")]
public class WorkflowController : ControllerBase
{
private readonly IWorkflowEngine _engine;
private readonly IWorkflowRepository _repository;
[HttpPost]
public async Task<ActionResult<Workflow>> Create([FromBody] Workflow workflow)
{
await _repository.SaveAsync(workflow);
return Ok(workflow);
}
[HttpPost("{id}/execute")]
public async Task<ActionResult<WorkflowContext>> Execute(
string id,
[FromBody] Dictionary<string, object> inputs)
{
var workflow = await _repository.GetAsync(id);
if (workflow == null)
{
return NotFound();
}
var context = await _engine.ExecuteAsync(workflow, inputs);
return Ok(context);
}
}
最佳实践 #
1. 模块化活动 #
csharp
// 每个活动只做一件事
public class SendEmailActivity : IActivity { }
public class SaveToDatabaseActivity : IActivity { }
2. 错误处理 #
csharp
public class WorkflowStep
{
public RetryPolicy? RetryPolicy { get; set; }
public string? FallbackStep { get; set; }
}
3. 监控 #
csharp
public class WorkflowMonitor
{
public void TrackExecution(WorkflowContext context)
{
_metrics.RecordDuration("workflow", context.WorkflowId,
context.EndTime!.Value - context.StartTime);
}
}
总结 #
恭喜你完成了 Semantic Kernel 文档的学习!现在你已经掌握了从基础到高级的所有知识,可以开始构建自己的 AI 应用了。
最后更新:2026-04-04