消息队列

1、概述

介绍

RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用。

image-20221206194448869

消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。

消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。

如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。

特点

RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:

  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式、返回模式
  • 与Spring AMQP完美整合,API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

应用场景

  • 异步处理:把消息放入消息中间件中,等到需要的时候再去处理。
  • 流量削峰:例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
  • 日志处理
  • 应用解耦:使用MQ发布订阅模式

2、AMQP协议

AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

概念

  • Server:接收客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Server的网络连接,TCP连接
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成,Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”
  • Queue:消息队列,用来保存消息,供消费者消费

3、常用交换器

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种

Direct Exchange

直连型交换机,该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中,一一对应

image-20221206195135620

Topic Exchange

将所有发送到 Topic Exchange 的消息被转发到所有 RoutingKey 中指定的 Topic 的队列上面

Exchange 将 RoutingKey 和某 Topic 进行模糊匹配,其中 * 用来匹配一个词,# 用于匹配一个或者多个词

例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而”com.*”只能匹配到“com.rabbitmq”

image-20221206195339946

Fanout Exchange

该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好

image-20221206195426231

Headers Exchange

该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用

4、rabbitmq 安装

建议通过 docker 镜像进行安装,更方便省事

# 安装镜像
docker pull rabbitmq

# 运行 rabbitmq 
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

# 查看容器 id
docker ps

# 进入容器中
docker exec -it <容器id> /bin/bash

# 在容器中运行
rabbitmq-plugins enable rabbitmq_management

安装完成之后访问 15672 端口即可进入登录界面,默认账号密码均为 guest

image-20221206195944778

新增用户

image-20221206210813535

点击用户

image-20221206210833292

进行授权

image-20221206210931534

5、在 sprinboot 中使用

实例代码地址:https://gitee.com/lxjblog/rabbitmq-study.git

创建 springboot 项目,导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改配置文件

spring:
  rabbitmq:
      # 需要在 rabbitmq 上新建用户
    username: root
    password: 123456
    host: <ip>
    port: 5672
  application:
    name: rabbitMq
server:
  port: 8080

Direct 用法

定义直连交换机

新建 config 目录,并创建 DirectExchange 配置类

@Configuration
public class MyDirectExchange { 
    /**
     * 定义队列
     */
    @Bean
    public Queue TestDirectQueue() { 
        // true 表示设置队列消息持久化
        return new Queue("TestDirectQueue", true);
    } 

    @Bean
    public Queue Test2DirectQueue() { 
        // true 表示设置队列消息持久化
        return new Queue("Test2DirectQueue", true);
    } 

    /**
     * 定义交换机
     */
    @Bean
    public DirectExchange TestDirectExchange() { 
        return new DirectExchange("TestDirectExchange",true,false);
    } 

    /**
     * 通过 routingKey 绑定交换机和队列
     */
    @Bean
    Binding bindingDirect() { 
        // TestDirectRouting 作为routingKey
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    } 
} 

创建 controller 目录,新增 RabbitMQController 用于发送队列消息

@RestController
public class RabbitMQController { 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("send")
    public String send() { 
        // 指定交换机和路由,并发送消息
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", "hello rabbitMq");
        return "ok!";
    } 
} 

创建监听类

receiver 目录下创建 DirectReceiver

@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver { 
    @RabbitHandler
    public void process(String msg) { 
        System.out.println("DirectReceiver消费者收到消息:" + msg);
    } 
} 

启动项目,访问 http://localhost:8080/send

image-20221206211517907

成功输出日志

image-20221206211529764

消息确认机制

消息接收的确认机制主要存在三种模式

  1. 自动确认, 这也是默认的消息确认情况。 RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递

  2. 根据情况确认

  3. 手动确认,这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

    • basic.ack用于肯定确认

    • basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)

    • basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息

手动 ACK

1、将直连交换器的监听去掉

image-20221206213718188

2、在 receiver 目录下创建新的监听处理类 DirectAckReceiver

@Component
public class DirectAckReceiver { 
    public void handle(byte[] data) { 
        System.out.println(new String(data));
    } 
} 

3、在 receiver 目录下创建 ack 监听类 MyAckReceiver

