Spring Cloud Stream #
一、Stream概述 #
1.1 什么是Spring Cloud Stream #
Spring Cloud Stream是一个构建消息驱动微服务的框架,通过Binder抽象实现了消息中间件的解耦。
text
┌─────────────────────────────────────────────────────────────┐
│ Stream架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Source │ │ Sink │ │
│ │ 消息生产者 │ │ 消息消费者 │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Binder │ │
│ │ (消息中间件抽象层) │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ RabbitMQ│ │ Kafka │ │ RocketMQ│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
1.2 核心概念 #
| 概念 | 说明 |
|---|---|
| Binder | 消息中间件抽象 |
| Binding | 消息通道与中间件的绑定 |
| Source | 消息生产者 |
| Sink | 消息消费者 |
| Message | 消息载体 |
二、基本使用 #
2.1 添加依赖 #
xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.2 配置文件 #
yaml
spring:
cloud:
stream:
bindings:
output:
destination: user-topic
content-type: application/json
input:
destination: user-topic
content-type: application/json
group: user-group
rabbit:
bindings:
input:
consumer:
durableSubscription: true
2.3 消息生产者 #
java
@Service
public class MessageProducer {
@Autowired
private Source source;
public void sendMessage(User user) {
source.output().send(MessageBuilder.withPayload(user).build());
}
}
2.4 消息消费者 #
java
@Service
public class MessageConsumer {
@StreamListener(Sink.INPUT)
public void handleMessage(User user) {
System.out.println("Received: " + user);
}
}
三、自定义通道 #
3.1 定义通道接口 #
java
public interface CustomChannels {
String OUTPUT = "customOutput";
String INPUT = "customInput";
@Output(OUTPUT)
MessageChannel customOutput();
@Input(INPUT)
SubscribableChannel customInput();
}
3.2 启用通道 #
java
@SpringBootApplication
@EnableBinding(CustomChannels.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
3.3 配置绑定 #
yaml
spring:
cloud:
stream:
bindings:
customOutput:
destination: custom-topic
customInput:
destination: custom-topic
group: custom-group
四、消息分组 #
4.1 消费者组 #
yaml
spring:
cloud:
stream:
bindings:
input:
destination: user-topic
group: user-group
4.2 分区支持 #
yaml
spring:
cloud:
stream:
bindings:
output:
destination: user-topic
producer:
partitionKeyExpression: payload.id
partitionCount: 3
input:
destination: user-topic
group: user-group
consumer:
partitioned: true
instanceCount: 3
instanceIndex: 0
五、消息处理 #
5.1 条件消费 #
java
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='user'")
public void handleUserMessage(User user) {
System.out.println("User message: " + user);
}
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='order'")
public void handleOrderMessage(Order order) {
System.out.println("Order message: " + order);
}
5.2 消息转换 #
java
@StreamListener(Sink.INPUT)
public void handleMessage(Message<User> message) {
MessageHeaders headers = message.getHeaders();
User user = message.getPayload();
System.out.println("Headers: " + headers);
System.out.println("Payload: " + user);
}
5.3 错误处理 #
java
@Service
public class ErrorHandler {
@ServiceActivator(inputChannel = "user-topic.user-group.errors")
public void handleError(ErrorMessage errorMessage) {
System.out.println("Error: " + errorMessage.getPayload());
}
}
六、总结 #
| 要点 | 说明 |
|---|---|
| Binder | 消息中间件抽象 |
| Binding | 消息通道绑定 |
| 消息分组 | 消费者组 |
| 错误处理 | 错误通道处理 |
接下来让我们学习 消息总线!
最后更新:2026-03-28