RabbitMQ-11-存储机制

RabbitMQ-11-存储机制

mark

1. 存储机制

  • 持久化消息
    • RabbitMQ的持久化队列分为:
      1:队列持久化
      2:消息持久化
      3:交换机持久化
  • 非持久消息:是指当内存不够用的时候,会把消息和数据转移到磁盘,但是重启以后非持久化队列消息就丢失。
  • 不论是持久化的消息还是非持久化的消息都可以写入到磁盘中,只不过非持久的是等内存不足的情况下才会被写入到磁盘中。
    • 持久化的消息在到达队列的时候就被写入到磁盘,并且如果可以,持久化的消息会在内存中保存一份备份,这样就可以提高一定的性能,当内存吃紧的时候就会从内存中清除
    • 非持久化的消息一般只保存在内存中,当内存吃紧的时候就会被换入到磁盘中,以节省空间
    • 这两种类型的消息落盘处都是RabbitMQ的持久层完成的。

1.1 队列的持久化

mark

  • queue的持久化是通过durable=true来实现的。
  • 一般程序中这么使用:关键的是第二个参数设置为true,即durable=true.
1
2
3
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);
  • Channel类中queueDeclare的完整定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    /**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

/**
参数说明:

queue:queue的名称

exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
  • queueDeclare相关的有4种方法,分别是:
1
2
3
4
5
6
7
8
9
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

/**
其中需要说明的是queueDeclarePassive(String queue)可以用来检测一个queue是否已经存在。如果该队列存在,则会返回true;如果不存在,就会返回异常,但是不会创建新的队列。

1.2 消息的持久化

  • 如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重置被持久化的queue。

  • 队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。

    • 也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
  • 如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

1
2
3
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

//这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN
  • 首先看一下basicPublish的方法:
1
2
3
4
5
6
7
8
9
10
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

/**
exchange表示exchange的名称
routingKey表示routingKey的名称
body代表发送的消息体
  • 这里关键的是BasicProperties props这个参数了,这里看下BasicProperties的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public BasicProperties(
String contentType,//消息类型如:text/plain
String contentEncoding,//编码
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//优先级
String correlationId,
String replyTo,//反馈队列
String expiration,//expiration到期时间
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)

//这里的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。
  • 上面的实现代码使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那么这个又是什么呢?
1
2
3
4
5
6
7
8
9
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
//可以看到这其实就是讲deliveryMode设置为2的BasicProperties的对象,为了方便编程而出现的一个东东。
  • 换一种实现方式:
1
2
3
4
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());

注意:

  • 设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在。
  • 单只设置队列持久化,重启之后消息会丢失;单只设置消息的持久化,重启之后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

1.3 交换机的持久化

  • 上面阐述了队列的持久化和消息的持久化,如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。

  • 这里博主建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

// 一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在声明的时候讲durable字段设置为true即可。

1.4 小结

  • 将queue,exchange, message等都设置了持久化之后就能保证100%保证数据不丢失了嚒?
    答案是否定的。
  1. 首先,从consumer端来说,如果这时autoAck=true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失,这种情况也好处理,只需将autoAck设置为false(方法定义如下),然后在正确处理完消息之后进行手动ack(channel.basicAck).

  2. 其次,关键的问题是消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。

  • 那么这个怎么解决呢?
    • 首先可以引入RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的。
    • 还有要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端,有关RabbitMQ的事务机制或者Confirm机制.RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。
  • 消息什么时候刷到磁盘?
  1. 写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。

  2. 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。

  3. 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

2. 内存磁盘告警

  • 当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。

mark

  • 当出现blockingblocked话说明到达了阈值和以及高负荷运行了。
    • blocking :并不试图关闭发送消息的Connection,比如消费者的Connection。这种情况下Connection可以继续运行
    • blocked :对应于一直有消息发送的Connection。这种情况下Connection会停止发送消息
    • 如果一个Broker的内存或者磁盘受限,都会引起所有的Connection被阻塞。
  • 理想的情况是当发生阻塞的时候可以阻止生产者的同时而又不影响消费者,但是在AMQP协议中,一个channel 上可以同时承载生产者和消费者,同一个Connection 中也可以同时承载若干个生产者和消费者的信道和消费者信道,这样就会使得逻辑错乱。
    • 这里建议生产和消费的逻辑可以分摊到独立的Connection 中而不发生任何的交集。

注意:

2.1 内存告警

mark

mark

  • rabbitMQ执行rabbitmqctl set_vm_memory_high_watermarkfraction 可以设置内存的阈值,默认是0.4

    • 表示当内存使用超过40%的时候,就会产生内存告警并且阻塞当前所有生产者的连接。
    • 在最坏的情况下,Erlang 的垃圾回收机制会导致两倍的内存消耗,也就是80%的占用比
  • 内存的阈值可以通过以下两种方式来设置(百分比,绝对内存大小)

    • 百分比方式

      • 配置文件:重启才会生效 。当前配置文件:/etc/rabbitmq/rabbitmq.conf
      • 命令行: 重启会失效 rabbitmqctl set_vm_memory_high_watermark <fraction>
      • 大小设置在0.4到0.66之间,不建议超过0.7
    • 绝对内存大小

      • 配置文件:重启才会生效。当前配置文件:/etc/rabbitmq/rabbitmq.conf
      • 命令行:重启会失效 rabbitmqctl set_vm_memory_high_watermark absolute 50MB
    • fraction/value 为内存阈值。默认情况是:0.4/2GB。代表的含义是:当RabbitMQ的内存超过40%时,就会产生警告并且阻塞所有生产者的连接。通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效。

      1
      2
      3
      4
      5
      6
      #默认
      #vm_memory_high_watermark.relative = 0.4
      # 使用relative相对值进行设置fraction,建议取值在04~0.7之间,不建议超过0.7.
      vm_memory_high_watermark.relative = 0.6
      # 使用absolute的绝对值的方式,但是是KB,MB,GB对应的命令如下
      vm_memory_high_watermark.absolute = 2GB

2.2 换页

  • 默认情况下,在内存达到阈值的50%的时候会进行换页的操作

    • 在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
    • 也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作。
    • 比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中。从而达到稳健的运行。
  • 可以通过设置 vm_memory_high_watermark_paging_ratio 来进行调整

1
2
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)
  • 此时,如果将vm_memory_high_watermark_paging_ratio 设置为大于1的浮点数的话,这种配置相当于仅用了换页的功能

2.3 磁盘预警

  • 当剩余磁盘空间低于确定阈值的时候,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化消息持续换页而耗尽磁盘空间导致服务崩溃

    • 默认情况下,磁盘阈值为50MB
    • 这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB ,第二检查可能就是1MB,就会出现警告。
  • 通过命令方式修改如下:

1
2
3
4
rabbitmqctl set_disk_free_limit  <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit:固定单位 KB MB GB
fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)
  • 通过配置文件配置如下:
1
2
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb

3. 惰性队列

  • RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

  • 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份

    • 当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。
    • 虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
  • 惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/O的使用,

    • 如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化消息可谓是“最佳拍档”。
    • 注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
  • 队列具备两种模式:default和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置

    • 如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
    • 在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
1
2
3
4
5
6
7
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);


对应的Policy设置方式为:
rabbitmqctl set_policy Lazy "^myqueue$" '{"queue-mode":"lazy"}' --apply-to queues
  • 惰性队列和普通队列相比,只有很小的内存开销。这里很难对每种情况给出一个具体的数值,但是我们可以类比一下:当发送1千万条消息,每条消息的大小为1KB,并且此时没有任何的消费者,那么普通队列会消耗1.2GB的内存,而惰性队列只消耗1.5MB的内存。
    • 据官网测试数据显示,
      • 对于普通队列,如果要发送1千万条消息,需要耗费801秒,平均发送速度约为13000条/秒。
      • 如果使用惰性队列,那么发送同样多的消息时,耗时是421秒,平均发送速度约为24000条/秒。出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。如果有消费者消费时,惰性队列会耗费将近40MB的空间来发送消息,对于一个消费者的情况,平均的消费速度约为14000条/秒。
    • 如果要将普通队列转变为惰性队列,那么我们需要忍受同样的性能损耗。
      • 当转变为惰性队列的时候,首先需要将缓存中的消息换页至磁盘中,然后才能接收新的消息。
      • 反之,当将一个惰性队列转变为普通队列的时候,和恢复一个队列执行同样的操作,会将磁盘中的消息批量的导入到内存中。

参考博客系列https://blog.csdn.net/u013256816/article/details/54743481

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信