异步通信实战 #
一、异步通信概述 #
1.1 同步 vs 异步 #
text
同步调用:
服务A ──► 服务B ──► 服务C
│ │ │
└─────────┴─────────┘
等待响应
异步调用:
服务A ──► 消息队列 ──► 服务B
│
└── 立即返回
1.2 异步通信场景 #
| 场景 | 说明 |
|---|---|
| 订单处理 | 下单后异步处理库存、通知等 |
| 日志收集 | 异步收集日志 |
| 事件通知 | 异步发送通知 |
二、消息可靠性 #
2.1 生产者确认 #
yaml
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
java
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("Message sent successfully");
} else {
System.out.println("Message send failed: " + cause);
}
}
}
2.2 消费者确认 #
yaml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
java
@RabbitListener(queues = "user-queue")
public void handleMessage(Message message, Channel channel) throws IOException {
try {
System.out.println("Received: " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
三、消息幂等 #
3.1 幂等设计 #
java
@Service
public class OrderService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void processOrder(Order order) {
String key = "order:processed:" + order.getId();
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofHours(24));
if (!success) {
System.out.println("Order already processed: " + order.getId());
return;
}
processOrderInternal(order);
}
}
四、消息重试 #
4.1 配置重试 #
yaml
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
multiplier: 2
4.2 死信队列 #
java
@Bean
public Queue userQueue() {
return QueueBuilder.durable("user-queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "dlx.user")
.build();
}
@Bean
public Queue deadLetterQueue() {
return new Queue("dlx.user.queue");
}
五、总结 #
| 要点 | 说明 |
|---|---|
| 消息可靠性 | 生产者确认、消费者确认 |
| 消息幂等 | 唯一ID去重 |
| 消息重试 | 重试机制、死信队列 |
接下来让我们学习 链路追踪!
最后更新:2026-03-28