@Component
public class MyAckReceiver implements ChannelAwareMessageListener { 
    @Autowired
    DirectAckReceiver directAckReceiver;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception { 
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try { 
            String queue = message.getMessageProperties().getConsumerQueue();
            // 获取数据
            byte[] body = message.getBody();
            // 判断队列内容
            if ("TestDirectQueue".equals(queue)) { 
                directAckReceiver.handle(body);
            } 
            // 消息确认
            channel.basicAck(deliveryTag, true);
        }  catch (Exception e) { 
            // 重新放回队列,容易出现重复消费
            channel.basicReject(deliveryTag, true);
            e.printStackTrace();
        } 
    } 
} 

4、在 config 目录下创建消息监听配置类 MessageListenerConfig

@Configuration
public class MessageListenerConfig { 

    @Autowired
    private CachingConnectionFactory connectionFactory;
    /**
     * 消息接收处理类
     */
    @Autowired
    private MyAckReceiver myAckReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() { 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置队列,多个队列可用逗号分开
        container.setQueueNames("TestDirectQueue");
        // 交给对应的监听者处理
        container.setMessageListener(myAckReceiver);
        return container;
    } 
} 

配置完成之后重启项目,测试能够再次监听到消息

image-20221206214134038

添加错误代码进行测试

image-20221206214623027

再次运行会看到程序报错,并对消息进行多次消费

image-20221206214706942

而且消息还存在队列中,未被成功消费

image-20221206214736519

使用设计模式

以上使用的手动 ack 方法的代码耦合性较高,新增一个队列的话都需要修改 MessageListenerConfigMyAckReceiver 这两个类的代码,不便于扩展,所以可以引用上 工厂设计模式策略设计模式

1、创建 factory 目录,新增 ReceiverFactory

public class ReceiverFactory { 

    public static Map<String, Receiver> strategyMap = new HashMap<>();

    public static Receiver getInvokeStrategy(String key) { 
        return strategyMap.get(key);
    } 

    public static void register(String key, Receiver receiver) { 
        if (StringUtils.isEmpty(key) || receiver == null) { 
            return;
        } 
        strategyMap.put(key, receiver);
    } 

    public static Set<String> getKeys() { 
        return strategyMap.keySet();
    } 
} 

2、在 receiver 目录下创建 Receiver 接口,这是策略模式中的面向接口,并继承 InitializingBean 会在项目启动的时候初始化

public interface Receiver extends InitializingBean { 
    void handle(byte[] data);
} 

3、修改 DirectAckReceiver 监听处理类,实现 Receiver 接口,并重写初始化方法

@Component
public class DirectAckReceiver implements Receiver { 
    @Override
    public void handle(byte[] data) { 
        System.out.println(new String(data));
    } 

    @Override
    public void afterPropertiesSet() throws Exception { 
        // 注册到工厂
        ReceiverFactory.register("TestDirectQueue", this);
    } 
} 

4、修改 MessageListenerConfig 消息监听配置类,修改队列的获取方式

@Configuration
public class MessageListenerConfig { 

    @Autowired
    private CachingConnectionFactory connectionFactory;
    /**
     * 消息接收处理类
     */
    @Autowired
    private MyAckReceiver myAckReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() { 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置队列
        Set<String> keys = ReceiverFactory.getKeys();
        container.setQueueNames(keys.toArray(new String[keys.size()]));
        // 指定监听类
        container.setMessageListener(myAckReceiver);
        return container;
    } 
} 

5、修改消息监听类 MyAckReceiver,通过工厂去获取对应的处理类

@Component
public class MyAckReceiver implements ChannelAwareMessageListener { 
    @Autowired
    DirectAckReceiver directAckReceiver;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception { 
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try { 
            String queue = message.getMessageProperties().getConsumerQueue();
            // 获取数据
            byte[] body = message.getBody();
            // 通过工厂获取对应的处理类
            Receiver strategy = ReceiverFactory.getInvokeStrategy(queue);
            strategy.handle(body);
            // 消息确认
            channel.basicAck(deliveryTag, true);
        }  catch (Exception e) { 
            // 重新放回队列,容易出现重复消费
            channel.basicReject(deliveryTag, true);
            e.printStackTrace();
        } 
    } 
} 

重启项目测试,能够正常运行

image-20221206220428842

新增其他队列

1、新增队列2

image-20221206221504431

2、通过 key2 将新队列绑定到交换机上

image-20221206221543203

3、新增队列2的处理类

image-20221206221619629

4、在 controller 中向队列2发送消息测试

image-20221206221703815

5、访问 http://localhost:8080/send2,查看输出

image-20221206221735527

自此,就完成了基本的直连型交换机的手动 ACK 代码设计,其他类型的交换器等需要用到的时候在进行测试