RabbitMQ-13-分布式事务

RabbitMQ-13-分布式事务

1. 前言

  • 分布式事务指事务的操作位于不同的节点上,需要保证事务的AICD 特性。

  • 例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。

  • 在分布式系统中,要实现分布式事务,无外乎那几种解决方案。

2. 生产者确认机制

  • 在使用RabbitMQ的时候,可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?
    • 如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器

RabbitMQ为我们提供了两种方式:

  1. 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  2. 通过将channel设置成confirm模式来实现;

2.1 事务机制

  • 这里首先探讨下RabbitMQ事务机制。
    • RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务
    • 在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了
    • 如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

关键代码展示

1
2
3
channel.txSelect();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
channel.txCommit();
1
2
通过wirkshark抓包(ip.addr==xxx.xxx.xxx.xxx && amqp),可以看到:
(注意这里的Tx.Commit与Tx.Commit-Ok之间的时间间隔294ms,由此可见事务还是很耗时的。)

mark

  • 我们先来看看没有事务的通信过程是什么样的:

mark

可以看到带事务的多了四个步骤:

  • client发送Tx.Select
  • broker发送Tx.Select-Ok(之后publish)
  • client发送Tx.Commit
  • broker发送Tx.Commit-Ok
  • 下面我们来看下事务回滚是什么样子的。关键代码如下:
1
2
3
4
5
6
7
8
9
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}

mark

代码中先是发送了消息至broker中但是这时候发生了异常,之后在捕获异常的过程中进行事务回滚。

注意:

  • 事务确实能够解决producerbroker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发

  • 但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?

    • 从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

2.2 Confirm 机制

  • 上面我们介绍了RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。
  • 实现原理
    • 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了
    • 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
    • confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息
    • 如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
    • channel被设置成confirm模式之后,所有被 publish的后续消息都将被confirm(即 ack) 或者被nack一次。
    • 但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

开启confirm 模式的方式

  • 生产者通过调用channel的confirmSelect方法将channel设置为confirm模式,如果没有设置no-wait标志的话,broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为confirm模式
  • (从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的)。

mark

注意:

  • 已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。
  • 事务机制和publisher confirm 机制作用都是确保消息能够正确的发送到rabbitMQ

2.3 Confirm 编程

  • 对于固定消息体大小和线程数,如果消息持久化,生产者confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.
  • 消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。
  • 生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:
    • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
    • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
    • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

从编程实现的复杂度上来看:

第一种:普通confirm 机制

  • 普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

关键代码如下:

1
2
3
4
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}

wirkShark抓包可以看到如下:

mark

(注意这里的Publish与Ack的时间间隔:305ms 4ms 4ms 15ms 5ms… )

第二种:批量 confirm模式

  • 批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率
  • 是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

关键代码:

1
2
3
4
5
6
7
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}

第三种:异步 Confirm机制

  • 异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Channel发出的消息序号)
    • 我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录
    • 从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});

while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}

SDKwaitForConfirms方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/** Set of currently unconfirmed messages (i.e. messages that have
* not been ack'd or nack'd by the server yet. */
private final SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet<Long>());

public boolean waitForConfirms(long timeout)
throws InterruptedException, TimeoutException {
if (nextPublishSeqNo == 0L)
throw new IllegalStateException("Confirms not selected");
long startTime = System.currentTimeMillis();
synchronized (unconfirmedSet) {
while (true) {
if (getCloseReason() != null) {
throw Utility.fixStackTrace(getCloseReason());
}
if (unconfirmedSet.isEmpty()) {
boolean aux = onlyAcksReceived;
onlyAcksReceived = true;
return aux;
}
if (timeout == 0L) {
unconfirmedSet.wait();
} else {
long elapsed = System.currentTimeMillis() - startTime;
if (timeout > elapsed) {
unconfirmedSet.wait(timeout - elapsed);
} else {
throw new TimeoutException();
}
}
}
}
}

性能测试

1
2
3
4
参数
Client端机器和RabbitMQ机器配置:CPU:24核,2600MHZ, 64G内存,1TB硬盘。
Client端发送消息体大小10B,线程数为1即单线程,消息都持久化处理(deliveryMode:2)。
分别采用事务模式、普通confirm模式,批量confirm模式和异步confirm模式进行producer实验,比对各个模式下的发送性能。

