贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

RocketMQ 最佳实践

Topic 与 Tag

在 RocketMQ 中,Topic 和 Tag 都是业务上用来归类的标识,通过合理的使用 Topic 和 Tag 可以让业务结构清晰,更可以提高效率

Topic 是消息主题,通过 Topic 对不同的业务消息进行分类 Tag 是消息标签,用来进一步区分某个 Topic 下的消息分类,是消息生产时即由消息生产者设置的属性

Topic 和 Tag 的选择,建议从以下几个方面判断:

  • 消息类型是否一致:普通消息、事务消息、延迟消息、顺序消息;不同消息使用不同的 Topic,无法通过 Tag 进行区分
  • 业务是否相关联:没有关联的业务应该使用不同的 Topic
  • 消息优先级是否一致:同一个 Topic 内应该是同样优先级的消息
  • 量级是否相当:业务量小但实时性高的消息和业务量大的消息放在一个 Topic 内,可能会导致饥饿

以电商系统为例,订单消息和支付消息属于不同的业务,设置 TOPIC_ORDER 和 TOPIC_PAY 其中订单消息根据商品种类再划分为不同的 tag,例如电器类、服装类、图书类等 支付消息根据不同的支付渠道划分,例如银行卡、支付宝、微信支付等

消费幂等

为了防止消息重复消费导致业务处理异常,RocketMQ 消费者在接收消息后,有必要根据业务上的唯一 Key 对消息进行幂等处理

个人认为需要注意的是,这个业务上的唯一 Key,无论是在 Consumer 层做去重实现业务幂等(去重表),还是业务逻辑上做幂等(状态机、版本校验),都应该是使用业务意义上的唯一标识,而不是依赖 Message ID

消息重复的场景

  • Producer 端:发送消息时,Brocker 已经收到消息并持久化,但 ACK 由于网络原因未成功返回给生产者,为了保证消息不丢失,生产者会再一次发送消息;或者上游业务认为失败重新进行了调用,就可能会发送重复的消息,并且是不同的 Message ID
  • Brocker 端:投递消息给消费者,消费者的 offset 因为网络等原因提交失败,为了保证消息至少一次,Brocker 会再次投递消息
  • Consumer 端:服务扩缩容导致的 rebalance 操作,本质上也是消费者的 offset 没有及时提交

因为不同的 Message ID 对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据

最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置

1
2
3
4
5
6
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 Key 做幂等处理
}
});

订阅关系

订阅关系需要保持一致,一致的定义是:同一个消费者组下所有 Consumer 实例所订阅的 Topic、Tag、Tag 的顺序必须完全一致

如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失

为什么会丢失?

当一个 Consumer 消费一个 Queue 时,会在 ConsumerQueue 中保存该消费者和消息的相关信息

ConsumerQueue 中保存了如下信息:

  • 前 8 个字节记录消息在 CommitLog 中的偏移量
  • 中间 4 个字节记录消息大小
  • 最后 8 个字节记录消息中 tag 的 hashcode

假如一个 Consumer 订阅了 Topic1 中的 Tag1,那这个 Consumer 拉取消息时,首先从 Name Server 获取订阅关系,得到当前 Consumer 订阅的所有 tag 的 hashcode 集合 codeSet;每次从 ConsumerQueue 获取一条记录,就要判断最后 8 个字节 tag hashcode 是否在 codeSet 中,比如 Tag2 不在 codeSet 中,就会被过滤掉

所以不正确的订阅关系会导致该 ConsumerQueue 中的消息被错误丢弃了

消息堆积

当客户端消费速度跟不上服务端消息发送的速度,便会出现消息堆积

消息堆积的主要影响:

  • 消费延迟
  • 生产者无法成功发送消息(无法生产)

以下场景需要着重关注消息堆积和延迟问题:

  • 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复
  • 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消息延迟也无法接受

客户端 Push 模式到消费消息有两个阶段:

  • 获取消息:通过长轮询批量拉取的方式从 Brocker 拉取消息,这一阶段吞吐量较高,所以一般不会成为性能瓶颈
  • 消费 & 提交:依赖业务的处理耗时和消费逻辑的并发度

所以通过消息消费的两大步骤来看,消息堆积的主要问题出现在第二阶段,即消费耗时消费并发度

消费耗时

影响消费耗时的消费逻辑主要分为 CPU 内存计算和外部 IO 操作;由于大部分服务器场景都是 IO 密集型,业务代码在正常情况下不会涉及复杂的运算,内部耗时相比 IO 耗时来说可以忽略不记

IO 操作通常包含以下逻辑:

  • 读写外部数据库,例如 MySQL
  • 读写外部缓存系统,例如 Redis
  • 下游系统调用,例如 Dubbo 调用或者下游 HTTP 接口调用

消费并发度

不同的消息类型对应不同的并发度配置策略

消息类型 并发度
普通消息 单节点线程数 × 节点数量
定时和延时消息 单节点线程数 × 节点数量
事务消息 单节点线程数 × 节点数量
顺序消息 Min(单节点线程数 × 节点数量,分区数)

此外单节点的并发度需要谨慎设置,不能盲目直接调大线程数,设置过大的线程数反而会带来大量的线程切换的开销

理想环境下单节点的最优线程数计算模型如下:

  • 单机 CPU 核数为 C
  • 线程切换耗时忽略不计,IO 操作不消耗 CPU
  • 线程有足够消息等待处理,且内存充足
  • 逻辑中 CPU 计算耗时为 T1,外部 IO 操作为 T2

则单个线程能达到的 TPS 为 1 / (T1 + T2),如果CPU使用率达到理想状态 100%,那么单机达到最大能力时需要设置 C × (T1 + T2) / T1 个线程

读写队列

新建 Topic 需要配置相关的属性 writeQueueNumsreadQueueNums,分别代表写队列的数量和读队列的数量

其中 writeQueueNumsreadQueueNums 的参数可以自由配置,为什么可以自由配置呢而不是必须相等呢?

备注:读写队列和读写分离不是同一个概念

读写分离指的是主节点和从节点关于读写请求分配的问题

读写队列在做路由信息时使用,在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息,在物理文件层面,只有写队列才会创建文件

读写队列数量不匹配时会发生什么

  • 写多读少:多出来的写队列无法被消费
  • 读多写少:Consumer 对应的多出来的读队列没有消息,也就不会从该读队列消费任何消息

为什么要这么设计

设置读写队列数的目的在于方便队列的缩容和扩容

一个 Topic 在每个 Brocker 上创建了 128 个队列,现在需要将队列缩容到 64 个,怎么做才能 100% 不会丢失消息,并且无需重启应用程序

解决办法:

  1. 先将写队列缩容(128 调整为 64);后续数据请求会进入 0 至 63 的写队列中,由之前的 Consumer 进行消费
  2. 等待 64 至 127 队列中的消息被消费完成
  3. 缩容读队列(128 调整为 64);此时 Consumer 重新分配,对应 64 个写队列

同时缩容写队列和读队列可能会导致部分消息未被消费

参考

最佳实践 (aliyun.com)

RocketMQ订阅关系不一致 - JustJavaIt - 博客园 (cnblogs.com)

rocketmq中的读写队列_八荒六合唯我独尊-CSDN博客_rocketmq读写队列