Springboot整合rabbitmq

安装

rabbitMQ介绍

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。

MQ介绍

MQ是一个互联网架构中常见的解耦利器。
什么时候不使用MQ?
上游实时关注执行结果
什么时候使用MQ?

  • 1)数据驱动的任务依赖,多个任务需要轮流执行,轮流订阅上一个任务。
  • 2)上游不关心多下游执行结果,上游执行完发送到MQ,多下游订阅MQ。
  • 3)异步返回执行时间长

img

1、死信队列多了一个过期的机制,到期会自动去尝试消费,通过死信交换机做匹配,选择想要的消费失败的消息。
消息中间件是在消息传输过程中保存消息的容器。队列的主要目的是提供路由并保证消息的传递。
特点:

  • 异步处理模式
  • 多个应用程序调用关系为松耦合关系

传递模型:

  • 1、点多点模型PTP
    每个消息只用一个消费者
    发送者和接收者没有时间依赖
    接受者确认消息接受和处理成功
  • 2、发布-订阅模型Pub/Sub
    一对多关系,通过订阅主题,发布者建立一个订阅,订阅者保持持续的活动状态以接收消息。
    每个消息可以有多个订阅者
    客户端只有订阅后才能接收到消息,有时间依赖。
    持久订阅 订阅关系建立后,消息不会消失,不管订阅者是否都在线
    非持久订阅 订阅者为了接受消息,必须一直在线

安装步骤:

使用docker查询rabbitmq的镜像

img

安装镜像

安装name为rabbitmq的这里是直接安装最新的,如果需要安装其他版本在rabbitmq后面跟上版本号即可

1
docker pull rabbitmq

img

运行mq:

需要注意的是-p 5673:5672 解释:-p 外网端口:docker的内部端口 ,你们可以改成自己的外网端口号,我这里映射的外网端口是5673那么程序连接端口就是用5673

1
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5673:5672 rabbitmq

img

通过docker ps -a 查看部署的mq容器id,在通过 docker exec -it 容器id /bin/bssh 进入容器内部在
运行:rabbitmq-plugins enable rabbitmq_management

img

说明: 有些用docker exec -it 容器id /bin/bash 执行这个命令会报如下错:

img

那你可以把脚本类型 /bin/bash,尝试换为 /bin/sh 试一下

整合Springboot

引入Springboot整合RabbitMQ的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

yml配置文件

1
2
3
4
5
6
7
8
9
#配置RabbitMQ
spring:
rabbitmq:
host: 192.168.55.180
port: 5673
username: guest
password: guest
virtual-host: /

RabbitMQ的配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration//Rabbit配置类
public class RabbitConfig {
private final String EXCHANGE_NAME = "boot_topic_exchange";
private final String QUEUE_NAME = "boot_queue";


//创建交换机
@Bean("bootExchange")
public Exchange getExchange()
{
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)//交换机类型 ;参数为名字
.durable(true)//是否持久化,true即存到磁盘,false只在内存上
.build();
}
//创建队列
@Bean("bootQueue")
public Queue getMessageQueue()
{
return new Queue(QUEUE_NAME);
}
//交换机绑定队列
@Bean
//@Qualifier注解,使用名称装配进行使用
public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("#.message.#")
.noargs();
}
}

使用springboot的@Test注解来测试,测试生产者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootTest
public class TestProducer {
//注入RabbitTemplate工具类(rabbit内部的,可以发送消息)
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage()
{
/**
* 发送消息
* 参数1:交换机
* 参数2:路由键
* 参数3:要发送的消息
*/
rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十二开始了!");
}
}

使用消费者监听队列,接收消息

1
2
3
4
5
6
7
8
9
10
11
//消费者
@Component
public class Consumer {
//监听队列
@RabbitListener(queues = "boot_queue")
public void listenMessage(Message message)
{
System.out.println("接收消息:"+message);
}
}

整合后的代码,就是不用自己去实例化(创建连接工厂,连接,信道);让spring容器来控制实例的创建到销毁。
代码的实现有生产者和消费者、还有配置类(创建交换机跟队列及其绑定操作),都独立为一个类(共3个类),yml文件中配置rabbitmq的一些属性。

