安装
介绍
MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。RocketMQ是⼀款阿⾥巴巴开源的消息中间件,主要用于限流,异步解耦操作,如付款之后短信通知,订单发货通知等等,都是异步进行执行。
查找rockermq镜像
拉取镜像
1
| docker pull rocketmqinc/rocketmq
|
创建目录
1
| mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
|
构建namesvr容器
1 2 3
| #构建namesrv容器 docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876 -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
|
编写border.conf
1
| vi /rocketmq/conf/broker.conf
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| # 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址,也就是centosOS7的服务ip brokerIP1 = 【你的ip地址】 # 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full diskMaxUsedSpaceRatio=95
|
启动broker
1 2
| docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf
|
安装控制台
1 2
| docker pull pangliang/rocketmq-console-ng
|
控制台
1 2
| docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.55.180:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng
|
核心的概念
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位
为消息设置的标签,用于同一主题下区分不同类型的消息。
消息生产者,负责一般由业务系统。
消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务
处理。
项目构建
目录结构
依赖引入
1 2 3 4 5 6 7
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency>
|
生产者
yml配置
1 2 3 4 5 6 7 8 9 10 11 12
| server: port: 8022
spring: application: name: RocketMqOrderProvider
rocketmq: name-server: 192.168.55.180:9876 producer: group: order-producer-group
|
bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.example.demo.bean;
import lombok.Data;
import java.math.BigDecimal; import java.time.LocalDateTime;
@Datapublic class Order {
private Integer orderId; private Integer orderNo; private Integer productId; private Integer userId; private Integer orderNum; private BigDecimal orderAmt; private String orderStatus; private String payStatus; private String createUser; private LocalDateTime createTime;
}
|
模板发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.example.demo.controller;
import com.alibaba.fastjson.JSON; import com.example.demo.bean.Order; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal; import java.time.LocalDateTime;
@RestControllerpublic class MqOrderController {
@Autowired private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/sendOrderMessage") public String sendOrderMessage() { Order order = new Order(); order.setOrderId(1); order.setOrderNo(1); order.setProductId(1); order.setUserId(1); order.setOrderNum(5); order.setOrderAmt(new BigDecimal("100.0")); order.setOrderStatus("下单"); order.setPayStatus("未支付"); order.setCreateUser("cqc"); order.setCreateTime(LocalDateTime.now()); rocketMQTemplate.syncSend("order-topic", JSON.toJSONString(order),6000); return JSON.toJSONString(order); } }
|
消息接收者
yml配置
1 2 3 4 5 6 7 8 9 10 11
| server: port: 8023
spring: application: name: RocketMqOrderConsumer rocketmq: name-server: 192.168.5.130:9876 producer: group: order-consumer-group
|
消息监听
1 2 3 4 5 6 7 8 9 10 11
| @Component @RocketMQMessageListener(topic = "order-topic",consumerGroup = "my-consumer-group") @Slf4j public class ConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String orderInfo) { System.out.println(orderInfo); } }
|