消息队列
1、概述
介绍
RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用。

消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。
消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。
如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
特点
RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式、返回模式
- 与Spring AMQP完美整合,API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
应用场景
- 异步处理:把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰:例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
- 日志处理
- 应用解耦:使用MQ发布订阅模式
2、AMQP协议
AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
概念
- Server:接收客户端的连接,实现AMQP实体服务
- Connection:连接,应用程序与Server的网络连接,TCP连接
- Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成,Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”
- Queue:消息队列,用来保存消息,供消费者消费
3、常用交换器
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种
Direct Exchange
直连型交换机,该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中,一一对应

Topic Exchange
将所有发送到 Topic Exchange 的消息被转发到所有 RoutingKey 中指定的 Topic 的队列上面
Exchange 将 RoutingKey 和某 Topic 进行模糊匹配,其中 * 用来匹配一个词,# 用于匹配一个或者多个词
例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而”com.*”只能匹配到“com.rabbitmq”

Fanout Exchange
该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好

Headers Exchange
该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用
4、rabbitmq 安装
建议通过 docker 镜像进行安装,更方便省事
# 安装镜像
docker pull rabbitmq
# 运行 rabbitmq
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
# 查看容器 id
docker ps
# 进入容器中
docker exec -it <容器id> /bin/bash
# 在容器中运行
rabbitmq-plugins enable rabbitmq_management
安装完成之后访问 15672 端口即可进入登录界面,默认账号密码均为 guest

新增用户

点击用户

进行授权

5、在 sprinboot 中使用
实例代码地址:https://gitee.com/lxjblog/rabbitmq-study.git
创建 springboot 项目,导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改配置文件
spring:
rabbitmq:
# 需要在 rabbitmq 上新建用户
username: root
password: 123456
host: <ip>
port: 5672
application:
name: rabbitMq
server:
port: 8080
Direct 用法
定义直连交换机
新建 config 目录,并创建 DirectExchange 配置类
@Configuration
public class MyDirectExchange {
/**
* 定义队列
*/
@Bean
public Queue TestDirectQueue() {
// true 表示设置队列消息持久化
return new Queue("TestDirectQueue", true);
}
@Bean
public Queue Test2DirectQueue() {
// true 表示设置队列消息持久化
return new Queue("Test2DirectQueue", true);
}
/**
* 定义交换机
*/
@Bean
public DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange",true,false);
}
/**
* 通过 routingKey 绑定交换机和队列
*/
@Bean
Binding bindingDirect() {
// TestDirectRouting 作为routingKey
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
创建 controller 目录,新增 RabbitMQController 用于发送队列消息
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public String send() {
// 指定交换机和路由,并发送消息
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", "hello rabbitMq");
return "ok!";
}
}
创建监听类
在 receiver 目录下创建 DirectReceiver
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("DirectReceiver消费者收到消息:" + msg);
}
}
启动项目,访问 http://localhost:8080/send

成功输出日志

消息确认机制
消息接收的确认机制主要存在三种模式
自动确认, 这也是默认的消息确认情况。 RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递
根据情况确认
手动确认,这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
手动 ACK
1、将直连交换器的监听去掉

2、在 receiver 目录下创建新的监听处理类 DirectAckReceiver
@Component
public class DirectAckReceiver {
public void handle(byte[] data) {
System.out.println(new String(data));
}
}
3、在 receiver 目录下创建 ack 监听类 MyAckReceiver
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Autowired
DirectAckReceiver directAckReceiver;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String queue = message.getMessageProperties().getConsumerQueue();
// 获取数据
byte[] body = message.getBody();
// 判断队列内容
if ("TestDirectQueue".equals(queue)) {
directAckReceiver.handle(body);
}
// 消息确认
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 重新放回队列,容易出现重复消费
channel.basicReject(deliveryTag, true);
e.printStackTrace();
}
}
}
4、在 config 目录下创建消息监听配置类 MessageListenerConfig
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 消息接收处理类
*/
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置队列,多个队列可用逗号分开
container.setQueueNames("TestDirectQueue");
// 交给对应的监听者处理
container.setMessageListener(myAckReceiver);
return container;
}
}
配置完成之后重启项目,测试能够再次监听到消息

添加错误代码进行测试

再次运行会看到程序报错,并对消息进行多次消费

而且消息还存在队列中,未被成功消费

使用设计模式
以上使用的手动 ack 方法的代码耦合性较高,新增一个队列的话都需要修改 MessageListenerConfig 和 MyAckReceiver 这两个类的代码,不便于扩展,所以可以引用上 工厂设计模式 和 策略设计模式
1、创建 factory 目录,新增 ReceiverFactory
public class ReceiverFactory {
public static Map<String, Receiver> strategyMap = new HashMap<>();
public static Receiver getInvokeStrategy(String key) {
return strategyMap.get(key);
}
public static void register(String key, Receiver receiver) {
if (StringUtils.isEmpty(key) || receiver == null) {
return;
}
strategyMap.put(key, receiver);
}
public static Set<String> getKeys() {
return strategyMap.keySet();
}
}
2、在 receiver 目录下创建 Receiver 接口,这是策略模式中的面向接口,并继承 InitializingBean 会在项目启动的时候初始化
public interface Receiver extends InitializingBean {
void handle(byte[] data);
}
3、修改 DirectAckReceiver 监听处理类,实现 Receiver 接口,并重写初始化方法
@Component
public class DirectAckReceiver implements Receiver {
@Override
public void handle(byte[] data) {
System.out.println(new String(data));
}
@Override
public void afterPropertiesSet() throws Exception {
// 注册到工厂
ReceiverFactory.register("TestDirectQueue", this);
}
}
4、修改 MessageListenerConfig 消息监听配置类,修改队列的获取方式
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 消息接收处理类
*/
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置队列
Set<String> keys = ReceiverFactory.getKeys();
container.setQueueNames(keys.toArray(new String[keys.size()]));
// 指定监听类
container.setMessageListener(myAckReceiver);
return container;
}
}
5、修改消息监听类 MyAckReceiver,通过工厂去获取对应的处理类
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Autowired
DirectAckReceiver directAckReceiver;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String queue = message.getMessageProperties().getConsumerQueue();
// 获取数据
byte[] body = message.getBody();
// 通过工厂获取对应的处理类
Receiver strategy = ReceiverFactory.getInvokeStrategy(queue);
strategy.handle(body);
// 消息确认
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 重新放回队列,容易出现重复消费
channel.basicReject(deliveryTag, true);
e.printStackTrace();
}
}
}
重启项目测试,能够正常运行

新增其他队列
1、新增队列2

2、通过 key2 将新队列绑定到交换机上

3、新增队列2的处理类

4、在 controller 中向队列2发送消息测试

5、访问 http://localhost:8080/send2,查看输出

自此,就完成了基本的直连型交换机的手动 ACK 代码设计,其他类型的交换器等需要用到的时候在进行测试
- 本文链接:https://lxjblog.gitee.io/2022/12/06/RabbitMQ%E5%85%A5%E9%97%A8/
- 版权声明:本博客所有文章除特别声明外,均默认采用 许可协议。