消息的可靠性传递

RabbitMQ消息投递的路径:生产者→交换机→队列→消费者
消息的可靠性传递有以下三种模式:

1
2
3
4
确认模式(confirm):可以监听消息是否从生产者成功传递到交换机。
退回模式(return):可以监听消息是否从交换机传递到队列。
消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

三种模式刚好监听完RabbitMQ的一整套流程。即我们能够由这三种模式得到消息的传递及处理的结果。

确认模式(confirm):
即在生产者类中调用setConfirmCallback方法,创建rabbitTemplate回调函数,然后重写confirm方法,做出boolean结果的处理,失败时可以输出失败的原因。并且需要在yml配置类中开启确认模式:

1
2
3
#开启确认模式
publisher-confirm-type: correlated

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 被调用的回调方法
* @param correlationData 相关配置信息
* @param b 交换机是否成功收到了消息
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if(b)
{
System.out.println("confirm接收成功!");
}else
{
System.out.println("confirm接收失败,原因是:"+s);
//处理失败的操作
}
}
});

退回模式(return):

yml文件中开启回退模式:

1
2
3
#开启回退模式
publisher-returns: true

回退模式,也是调用rabbitTemplate的setRenturnsCallback,即设置回退模式的回调函数,创建rabbitTemplate内部封装的一个回调方法并重新回调方法内的一个returnedMessage()方法。
此方法只在从交换机发送队列失败时调用。
具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//定义退回模式的回调方法
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* @param returnedMessage 失败后将失败信息封装到参数中
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息对象:"+returnedMessage.getMessage());
System.out.println("错误码:"+returnedMessage.getReplyCode());
System.out.println("错误信息:"+returnedMessage.getReplyText());
System.out.println("交换机:"+returnedMessage.getExchange());
System.out.println("路由键:"+returnedMessage.getRoutingKey());
//处理消息
}
});

前俩个模式的yml文件配置是在生产者中配置,而消费者消息确认是在消费者的yml文件中配置的。
消费者消息确认(Ack):
分为自动确认和手动确认俩种

自动确认时,若消费者类的监听queue的方法出现bug并且是自动确认接收时,queue的消息也会被清空;这是自动确认的一个弊端。
即因bug而未处理消息。
yml配置文件:

1
2
3
4
5
#开启自动确认
listener:
simple:
acknowledge-mode: none

消费者测试代码模块,还是一样,使用@RabbitListener注解根据Id指定队列监听;不同的是需要在方法内部模拟出现bug,在处理消息之前加入int i = 1/0即可。

手动确认:
也是在消费者模块来处理,配置文件在存在消费者的项目中的yml文件中设置;在消费者中,需要手动确认,在正常情况下就处理消息然后签收;出现Bug就将序号和消息传回队列并拒收,再从队列中重新接收设置序号(deliveryTag),来得到投递的序号。为了避免队列一直发送消息,可以调用线程的休眠方法。
yml配置文件:

1
2
3
4
5
#手动确认 manual
listener:
simple:
acknowledge-mode: manual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

//消费者消息确认
@Component
public class AckConsumer {
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
//消息投递序号,消息每次投递该值都会+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
int i = 1 / 0;//模拟处理消息出现bug
System.out.println("成功接收到消息:" + message);
//签收消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以签收多条消息
*/
channel.basicAck(deliveryTag,true);
}catch (Exception e)
{
System.out.println("消息消费失败");
Thread.sleep(2000);
//拒签消息
/**
* 参数1:消息投递序号
* 参数2:是否可以一次拒签多条消息
* 参数3:拒签后消息是否重回队列
*/
channel.basicNack(deliveryTag,true,true);
}
}
}


RabbitMQ的高级特性

消费端限流

在消费端项目的yml文件中配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#配置RabbitMQ
spring:
rabbitmq:
host: 192.168.55.180
port: 5673
username: guest
password: guest
virtual-host: /
#开启自动确认 none 手动确认 manual
listener:
simple:
#消费端限流机制必须开启手动确认
acknowledge-mode: manual
#消费端最多拉取的消息条数,签收后不满该条数才会继续拉取
prefetch: 5

