跳到主要内容

RocketMQ 整体架构

1. RocketMQ 整体架构

场景化例子: 假设你在运营一个电商平台,日订单量100万:

  • NameServer: 相当于"黄页",告诉大家订单消息存在哪个Broker上
  • Broker-Master: 负责接收和存储订单消息
  • Broker-Slave: Master的备份,Master挂了还能继续服务
  • Producer: 订单服务,创建订单时发送消息
  • Consumer: 库存服务、支付服务,处理订单消息

2. Topic 和 MessageQueue 的关系

实际分区策略示例

// 1. 轮询策略 - 消息均匀分布
producer.send(new Message("order_topic", "订单消息1")); // -> Queue-0
producer.send(new Message("order_topic", "订单消息2")); // -> Queue-1
producer.send(new Message("order_topic", "订单消息3")); // -> Queue-2

// 2. Hash策略 - 保证同一用户订单有序
producer.send(new Message("order_topic", "user123", "用户123的订单1"));
producer.send(new Message("order_topic", "user123", "用户123的订单2"));
// 两条消息都会发到同一个Queue,保证有序消费

// 3. 指定队列策略 - VIP用户专用队列
producer.send(new Message("order_topic", "VIP订单"), new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0); // VIP订单都发到Queue-0
}
}, null);

3. Producer、Consumer、Broker 交互流程

真实场景时序

# 双11零点,订单创建高峰期
00:00:01 - Producer发送订单创建消息
00:00:01 - Broker收到消息,写入CommitLog
00:00:02 - 库存Consumer拉取到消息,扣减库存
00:00:03 - 支付Consumer拉取到消息,创建支付单
00:00:05 - 物流Consumer拉取到消息,预分配运力

4. 消息发送方式对比

使用场景选择

// 1. 同步发送 - 重要业务消息
// 场景:订单支付成功通知
SendResult result = producer.send(paymentMsg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 确认发送成功,更新订单状态
updateOrderStatus(orderId, "PAID");
}

// 2. 异步发送 - 平衡性能和可靠性
// 场景:用户行为日志
producer.send(behaviorMsg, new SendCallback() {
public void onSuccess(SendResult result) {
logger.info("行为日志发送成功: {}", result.getMsgId());
}
public void onException(Throwable e) {
logger.error("行为日志发送失败", e);
// 可以重试或存储到本地
}
});

// 3. 单向发送 - 高性能场景
// 场景:系统性能监控数据
producer.sendOneway(monitorMsg); // 不关心是否成功

5. 消费模式详解

Push模式代码示例

// 看起来是Push,实际是Pull实现
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order_topic", "*");

