RabbitMQ-08-SpringBoot整合使用

RabbitMQ-08-SpringBoot整合使用

1. 使用场景概述

  • rabbitMQ的作用 : 解耦 削峰 异步

1.1 同步异步的问题(串行)

  • 串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

mark

代码示例

1
2
3
4
5
6
7
8
9
10
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
// 2: 发送短信服务
messageService.sendSMS("order");//1-2 s
// 3: 发送email服务
emailService.sendEmail("order");//1-2 s
// 4: 发送APP服务
appService.sendApp("order");
}

1.2 并行方式 异步线程池

  • 并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

mark

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
// 相关发送
relationMessage();
}
public void relationMessage(){
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 2: 发送短信服务
messageService.sendSMS("order");
}
})
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 3: 发送email服务
emailService.sendEmail("order");
}
})
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 4: 发送短信服务
appService.sendApp("order");
}
})
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 4: 发送短信服务
appService.sendApp("order");
}
})
}

存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:出现了消息可能会丢失,需要你自己做消息补偿
4:如何保证消息的可靠性你自己写
5:如果服务器承载不了,你需要自己去写高可用

1.3 异步消息队列模式

mark

好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

代码示例

1
2
3
4
5
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息内容");
}
  • 之后若进行服务的扩展(高内聚 低耦合的特点)

mark

mark

  • 还有以下场景的使用
1
2
3
4
5
04、分布式事务的可靠消费和可靠生产
05、索引、缓存、静态化处理的数据同步
06、流量监控
07、日志监控(ELK)
08、下单、订单分发、抢票

2. SpringBoot - fanout 模式

  • 使用springboot完成rabbitmq的消费模式-Fanout

mark

步骤概览:

1:创建生产者工程:springboot-rabbitmq-fanout-producer
2:创建消费者工程:springboot-rabbitmq-fanout-consumer
3:引入spring-boot-rabbitmq的依赖
4:进行消息的分发和测试
5:查看和观察web控制台的状况

代码实现:生产者和消费者都需要配置

  1. 引入依赖
1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
  1. 在application.yml进行配置
1
2
3
4
5
6
7
8
9
10
11
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: ip
port: 5672

2.1 生产者

mark

  • OrderService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;


@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "fanout_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 根据商品id productId 去查询商品的库存
// int numstore = productSerivce.getProductNum(productId);
// 3:判断库存是否充足
// if(num > numstore ){ return "商品库存不足..."; }
// 4: 下单逻辑
// orderService.saveOrder(order);
// 5: 下单成功要扣减库存
// 6: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}
  1. FanoutConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.zhuuu.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

// 1. 定义交换机
@Bean
public FanoutExchange fanoutOrderExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new FanoutExchange("fanout_order_exchange", true, false);
}

// 2. 声明队列
@Bean
public Queue emailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("weixin.fanout.queue", true);
}

// 3. 绑定关系
@Bean
public Binding bindingFanoutweixinQueue() {
return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding bindingFanoutsmsQueue() {
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding bindingFanoutemailQueue(){
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
}

}

2.2 消费者

mark

邮件服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列消费者绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
}
}

短信服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和消费者绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class SMSService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("sms-------------->" + message);
}
}

微信服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和消费者绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class WeixinService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("weixin-------------->" + message);
}
}

2.3 结果

  • 启动项目 结果预览

mark

3. SpringBoot - Direct 模式

  • 环境设置同第二节

3.1 生产者

  • orderService : 不同的是交换的名字还有类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;


@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "direct_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 根据商品id productId 去查询商品的库存
// int numstore = productSerivce.getProductNum(productId);
// 3:判断库存是否充足
// if(num > numstore ){ return "商品库存不足..."; }
// 4: 下单逻辑
// orderService.saveOrder(order);
// 5: 下单成功要扣减库存
// 6: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}
  • DirectRabbitConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.zhuuu.rabbitmq.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DirectRabbitConfig {

// 1.声明队列
@Bean
public Queue emailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.direct.queue", true);
}
@Bean
public Queue smsQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue weixinQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("weixin.direct.queue", true);
}

//2. Direct交换机 起名:TestDirectExchange
@Bean
public DirectExchange directOrderExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("direct_order_exchange", true, false);
}

//3. 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public Binding bindingDirect1() {
return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("");
}
@Bean
public Binding bindingDirect2() {
return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("");
}
@Bean
public Binding bindingDirect3() {
return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("");
}
}

3.2 消费者

结构如下:

mark

  • 代码如上2.2 雷同,这里不再详细写出

4. SpringBoot - Topic 模式

  • 代码如上雷同,这里不再详细写出
打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信