就是说从生产端发送过来的消息,在队列等待消费端接收,如果消费端处理消息业务的速度相对较慢,积累的消息过多从而处理不过来(资源耗尽),会导致系统性能降低或瘫痪。
因为消费端每秒处理消息的条数有限,所以我们需要在消费端进行一个限流,故而限制了队列消息的投递。
即消费端限流也就是限制队列投递到消费端的流,也可以说是在队列与消费端之间进行一个限流。

公平分发和不公平分发

公平分发则不能在yml文件中设置限流(prefetch),公平分发即给多个消费者平分消息进行消费。这样会导致处理快的消费者在等待,故而浪费资源,降低性能。

不公平分发则需要在yml文件中设置限流(prefetch),并且prefetch: 1(即设置为1);不公平分发即每次拉取一条消息,谁处理得快就继续处理,这样可以极大的节约资源,从而提高性能。

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,就会被移除队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息的存活时间:
就是说需要在配置类(RabbitConfig)中设置队列所有消息的存活时间;

1
2
3
4
5
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.ttl(10000)//设置队列的所有消息存活10s
.build();

即在创建bean队列时,就要设置队列所有消息的存活时间。

设置某条消息的存活时间:

就是说只需要在发送的时候指定它的存活时间即可。
实现比较稍微麻烦一点,创建消息属性并设置存活时间,然后创建消息对象,消息对象 将消息属性作为参数,并且传入发送的消息,最后再将消息对象作为参数传给交换机,即可实现对单条消息设置存活时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//发送消息,并设置该消息的存活时间
@Test
public void testSendMessage()
{
//1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
//2.设置存活时间
messageProperties.setExpiration("10000");
//3.创建消息对象
Message message = new Message("sendMessage...".getBytes(),messageProperties);
//4.发送消息
rabbitTemplate.convertAndSend("my_topic_exchange1","my_routing",message);
}

若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。
因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。

在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准。

优先级队列

就是说在队列中指定可以设置的最大优先级,然后再对单条设置一个优先级数;

queue队列在config中创建:

1
2
3
4
5
6
7
8
9
10
//创建队列
@Bean(QUEUE_NAME)
public Queue getMessageQueue()
{
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
.build();
}

发送优先级消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//发送有优先级的消息
@Test
public void testPriority()
{
for (int i = 0; i < 10; i++) {
if(i == 5)//i为5的优先级比较高
{
//1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
//2.设置优先级
messageProperties.setPriority(9);
//3.创建消息对象
Message message = new Message(("sendMessage..."+i).getBytes(),messageProperties);
//4.发送消息
rabbitTemplate.convertAndSend("priority_exchange","my_routing",message);
}else
rabbitTemplate.convertAndSend("priority_exchange","my_routing","sendMessage..."+i);
}
}

就是有个问题,在监听先开启时,再启动发送消息,此时优先级失效。

img

先发送消息再启动监听,优先级生效。

img

死信队列

在MQ中,当消息成为死信(Dead Message)后,消息中间件可以将其从当前队列发送到另一个队列,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机与队列和普通的没有区别。

消息成为死信的情况:

1
2
3
4
1.队列消息长度到达限制。
2.消费者拒签消息,并且不把消息重新放回原队列。
3.消息到达存活时间未被消费。

死信队列:
死信交换机和普通交换机,只是名字不同,其余的都一样;就是说死信交换机绑定死信队列,普通交换机绑定普通队列。
并且普通队列需要在创建的同时绑定死信交换机和死信队列的路由关键字;就是说当普通队列的消息达到死信的条件时,会通过绑定的死信交换机根据绑定的死信队列路由关键字发送该普通队列的消息,这个就是成为死信的过程。

除了拒签成为死信的需要监听普通队列,其余俩个不需要到达消费者端就会成为死信。
死信会有专门的死信消费者去监听。

死信队列的Config配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration//Rabbit配置类
public class RabbitConfig4 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";

private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";


