MQ消息幂等(解决重复消费问题)
序言
工作中会经常遇到类似下面这种场景,
- 基础架构组要对
RocketMQ
进行迁移扩容,业务方需要Check
使用RocketMQ
是否有幂等性保障,迁移过程中可能会产生重复消息的问题。对业务要求就是要保证消息幂等性,避免重复消费。RocketMQ
迁移小组会在保障群里同步迁移状态,如果有问题及时联系反馈。- 业务方上报自己的业务场景发生了重复消费问题,查看
Broker
运行情况,确实在21
日9
点Broker
出现了cpu
满百的情况,这个期间业务的消息都会受到影响。该业务方消息场景属于批量发送的普通消息,在Broker
运行正常情况下,且消费者在线时,不会出现延迟一天的情况。21
日出现cpu
水位较高后,运维又重新发布了Broker
一次,该业务服务是基于该情况下重复消费了。之前也出现过Broker
发布后,出现重复消费的问题。消费者和Broker
之间需要有明确的应答,如果中间出现了异常会出现这种情况。RocketMQ
无法保证消息只消费一次,RocketMQ
属于at least once
至少一次。消息不会丢失,但可能被处理多次,可能重复,业务方需要做幂等设计。- 消息在网络传输过程中发生错误,发送方收不到确认响应,会用重发机制保证消息不丢失,但如果只是因为响应在网络传输时丢失,但实际消费成功了,这时也会重发消息,
Broker
和Consumer
都会重复收到消息,产生重复消费问题。
可以看到上面场景都是因为消息没有做好幂等导致的重复消费问题。 对于不熟悉这种机制的开发人员来说,心里不免有些疑问。
- 什么是消息幂等,它和常说的幂等设计有什么区别和联系,和重复消费又有什么联系;
- 重复消费问题是如何发生的,怎么处理去尽量避免,针对于具体的场景(比如重新发布了
Broker
)为啥会重复消费呢; - 我们应该对于这种问题怎么去做合理的防御性设计。
什么是消息幂等
在给业务服务设计技术方案时,经常会考虑接口幂等。接口幂等是指无论调用多少次,接口执行结果都一样。比如写入接口幂等就是不管写入多少次,这条消息只会被真正写入一次,不会被重复写入,查询、删除等接口幂等也是这样。
消息队列中的消息幂等,从整体消息角度上看,就是同一条消息无论被消费多少次,系统最终状态都一样,不会有任何副作用。这就要求消息处理逻辑具有幂等性(相同的输入产生相同的输出)。结合不同场景(比如生产/消费/集群管控),消息队列中的幂等可以分为生产幂等、消费幂等和其他一些集群管控操作幂等。
生产幂等
生产幂等,是指同一条消息不会被重复写入到Broker
。一条消息在客户端发送了多次,Broker
只保存一份。
生产幂等的核心是,
Broker
能识别接收到的多条消息是否为同一条消息。如果Broker
不知道收到的消息是否为同一条,就无法拒绝重复的消息。消息唯一性以生产者的send
调用次数为准,生产者send
几次就表示发送了几条消息(生产幂等逻辑主体是生产者),需要在客户端调用send
时标识消息的唯一性。
实现思路
实现生产幂等的思路,一般有以下两种,
- 发消息时给每条消息指定唯一消息
ID
(MsgID
)表示消息唯一性,判断这条消息没被接收就保存数据,否则就拒绝写入。需要在生产端开启按Key
Hash
机制,保证同一个MsgID
的消息发送到同一分区,如果同一条消息多次发送到不同分区,就没法判断之前是否接收过这条消息。- 使用生产者ID(全局唯一) + 自增序号,不必保存所有成功的
SeqNum
,但可以识别出所有已发送的SeqNum
,不能实现强幂等,得全量。
唯一消息ID
这种方案有以下两个技术问题,识别重复MsgID
实现幂等要做很多工作,代码量也不小。
- 生成消息分布式唯一
ID
,常见算法有UUID
、基于MySQL/Redis/ZooKeeper
等第三方组件生成、雪花算法、美团Leaf
算法等。UUID
因为其内容和长度可能重复,并不适用;- 基于第三方系统生成的方法成本较高,同样不适用;
- 雪花算法,或者美团Leaf算法可以考虑,业务只需在
Topic
维度保持消息顺序性,集群ID
和TopicID
唯一标识集群和Topic
,将雪花算法中的数据中心ID
和机器ID
替换为集群ID
和TopicID
,就可以生成唯一消息ID
。
Broker
识别没有接收过这个消息ID
,需要在Broker
端保存已接收的所有MsgID
,在接收到消息后将消息ID
和当前接收过的所有消息ID
比较,判断消息是否重复。作为一个大吞吐的存储组件,Broker
历史接收量会很大。消息ID
记录/匹配性能肯定会有瓶颈,需要占用大量硬盘空间。- 可以在
Topic
维度保存消息ID
,不需要将集群所有消息ID
都存在一起,这样可以提高消息ID
保存、加载和查询的性能。 MQ
消息有过期机制,消息ID
集合可以只保留当前还在生命周期内的消息ID
,这样可以减少消息ID
数量,提高性能,减少存储空间。但客户端可能重新发送过期消息(理论上可能性较低),还需要给消息ID
集合设计过期机制,增加了开发成本。- 引入布隆过滤器来判断消息
ID
是否在已接收过的消息ID
集合,用来提高消息ID
去重判断的性能。
- 可以在
生产者ID(全局唯一) + 自增序号
给每个生产者赋予唯一ID
,生产者ID
全局唯一。生产者启动时生成一个从0
开始的自增序号,表示这个生产者发送的消息,每条消息分别有一个自增序号,比如0、1、2……
,使用Producer ID
和seqnum
二元组唯一标识消息。生产者ID
可以用上面提到的雪花算法来生成,因为生产者很少,也可以直接用UUID
,UUID
重复率非常低。
如果服务端也保留了生产者所有发送成功的seqnum
集合来实现消息重复判断,生产者很多的情况下,生产者一直没有重启,服务端就要保留很多Producer
-seqnum
数据,开发复杂度不亚于上一个方案,只是把标识从MsgID
换成Producer + seqnum
,没有本质区别。不保留所有seqnum
,只保留最新收到的seqnum
,也可以识别出所有已发送的seqnum
。如果收到的消息seqnum
是下一条msg
,就正常保存数据;否则就放进队列等待下一条msg
收到,再判断是否保存该数据,甚至可以直接拒绝消息写入。
怎么理解这里的下一条
msg
?
Broker
收到的消息所携带的Producer ID
为p1
的生产者,最新seqnum
为4
(current seqnum
),下一条允许收到的seqnum
是5
。- 下一条是
5
,保存数据,current seqnum
更新为5
,等待seqnum
为6
的数据。- 发送过来的是
8
,可以有两种策略,
- 先把
8
缓存在Broker
内存中,等待收到6
和7
后,再把8
写入存储。这种方案的缺点是6
和7
可能永远收不到,Broker
就要一直保存8
,可能会发生内存溢出或占用额外存储空间。- 给客户端返回可重试错误,触发客户端重复发送机制。客户端重试写入时,如果
Broker
已经收到7
,在等待8
了,这时这条消息就可以顺序写入了。
Kafka支持部分幂等,不能完全幂等
业界主流MQ
只有Kafka
支持生产幂等,RocketMQ
、RabbitMQ
、Pulsar
都不支持。
Kafka
的生产者在启动时给每个生产者分配唯一ID
,这个唯一ID
是客户端从Broker
申请的,不是自己生成的。Broker
在ZooKeeper
创建一个节点生成自增ID
返回给客户端,保证生产者ID
在这个集群唯一,属于第三方系统生成分布式唯一ID
。这里唯一ID
是Broker
和ZooKeeper
交互生成,不是客户端直接向ZooKeeper
请求生成,避免了客户端引入过高的复杂度。
Kafka
支持Batch
语义,发消息时给每批次消息分配一个seqnum
,标识生产者发送消息的唯一性。稍微有点不同的是,Kafka
在每个Topic-Partition
维度都有一个独立的seqnum
,PID + Topic + Partition + SeqNum
四元组,标识一条消息。因为seqnum
是针对PID + Topic + Partition
维度的,所以只保证单个生产者会话中的单分区幂等。怎么判断消息重复?Kafka
只会缓存最近5
个seqnum
,如果前面过期的再来,是不重复判定。Kafka
主打高性能,幂等和性能balance trade-off
,这样的实现支持部分幂等,不能完全幂等。
为什么用
PID + Topic + Partition + SeqNum
四元组唯一标识一条消息?
这里是性能和幂等效果trade-off
的结果。要明确下,Kafka Broker
端缓存PID
对应Topic-Partition
的5
个最近的batch
信息。如果减少标识维度,每台节点只保留5
条数据,缓存一下子就被刷掉了。就算增加缓存大小,因为有些分区数据量大,有些分区数据量小,一些小分区的数据缓存也会被挤出去了,无法实现幂等,所以就需要在消息标识中增加维度来表示。
为什么缓存最近5条?
数据在内存中,要保证这个功能的缓存数据不会对内存造成压力,要控制内存使用总量。假设单台Broker
可支持的分区数为P
,单台生产者的数量为M
,存的消息数量为N
,此时消耗的内存总量为T = P * M * N
。
M
完全不可控,P
取决于用户运营策略,也不可控,内核可控的就是N
。N
如果太大,会对内存造成太多压力,所以N
不能太大,拍了个5
,5
是hard code
在代码里的,不能改动。
消费幂等
RocketMQ
和Kafka
都是基于消费位点的消费机制,只要客户端不提交消费位点,消费天生幂等(不管怎么消费,都是同一条消息)。如果提交了offset
,就会自动消费下一条数据,也符合预期。即使重复提交了同一个位点,消费位点保存的是同一个值,对消费不会有任何影响。消费端说的更多的是Exactly Once
(保证一条消息只会被消费一次)语义。在消息系统和流式计算系统中,这种消息投递语义是消息流转的理想状态,但业界并没有太多理想实现,真正意义的Exactly Once
依赖消息系统服务端、客户端和用户消费逻辑这三者状态的协调,比如当消费端消费完一条消息后宕机,消费端重启,消费位点并没同步到服务端,可能会出现重复消费。
三种消息投递语义
三种模型/标签,描述流处理引擎应为应用提供的数据处理语义。
分布式事件流处理是大数据领域的热门话题。著名的流处理引擎(Streaming Processing Engines, SPEs
)包括Apache Storm
、Apache Flink
、Heron
、Apache Kafka
(Kafka Streams
)以及Apache Spark
(Spark Streaming
)。流处理引擎中一个被广泛讨论的特征就是它们的处理语义,有很多引擎声称提供Exactly Once
处理语义。
一个执行流/事件处理应用的流处理引擎允许用户制定一个可靠性模式或处理语义,描述引擎为应用图中实体间的数据处理提供什么样的保证。网络/机器故障导致数据丢失不可避免,这样的保证对满足不同场景的需求很有必要。三种处理语义的服务质量从低到高排列如下,
At Most Once
,讲的是尽力而为(best effort
),数据或者事件被保证只会被应用中的所有算子最多处理一次。对于流处理应用完全处理它之前丢失的数据,没有额外重试。At Least Once
,数据或事件被保证会被应用图中的所有算子都至少处理一次。当事件在被应用完全处理之前丢失,会从source
开始重放(replayed
)或重传(retransmitted
)。事件被重传,一个事件就会被处理超过一次。Exactly Once
,发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试发送消息导致消息重复投递,该消息在消费端也只被消费一次。
Exactly Once
的两种典型机制
分布式快照/状态检查点(
checkpointing
)受到
Chandy-Lamport
分布式快照算法启发,流处理应用中每个算子的所有状态周期性checkpointed
。如果系统发生了故障,每个算子的所有状态都回滚到最近的全局一致检查点处。在回滚过程中,所有处理都暂停。Sources
也会根据最近的检查点重置到正确的offset
。整个流处理应用基本上倒回到最近的一致性状态,处理也可以从这个状态重新开始。在这个例子中,
T1
时,流处理应用在正常工作,同时状态也被checkpointed
。T2
时,算子处理输入数据失败了,这时S = 4
的状态已经持久化保存到存储中,S = 12
的状态仍位于算子内存中,出现了不一致。T3
时,processing graph
倒回到S = 4
的状态并重放流中的每个状态直到最新状态,处理每个数据,解决了这个不一致问题。- 最终虽然某些数据被处理了多次,但无论回滚多少次,结果状态依然相同。
At Least Once
+ 消息去重重放失败事件,尝试重复处理,在每个算子上,在事件进入到用户定义的逻辑之前删除重复事件。该机制要为每个算子维护一份事务日志(
transaction log
)来记录哪些事件已经处理过了,使用类似机制的引擎有Google MillWheel2
、Apache Kafka Streams
。分布式快照 vs
At Least Once
+ 消息去重分布式快照/状态检查点
- 优点:较小的性能和资源开销
- 缺点:对性能的影响较大;拓扑越大,对性能的潜在影响越大
At Least Once
+ 消息去重- 优点:故障对性能的影响是局部的;故障的影响不一定会随着拓扑的大小而增加
- 缺点:可能需要大量的存储和基础设施来支持;每个算子的每个事件的性能开销
两种机制都提供了相同的语义和保证,但两种机制的实现有差异,性能也存在差异。
- 分布式快照/状态检查点,性能开销小,引擎往流应用程序中的所有算子一起发送常规事件和特殊事件,状态检查点可以在后台异步执行。对于大型流应用程序,故障可能会频繁发生,引擎需要暂停应用程序并回滚所有算子的状态,这反过来会影响性能。流式应用程序越大,故障发生的可能性就越大越频繁,性能受到的影响也越大。这种机制是非侵入性的,运行时需要的额外资源影响很小。
At Least Once
+ 消息去重,可能需要更多资源(尤其是存储)。引擎要能跟踪每个算子实例已完全处理的每个元组来执行消息去重,以及为每个事件执行消息去重。这意味着要跟踪大量数据,尤其是流应用程序很大或有许多应用程序在运行的情况。执行消息去重的每个算子上的每个事件都会产生性能开销,但在这种机制下,性能不太可能受到应用程序大小的影响。
对于前者,如果任何算子发生故障,需要发生全局暂停和状态回滚。
对于后者,失败的影响更加局部性。当在算子中发生故障时,可能尚未完全处理的事件仅从上游source
重放/重传。性能影响与流应用程序中发生故障的位置是隔离的,对流应用程序中其他算子的性能几乎没有影响。
理论上两种机制存在差异,但两者都可以简化为至少一次处理+幂等性。对于这两种机制,发生故障时(至少一次)事件会被重放/重传,且通过状态回滚或事件消息去重,算子在更新内部管理状态时本质上是幂等的。
Exactly Once,不如说是Effectively Once
Exactly Once
在描述正好处理一次时会产生误导,直观上它描述流中的每个事件只被处理一次,实际上没有引擎能保证正好只处理一次。用户代码被部分执行的可能性永远存在,在面对任意故障时,不可能保证每个算子中的用户定义逻辑在每个事件中只执行一次。
当引擎声明Exactly Once
处理语义时,实际上保证的是引擎管理的状态更新只提交一次到持久的后端存储。上面描述的两种机制都用后端持久化存储作为真实性来源,可以保存每个算子的状态并自动提交更新。
- 分布式快照/状态检查点机制,通过持久化后端状态,保存流应用程序的全局一致状态检查点(每个算子的检查点状态)。
At Least Once
+ 消息去重机制,通过持久后端状态,存储每个算子的状态以及每个算子的事务日志,该日志跟踪它已经完全处理的所有事件。
提交状态或对作为真实来源的持久后端应用更新可以被描述为恰好发生一次。事件的处理可以发生多次(在事件上执行任意用户定义逻辑时,如果发生故障,可能不止一次地发生。),但该处理效果只在持久后端状态存储中反映一次,描述这些处理语义最好的术语是有效一次(Effectively Once
)。
业界对于Exactly Once
投递语义有很大争议,很多人拿FLP
不可能理论或其他一致性定律对此议题进行否定,但特定场景的Exactly Once
语义实现并不是很复杂,只是大家没有精确描述问题的本质。当一条消息的消费结果只能在业务系统中生效一次时,需要解决的只是如何保证同一条消息的消费幂等问题,RocketMQ
的Exactly Once
语义就是解决业务中最常见的一条消息的消费结果在数据库系统中有且仅生效一次的问题。
集群管控操作幂等
集群管控操作幂等,比如创建Topic
、切换Leader
等集群管控类操作接口。
重复消费
重复消费问题是使用MQ
时经常会遇到的挑战场景之一,这不是MQ
本身的设计缺陷,而是由于网络不稳定、消费者故障、负载均衡调整等多种因素导致的。为了解决这一问题,引入了上面说到的幂等概念(在消息消费时,幂等保证即使消息被重复消费,也不会对业务逻辑产生副作用)。
以一个链路较长的典型电商场景(用户下单)为例,来看下如果消息不幂等可能带来的后果。
用户提交订单,系统生产一条下单MQ
消息,期望订单服务消费消息来创建订单,扣减库存,生成支付单。在这个业务链路中,可能因为网络问题、消费者宕机、消息队列重启等原因导致消息被重复发送和消费。如果消费逻辑不具备幂等性,重复消费可能会导致数据重复处理、库存错误扣减、订单重复生成等问题。
- 订单重复创建:同一条下单消息被消费多次,导致生成多个相同的订单。
- 库存超扣:如果库存扣减逻辑没有考虑幂等设计,多次扣减可能会导致库存变为负数。
- 支付单重复生成:支付单与订单一一对应,订单重复会导致支付单也重复生成。
防御性设计
业务逻辑本身需要设计成幂等的,即多次执行相同操作不会产生不同的结果。常见的几种幂等设计方案如下,
- 使用数据库的唯一键来保证数据幂等一致,比如在业务表中建立合适的唯一索引,或者增加一个携带唯一约束的流水记录表。
- 加乐观锁,为业务数据增加版本号,每次更新数据时,比较当前数据的版本号和更新数据的版本号是否一致,一致才更新数据,并将版本号加1,否则不更新。
- 使用分布式事务或者分布式锁,事务包含三个动作,分别是检查消费状态、更新数据、记录消费状态。在更新数据前,先检查是否执行过这个更新操作,一定要保证这三个动作的原子性,避免因并发导致多次更新问题。
以库存扣减为例,设计如下幂等逻辑,
- 检查库存状态:在扣减库存前,先检查当前库存是否足够。
- 使用数据库事务:将库存检查和扣减操作放在一个数据库事务中执行,确保原子性。
- 记录操作日志:在扣减库存时,记录操作日志(包括操作类型、操作时间、操作结果等),以便后续审计和回滚。
总结
幂等是解决MQ
单消息重复消费问题的有效手段。通过为每条消息生成唯一标识、在消费者端实现去重逻辑以及设计幂等的业务逻辑,可以确保即使消息被重复消费,也不会对业务系统产生负面影响。在实际应用中,可以根据具体业务场景选择合适的去重方式和实现细节,来满足系统对幂等性的要求。