异步通信实战 #

一、异步通信概述 #

1.1 同步 vs 异步 #

text
同步调用:
服务A ──► 服务B ──► 服务C
  │         │         │
  └─────────┴─────────┘
      等待响应

异步调用:
服务A ──► 消息队列 ──► 服务B
  │
  └── 立即返回

1.2 异步通信场景 #

场景 说明
订单处理 下单后异步处理库存、通知等
日志收集 异步收集日志
事件通知 异步发送通知

二、消息可靠性 #

2.1 生产者确认 #

yaml
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
java
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("Message sent successfully");
        } else {
            System.out.println("Message send failed: " + cause);
        }
    }
}

2.2 消费者确认 #

yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
java
@RabbitListener(queues = "user-queue")
public void handleMessage(Message message, Channel channel) throws IOException {
    try {
        System.out.println("Received: " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

三、消息幂等 #

3.1 幂等设计 #

java
@Service
public class OrderService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void processOrder(Order order) {
        String key = "order:processed:" + order.getId();
        
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", Duration.ofHours(24));
        
        if (!success) {
            System.out.println("Order already processed: " + order.getId());
            return;
        }
        
        processOrderInternal(order);
    }
}

四、消息重试 #

4.1 配置重试 #

yaml
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          multiplier: 2

4.2 死信队列 #

java
@Bean
public Queue userQueue() {
    return QueueBuilder.durable("user-queue")
        .withArgument("x-dead-letter-exchange", "dlx.exchange")
        .withArgument("x-dead-letter-routing-key", "dlx.user")
        .build();
}

@Bean
public Queue deadLetterQueue() {
    return new Queue("dlx.user.queue");
}

五、总结 #

要点 说明
消息可靠性 生产者确认、消费者确认
消息幂等 唯一ID去重
消息重试 重试机制、死信队列

接下来让我们学习 链路追踪

最后更新:2026-03-28