//创建死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange()
{
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机
.durable(true)//是否持久化,true即存到磁盘,false只在内存上
.build();
}
//创建死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue()
{
return QueueBuilder
.durable(DEAD_QUEUE)//队列持久化
//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
.build();
}
//死信交换机绑定死信队列
@Bean
//@Qualifier注解,使用名称装配进行使用
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
//创建普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange()
{
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机
.durable(true)//是否持久化,true即存到磁盘,false只在内存上
.build();
}
//创建普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue()
{
return QueueBuilder
.durable(NORMAL_QUEUE)//队列持久化
//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
.deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dead_routing")//死信队列路由关键字
.ttl(10000)//消息存活10s
.maxLength(10)//队列最大长度为10
.build();
}
//普通交换机绑定普通队列
@Bean
//@Qualifier注解,使用名称装配进行使用
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange, @Qualifier(NORMAL_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  @Test
public void testDLX()
{
//存活时间过期后成为死信
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信队列");
//超过队列长度后变成死信
// for (int i = 0; i < 20; i++) {
//
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信队列");
// }
//消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信队列");
}

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

通过死信队列实现延迟队列
由于RabbiMQ未提供延迟队列功能,可以使用死信队列实现延迟队列的效果。
即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。

监听延迟队列:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class ExpireOrderConsumer {
//监听过期订单队列
@RabbitListener(queues = "expire_queue")
public void listenMessage(String orderId)
{
//模拟处理数据库等业务
System.out.println("查询"+orderId+"号订单的状态,如果已支付无需处理,如果未支付则回退库存");
}
}


控制层代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
//下单(restful风格)
@RequestMapping(value = "/place/{orderId}",method = RequestMethod.GET)
public String placeOrder(@PathVariable String orderId)
{
//模拟service层处理
System.out.println("处理订单数据...");
//将订单id发送到订单队列
rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);
return "下单成功,修改库存";
}
}

通过延迟队列插件实现延迟队列:
先使用rz指令将rabbitmq_delayed_message_exchange-3.10.2.ez 上传至虚拟机;

下面俩种存储延迟队列插件位置看你用的rabbitmq是通过docker下载的,还是之前装在具体目录下的,前者用第二种,后者用第一种:

第一种:将插件放入RabbitMQ插件目录中:mv rabbitmq_delayed_message_exchange-3.10.2.ez /usr/local/rabbitmq/plugins/

第二种:我的rabbitmq是使用docker下载的,所以需要复制到容器的plugins下:docker cp rabbitmq_delayed_message_exchange-3.10.2.ez 容器ID:/plugins

复制完延迟队列插件可以执行删除插件的操作 :rm -f rabbitmq_delayed_message_exchange-3.10.2.ez

此时再次进入容器:docker exec -it 容器ID /bin/bash

进入容器的plugins目录:cd plugins

启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启RabbitMQ:rabbitmq-server restart
刷新rabbitmq的web页面查看,添加Exchange交换机,选择交换机里会出现x-delayed-message,如下图:

img

此时交换机是x-delayed-message类型的;延迟队列就是说发送延迟交换机及其绑定延迟队列的路由关键字和MessagePostProcessor对象,而MessagePostProcessor对象是用来设置延迟时长等属性的。
延迟队列只是创建延迟交换机时跟普通不同,其他基本相同。
执行流程就是浏览器通过url映射到Controller的@RequestMapping注解的value中,然后执行方法体,执行到发送MQ,然后结束方法体;MQ此时延迟方法体中设置的时长,再发送给消费端,消费端监听到再处理延迟队列的MQ。
controller控制层代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//下单
@RequestMapping(value = "/place2/{orderId}",method = RequestMethod.GET)
public String placeOrders(@PathVariable String orderId)
{
System.out.println("处理订单数据...");

//设置消息的延迟时间为10s,然后作为rabbitTemplate的额外参数
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
};

//将订单id发送到订单队列
rabbitTemplate.convertAndSend("delayed_exchange","delayed_routing",orderId,messagePostProcessor);
return "下单成功,修改库存";
}

配置类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig2 {
//延迟交换机和队列
private final String DELAYED_EXCHANGE = "delayed_exchange";
private final String DELAYED_QUEUE = "delayed_queue";


//延迟交换机
@Bean(DELAYED_EXCHANGE)
public Exchange delayedExchange()
{
//创建自定义交换机
Map args = new HashMap();
args.put("x-delayed-type","topic");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,args);
}