mark

发送平均速率:

  • 事务模式(tx):1637.484
  • 普通confirm模式(common):1936.032
  • 批量confirm模式(batch):10432.45
  • 异步confirm模式(async):10542.06

小结:

  • 可以看到事务模式性能是最差的,普通confirm模式性能比事务模式稍微好点,但是和批量confirm模式还有异步confirm模式相比,还是小巫见大巫。
  • 批量confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能降低,异步confirm模式(async)编程模型较为复杂,至于采用哪种方式,那是仁者见仁智者见智了。

3. 消费端的消息确认

  • 为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。

  • 消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。

    • 否则,RabbitMQ会在队列中消息被消费后立即删除它。
  • 采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。


  • noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。
    • 如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
    • RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
  • RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数
    • 分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数
    • 也可以通过命令行来查看上述信息:

mark

代码示例(关闭自动消息确认,进行手动ack):

1
2
3
4
5
6
7
8
9
   QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(ConfirmConfig.queueName, false, consumer);

while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
// do something with msg.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
1
2
3
4
5
6
7
8
9
10
11
broker将在下面的情况中对消息进行confirm:

1. broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return)
2. 非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
3. 持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync)
4. 持久消息从其所在的所有queue中被consume了(如果必要则会被ack)


basicRecover:是路由不成功的消息可以使用recovery重新发送到队列中。
basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。
basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。

4. 分布式事务解决方案

4.1 2PC

  • 两段式提交(2PC) 需要数据库产商的支持,java组件有atomikos等

  • 两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。

  1. 准备阶段

mark

  1. 提交阶段
  • 如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
  • 需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。

mark

存在的问题

  • 2.1 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。

  • 2.2 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。

  • 2.3 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。

  • 2.4 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

4.2 TCC

  • TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:
    • Try 阶段主要是对业务系统做检测及资源预留
    • Confirm阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认 - - - Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
    • Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
1
2
3
4
举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用
1:首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
2:在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
3:如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。
  • 优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些
  • 缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

4.3 本地消息表

  • (异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式

  • 本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。

    • 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
    • 之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
    • 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。

mark

  • 优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。
  • 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

4.4 MQ事务

  • 有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。
  • 以阿里的 RocketMQ中间件为例,其思路大致为:
    • 第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
    • 也就是说在业务方法内要向消息队列提交两次请求,一次发送消息和一次确认消息。
      • 如果确认消息发送失败了.RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认
      • 所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

mark

优缺点分析

  • 优点: 实现了最终一致性,不需要依赖本地数据库事务。
  • 缺点: 实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。

5. rabbitMQ 实现可靠生产与消费

  1. 可靠生产

mark

  • 所谓的MQ可靠生产:就是利用消息冗余的方式(增加一个status的状态)
    • 如果成功:修改状态
    • 如果失败:定时器重新执行
    • 本质利用的是消息队列的ack应答机制

mark

  1. 可靠消费
  • 需要注意的是:消费者在消费过程中出现异常,rabbitMQ会一直进行不停的重试(即代码进入死循环)

mark

mark

  • 解决方案
    • 控制重发的次数 + 死信队列
    • try catch 和 手动ack
      • 注意这里无法再设置重发次数,如果设置了重发,那么还是会进入死循环
      • 这里try catch 和 手动ack 会造成消息的丢失
      • 解决方法是方案三:用try catch和 手动ack + 死信队列
    • try catch和 手动ack + 死信队列
  • 注意事项
    • 要考虑数据的幂等性:若死信消费者和普通消费者同时保存订单,那么就不满足幂等性了
      • 如下订单的方式:可以使用订单id作为一个唯一主键来保证幂等性
      • 分布式锁来解决这个问题

6. 小结

  • 通过本文我们总结并对比了几种分布式分解方案的优缺点,分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去抉择吧。
  • 阿里RocketMQ去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等,大家可以多去尝试。

基于MQ的优点

1、通用性强
2、拓展方便
3、耦合度低,方案也比较成熟

基于MQ分布式事务的缺点

1、基于消息中间件,只适合异步场景
2、消息会延迟处理,需要业务上能够容忍

建议

1、尽量去避免分布式事务
2、尽量将非核心业务做成异步

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信