Springboot整合rocketmq

安装

介绍

MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。RocketMQ是⼀款阿⾥巴巴开源的消息中间件,主要用于限流,异步解耦操作,如付款之后短信通知,订单发货通知等等,都是异步进行执行。

rocketmq搭建

查找rockermq镜像

1
docker search rocketmq

image-20230821201808657

拉取镜像

1
docker pull rocketmqinc/rocketmq

image-20230821201856751

创建目录

1
mkdir -p  /docker/rocketmq/data/namesrv/logs   /docker/rocketmq/data/namesrv/store

构建namesvr容器

1
2
3
#构建namesrv容器
docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876 -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

编写border.conf

1
vi /rocketmq/conf/broker.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址,也就是centosOS7的服务ip
brokerIP1 = 【你的ip地址】
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95

启动broker

1
2
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf

安装控制台

1
2
docker pull pangliang/rocketmq-console-ng

控制台

1
2
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.55.180:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng

核心的概念

  • 消息(Message)

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

  • 队列(Queue)

存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。

  • 主题(Topic)

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位

  • 标签(Tag)

为消息设置的标签,用于同一主题下区分不同类型的消息。

  • Producer

消息生产者,负责一般由业务系统。

  • Consumer

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务

处理。

项目构建

目录结构

image-20230821202101437

依赖引入

1
2
3
4
5
6
7
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

生产者

yml配置

1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8022
#应用名字
spring:
application:
name: RocketMqOrderProvider
#rocketmq配置
rocketmq:
name-server: 192.168.55.180:9876
producer:
group: order-producer-group

bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.example.demo.bean;

import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

@Datapublic class Order {

private Integer orderId;
private Integer orderNo;
private Integer productId;
private Integer userId;
private Integer orderNum;
private BigDecimal orderAmt;
private String orderStatus;
private String payStatus;
private String createUser;
private LocalDateTime createTime;

}

模板发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.demo.controller;

import com.alibaba.fastjson.JSON;
import com.example.demo.bean.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.time.LocalDateTime;

@RestControllerpublic class MqOrderController {

@Autowired private RocketMQTemplate rocketMQTemplate;

@RequestMapping("/sendOrderMessage")
public String sendOrderMessage() {
Order order = new Order();
order.setOrderId(1);
order.setOrderNo(1);
order.setProductId(1);
order.setUserId(1);
order.setOrderNum(5);
order.setOrderAmt(new BigDecimal("100.0"));
order.setOrderStatus("下单");
order.setPayStatus("未支付");
order.setCreateUser("cqc");
order.setCreateTime(LocalDateTime.now());
rocketMQTemplate.syncSend("order-topic", JSON.toJSONString(order),6000);
return JSON.toJSONString(order);
}
}


消息接收者

yml配置

1
2
3
4
5
6
7
8
9
10
11
server:
port: 8023
#应用名字
spring:
application:
name: RocketMqOrderConsumer
rocketmq:
name-server: 192.168.5.130:9876
producer:
group: order-consumer-group

消息监听

1
2
3
4
5
6
7
8
9
10
11
@Component
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "my-consumer-group")
@Slf4j
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String orderInfo) {
System.out.println(orderInfo);
}
}



Springboot整合rocketmq
https://cai-qichang.github.io/2022/02/21/Springboot整合rocketmq/
作者
caiqichang
发布于
2022年2月21日
许可协议
BY-蔡奇倡