//延迟队列
@Bean(DELAYED_QUEUE)
public Queue delayedQueue()
{
return QueueBuilder
.durable(DELAYED_QUEUE)
.build();
}

//将延迟队列绑定到延迟交换机
@Bean
public Binding bindDelayedQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange,@Qualifier(DELAYED_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("delayed_routing")
.noargs();
}


}


RabbitMQ集群

搭建RabbitMQ集群

​ 设置俩个RabbitMQ服务:

关闭rabbitmq服务:rabbitmqctl stop
设置RabbitMQ服务一:RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server start -detached

设置RabbitMQ服务二:RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start -detached
即创建俩个RabbitMQ服务。分别是15673和15674端口,结点名字分别是rabbit1和rabbit2;确保俩个端口号都可以访问RabbitMQ。
将俩个服务设置到同一集群中:
关闭要加入的节点服务:rabbitmqctl -n rabbit2 stop_app

新设置要加入的节点服务:rabbitmqctl -n rabbit2 reset
将服务2加入到服务1中:rabbitmqctl -n rabbit2 join_cluster rabbit1@localhost

localhost是默认的虚拟机名,如果有改过需要根据你自己的虚拟机名。
启动服务2:rabbitmqctl -n rabbit2 start_app

此时rabbit1是主节点,而rabbit2是从节点。而我的集群设置了3个服务:

img

镜像队列

由1搭建集群后,虽然多个节点可以互相通信,但队列只保存在一个节点(主节点)中,如果该节点出现故障,则整个集群都将丢失消息。
所以需要引入镜像队列机制来解决这个问题;它可以将队列消息复制到集群中的其他节点上。这样当主节点失效了,其他节点上的镜像可以保证服务的可用性。

第二天起来,因为电脑关机,需要重新启动一下rabbitmq集群,如下:
启动rabbitmq服务:rabbitmq-server -detached开启后台管理

开启管控台插件:rabbitmq-plugins enable rabbitmq_management开启WEB可视化

启动服务1:RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server start -detached
启动服务2:RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start -detached
添加镜像:在rabbitmq的web界面进入Admin中点击右上方的policies,即添加镜像的策略。如下设置:

img

Name为镜像策略的名字;Pattern(模式)的值为^,即应用于所有交换机和队列, ^xxx则是应用到以xxx开头交换机和队列。
ha-mode=all则是镜像模式:所有,即将队列镜像到集群的所有服务器中。
ha-sync-mode: automatic设置为自动同步,即当某一服务器宕机,而有新的消息发送到来,当该服务器再次启动后,它已经拿不到该消息了,但是通过设置该参数可以自动同步消息。
在关闭服务器1后,服务器1的消息丢失;因为没有设置自动同步,需要手动通过服务器2同步到服务器1。

img

集群的负载均衡

无论是生产者还是消费者,只能连接一个RabbitMQ节点,当存在RabbitMQ集群时,如果只连接一个RabbitMQ节点,会造成该节点压力过大。平均的发送请求到每个RabbitMQ节点,需要使用Haproxy做负载均衡用来分发请求。
负载均衡,即让集群均衡的负载,就是每个RabbitMQ节点都均衡的连接访问。

yum –选项命令包:
其中选项是可选的,选项包括-h(帮助),-y(当安装过程提⽰选择全部为”yes”),-q(不显⽰安装的过程)等等。安装Haproxy:yum -y install haproxy
配置Haproxy:vim /etc/haproxy/haproxy.cfg

1
2
3
4
5
#修改内容
defaults
#修改为tcp
mode tcp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#---------------------------------------------------------------------
# 以下为添加内容
#---------------------------------------------------------------------
listen rabbitmq_cluster
#对外暴露端口
bind 0.0.0.0:5672
mode tcp
balance roundrobin
#代理RabbitMQ的端口
server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
listen stats
#Haproxy控制台路径
bind 192.168.126.3:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s


启动Haproxy:haproxy -f /etc/haproxy/haproxy.cfg

