Spring Cloud Stream #

一、Stream概述 #

1.1 什么是Spring Cloud Stream #

Spring Cloud Stream是一个构建消息驱动微服务的框架,通过Binder抽象实现了消息中间件的解耦。

text
┌─────────────────────────────────────────────────────────────┐
│                    Stream架构                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────┐                    ┌─────────────┐        │
│   │   Source    │                    │    Sink     │        │
│   │  消息生产者  │                    │  消息消费者  │        │
│   └──────┬──────┘                    └──────┬──────┘        │
│          │                                  │                │
│          ▼                                  ▼                │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                    Binder                            │  │
│   │              (消息中间件抽象层)                        │  │
│   │                                                      │  │
│   │   ┌─────────┐  ┌─────────┐  ┌─────────┐            │  │
│   │   │ RabbitMQ│  │  Kafka  │  │ RocketMQ│            │  │
│   │   └─────────┘  └─────────┘  └─────────┘            │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.2 核心概念 #

概念 说明
Binder 消息中间件抽象
Binding 消息通道与中间件的绑定
Source 消息生产者
Sink 消息消费者
Message 消息载体

二、基本使用 #

2.1 添加依赖 #

xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.2 配置文件 #

yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: user-topic
          content-type: application/json
        input:
          destination: user-topic
          content-type: application/json
          group: user-group
      rabbit:
        bindings:
          input:
            consumer:
              durableSubscription: true

2.3 消息生产者 #

java
@Service
public class MessageProducer {

    @Autowired
    private Source source;

    public void sendMessage(User user) {
        source.output().send(MessageBuilder.withPayload(user).build());
    }
}

2.4 消息消费者 #

java
@Service
public class MessageConsumer {

    @StreamListener(Sink.INPUT)
    public void handleMessage(User user) {
        System.out.println("Received: " + user);
    }
}

三、自定义通道 #

3.1 定义通道接口 #

java
public interface CustomChannels {

    String OUTPUT = "customOutput";
    String INPUT = "customInput";

    @Output(OUTPUT)
    MessageChannel customOutput();

    @Input(INPUT)
    SubscribableChannel customInput();
}

3.2 启用通道 #

java
@SpringBootApplication
@EnableBinding(CustomChannels.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

3.3 配置绑定 #

yaml
spring:
  cloud:
    stream:
      bindings:
        customOutput:
          destination: custom-topic
        customInput:
          destination: custom-topic
          group: custom-group

四、消息分组 #

4.1 消费者组 #

yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: user-topic
          group: user-group

4.2 分区支持 #

yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: user-topic
          producer:
            partitionKeyExpression: payload.id
            partitionCount: 3
        input:
          destination: user-topic
          group: user-group
          consumer:
            partitioned: true
            instanceCount: 3
            instanceIndex: 0

五、消息处理 #

5.1 条件消费 #

java
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='user'")
public void handleUserMessage(User user) {
    System.out.println("User message: " + user);
}

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='order'")
public void handleOrderMessage(Order order) {
    System.out.println("Order message: " + order);
}

5.2 消息转换 #

java
@StreamListener(Sink.INPUT)
public void handleMessage(Message<User> message) {
    MessageHeaders headers = message.getHeaders();
    User user = message.getPayload();
    System.out.println("Headers: " + headers);
    System.out.println("Payload: " + user);
}

5.3 错误处理 #

java
@Service
public class ErrorHandler {

    @ServiceActivator(inputChannel = "user-topic.user-group.errors")
    public void handleError(ErrorMessage errorMessage) {
        System.out.println("Error: " + errorMessage.getPayload());
    }
}

六、总结 #

要点 说明
Binder 消息中间件抽象
Binding 消息通道绑定
消息分组 消费者组
错误处理 错误通道处理

接下来让我们学习 消息总线

最后更新:2026-03-28