// 注册消息监听器 - 这就是"Push"的体现
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {

for (MessageExt msg : messages) {
// 处理订单消息
processOrder(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start(); // 内部启动Pull线程,实现"Push"效果

6. 高可用架构设计

容灾方案配置

// Producer 故障延迟配置
producer.setSendLatencyFaultEnable(true); // 开启故障延迟
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步发送重试次数

// Consumer 主从读取配置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式,支持故障转移

// Broker 主从配置
# Master配置 (broker-a.properties)
brokerRole=SYNC_MASTER # 同步主从
flushDiskType=SYNC_FLUSH # 同步刷盘,保证数据不丢失

# Slave配置 (broker-a-s.properties)
brokerRole=SLAVE
brokerId=1

7. 实际业务场景设计

场景1:电商订单系统

具体实现

// 订单服务发送消息
@Service
public class OrderService {

public void createOrder(Order order) {
// 1. 保存订单到数据库
orderRepository.save(order);

// 2. 发送订单创建消息
Message msg = new Message(
"order_created_topic", // Topic
order.getUserId(), // 分区键,保证同用户订单有序
JSON.toJSONString(order).getBytes() // 消息体
);

// 使用事务消息保证一致性
TransactionSendResult result = transactionProducer.sendMessageInTransaction(
msg, order);

if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
log.info("订单消息发送成功: {}", order.getOrderId());
}
}
}

// 库存服务消费消息
@Component
public class InventoryConsumer {

@RocketMQMessageListener(
topic = "order_created_topic",
consumerGroup = "inventory_consumer_group"
)
public class InventoryMessageListener implements RocketMQListener<Order> {

public void onMessage(Order order) {
try {
// 扣减库存
inventoryService.deductStock(order.getProductId(), order.getQuantity());
log.info("库存扣减成功: {}", order.getOrderId());

} catch (InsufficientStockException e) {
// 库存不足,发送库存不足消息
sendStockInsufficientMessage(order);
throw e; // 抛异常,消息会重试
}
}
}
}

场景2:用户信息变更广播

事件驱动实现

// 用户变更事件定义
@Data
public class UserChangeEvent {
private String userId;
private String eventType; // PROFILE_UPDATE, ADDRESS_UPDATE
private Map<String, Object> oldValues;
private Map<String, Object> newValues;
private long timestamp;
}

// 用户服务发送变更事件
@Service
public class UserService {

@Transactional
public void updateUserProfile(String userId, UserProfile profile) {
// 1. 查询原始数据
UserProfile oldProfile = userRepository.findById(userId);

// 2. 更新数据库
userRepository.updateProfile(userId, profile);

// 3. 发送变更事件
UserChangeEvent event = UserChangeEvent.builder()
.userId(userId)
.eventType("PROFILE_UPDATE")
.oldValues(convertToMap(oldProfile))
.newValues(convertToMap(profile))
.timestamp(System.currentTimeMillis())
.build();

messageProducer.send("user_change_topic", event);
}
}

// 积分系统消费用户变更
@RocketMQMessageListener(
topic = "user_change_topic",
consumerGroup = "points_consumer_group",
selectorExpression = "PROFILE_UPDATE || ADDRESS_UPDATE" // 只关心特定事件
)
public class PointsConsumer implements RocketMQListener<UserChangeEvent> {

public void onMessage(UserChangeEvent event) {
if ("PROFILE_UPDATE".equals(event.getEventType())) {
// 完善资料奖励积分
pointsService.awardProfileCompletionPoints(event.getUserId());
}
}
}

场景3:订单超时取消(延迟消息)

延迟消息实现

// 订单创建时发送延迟消息
@Service
public class OrderService {

public void createOrder(Order order) {
// 1. 保存订单
order.setStatus("PENDING_PAYMENT");
orderRepository.save(order);

// 2. 发送延迟取消消息
Message delayMsg = new Message(
"order_timeout_topic",
order.getOrderId(),
JSON.toJSONString(order).getBytes()
);

// 设置延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
delayMsg.setDelayTimeLevel(16); // 30分钟

producer.send(delayMsg);
log.info("订单超时检查消息已发送: {}", order.getOrderId());
}
}

// 订单超时处理
@RocketMQMessageListener(
topic = "order_timeout_topic",
consumerGroup = "order_timeout_consumer"
)
public class OrderTimeoutConsumer implements RocketMQListener<Order> {

public void onMessage(Order delayedOrder) {
String orderId = delayedOrder.getOrderId();

// 查询订单当前状态
Order currentOrder = orderRepository.findById(orderId);

if (currentOrder == null) {
log.warn("订单不存在: {}", orderId);
return;
}

// 只有待支付状态才需要取消
if ("PENDING_PAYMENT".equals(currentOrder.getStatus())) {

// 加分布式锁防止并发
if (redisLock.tryLock("order_cancel_" + orderId, 30)) {
try {
// 取消订单
currentOrder.setStatus("CANCELLED");
orderRepository.save(currentOrder);

// 释放库存
inventoryService.releaseStock(currentOrder.getProductId(),
currentOrder.getQuantity());

// 发送取消通知
notificationService.sendOrderCancelledNotification(currentOrder);

log.info("订单超时自动取消: {}", orderId);

} finally {
redisLock.unlock("order_cancel_" + orderId);
}
}
} else {
log.info("订单已支付,忽略超时消息: {}", orderId);
}
}
}

8. 性能优化和故障处理

消息积压处理方案

积压处理实战

// 积压监控
@Component
public class MessageBacklogMonitor {

@Scheduled(fixedRate = 30000) // 30秒检查一次
public void checkBacklog() {
try {
// 获取消费进度
ConsumeStats consumeStats = mqAdminTool.examineConsumeStats("order_consumer_group");

for (MessageQueue mq : consumeStats.getOffsetTable().keySet()) {
OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();

if (diff > 10000) { // 积压超过1万条
// 发送告警
alertService.sendBacklogAlert(mq.getTopic(), diff);

// 自动扩容
autoScaleConsumer(mq.getTopic());
}
}
} catch (Exception e) {
log.error("检查积压失败", e);
}
}

private void autoScaleConsumer(String topic) {
// 动态启动新的Consumer实例
for (int i = 0; i < 3; i++) {
ConsumerInstance consumer = new ConsumerInstance(topic);
consumer.start();
emergencyConsumers.add(consumer);
}
}
}

// 紧急处理Consumer
public class EmergencyConsumer {

@RocketMQMessageListener(
topic = "${emergency.topic}",
consumerGroup = "emergency_consumer_group",
consumeThreadMax = 64 // 增加消费线程数
)
public class EmergencyMessageListener implements RocketMQListener<String> {

public void onMessage(String message) {
try {
// 简化业务逻辑,快速消费
processMessageSimple(message);

} catch (Exception e) {
// 记录失败日志,但不重试,避免影响消费速度
log.error("紧急消费失败: {}", message, e);
}
}

private void processMessageSimple(String message) {
// 只做核心业务逻辑,非核心逻辑异步处理
if (isImportantMessage(message)) {
processImportantLogic(message);
}
// 其他逻辑稍后处理
}
}
}

死信队列处理

死信处理代码

// 死信消息处理器
@Service
public class DeadLetterQueueProcessor {

// 监控死信队列
@RocketMQMessageListener(
topic = "%DLQ%order_consumer_group", // 死信队列topic格式
consumerGroup = "dlq_processor_group"
)
public class DLQMessageListener implements RocketMQListener<MessageExt> {

public void onMessage(MessageExt message) {
// 分析死信原因
DeadLetterAnalysis analysis = analyzeDeadLetter(message);

// 保存到数据库用于后续处理
deadLetterRepository.save(analysis);

// 根据错误类型决定处理策略
switch (analysis.getErrorType()) {
case "SYSTEM_ERROR":
// 系统错误,等待修复后重试
scheduleRetry(message, "等待系统修复");
break;

case "DATA_FORMAT_ERROR":
// 数据格式错误,需要人工修复
notifyDeveloper(analysis);
break;

case "BUSINESS_LOGIC_ERROR":
// 业务逻辑错误,可能需要特殊处理
handleBusinessError(message, analysis);
break;
}
}
}

// 重新处理死信消息
public void reprocessDeadLetter(Long dlqId) {
DeadLetterAnalysis dlq = deadLetterRepository.findById(dlqId);

if (dlq.isFixed()) {
// 重新发送到原始topic
Message newMsg = new Message(
dlq.getOriginalTopic(),
dlq.getMessageBody().getBytes()
);

producer.send(newMsg);
dlq.setStatus("REPROCESSED");
deadLetterRepository.save(dlq);
}
}
}

核心优化要点总结

1. 性能优化

  • Producer: 批量发送、异步发送、合理分区
  • Consumer: 增加消费线程、批量消费、优化业务逻辑
  • Broker: SSD存储、调优JVM参数、集群扩容

2. 可靠性保证

  • 消息不丢失: 同步刷盘、同步复制、事务消息
  • 消息不重复: 消费幂等性设计、业务去重
  • 消息有序: 单队列消费、消费者锁定

3. 高可用设计

  • 多Master部署: 避免单点故障
  • 主从复制: 数据备份和故障切换
  • 多机房部署: 容灾备份

记住:RocketMQ的核心是解耦、削峰、异步,合理的架构设计比单纯的性能调优更重要!