此时,我出现了一个错误,如下:

[ALERT] 152/150249 (47831) : Starting proxy rabbitmq_cluster: cannot bind socket [0.0.0.0:5672];因为没有关闭rabbitmq后台服务(端口号被后台服务占用)。

关闭:rabbitmqctl stop;重新启动Haproxy即可。
从生产者发送一条消息,haproxy页面如下:

img

证明haproxy负载均衡配置生效。

总结:

由Springboot整合RabbitMQ实现通配符模式,通配符模式使用topic交换机,展示消息从生产者到消费者的一个过程。
生产者由Config配置类+yml配置文件还有发送消息的一个调用操作组成;而消费者由yml配置文件和多个监听指定队列方法的类组成。
RabbitMQ的Config配置类用来创建交换机、创建队列和将队列绑定到交换机;此时yml配置文件的内容是用来连接RabbitMQ的信息和一些模式的开关。
模式有确认模式(Confirm),回退模式(Return),消费者消息确认(Consumer Ack)三种;前俩种用在生产者,第三种用在消费者。

  • 确认模式是用以消息发送到交换机,此时不论成功或失败都会调用setConfirmCallback返回发送的结果(成功或失败),在失败时打印结果并处理失败的业务,成功时处理成功后的业务。

  • 回退模式则是消息从交换机发到队列的过程,当失败时调用setReturnsCallback方法,输出失败的原因等信息和失败后的处理操作。

  • 消费者消息确认可以设置手动和自动确认,自动确认因消费者监听方法内出现问题或者其他原因会导致消息丢失,所以一般是手动确认,在正常处理后确认签收,在出现异常时捕获异常,处理异常并拒签。

  • 整个过程总而言之,即从生产者发送消息到指定交换机,交换机再根据生产者给的路由键发送到指定队列,队列再发送到消费者。

在RabbitMQ的高级属性中,有消费端限流、公平与不公平分发、消息存活时间、优先级队列、死信队列、延迟队列。

  • 消费端限流就是说从队列到消费者的消息,消费者最多只能存在x条未处理的消息。

  • 公平分发就是每个消费者都得到平均分发的消息,一般不设置限流,不公平分发则是处理快的继续处理,没有数额限制,但是需要设置限流且值为1。

  • 消息存活时间可以设置队列消息的存活时间时需要在配置类在创建队列时指定,也可以设置单条消息的存活时间,但是需要在发送消息时创建并添加MessageProperties对象设置存活时间作为参数,俩这都是存活时间一过,消息等到队列顶端后移除。

  • 优先级队列,即需要在创建队列时指定队列的最大优先级,然后在发送消息时创建并添加MessageProperties对象设置优先级数作为参数。

  • 死信队列,即分别创建俩个交换机和俩个队列并将队列绑定到交换机,一个作为死信队列和交换机,一个作为普通队列和交换机,但是普通队列创建时需要绑定死信交换机和死信队列的路由键,并且当普通队列消息过期、拒收并且不重回普通队列、超过普通队列设置的最大消息长度时,将消息发送到死信交换机,死信交换机再发到死信队列,死信队列有专门的消费者进行监听。

  • 延迟队列有2种实现,一种是通过死信队列,设置普通队列的存活时间为延迟的时长,通过死信消费者处理延迟的消息;第二种是通过rabbitmq的延迟队列插件实现延迟队列。

RabbitMQ搭建集群及其实现镜像队列、负载均衡

  • 搭建集群就是多个独立的服务器在一个系统中组成一个群体;用以提高系统的可用性、处理能力、响应能力,防止宕机出现数据丢失,和解决服务器无法承受高并发的等问题。

  • 镜像队列就是复制主节点的数据到其他的从节点中,防止因为主节点宕机导致数据丢失。

  • 负载均衡就是将请求访问平均分发到集群的每个节点中,从而提高系统的性能,不会导致某个节点因为访问量过大从而出现系统性能降低或者该节点宕机。


Springboot整合rabbitmq
https://cai-qichang.github.io/2023/06/23/Springboot整合rabbitmq/
作者
caiqichang
发布于
2023年6月23日
许可协议
BY-蔡奇倡