1 介绍

1.1 [1]定义:3类

01.分类1
    Broker: 简单来说就是消息队列服务器实体
    Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
    Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
    Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
    Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
    VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
    Producer: 消息生产者,就是投递消息的程序
    Consumer: 消息消费者,就是接受消息的程序
    Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

02.分类2
    Producer: 消息生产者,就是投递消息的程序
    Consumer: 消息消费者,就是接受消息的程序
    Channel: 我们的客户端连接都会使用一个Channel,再通过Channel去访问到RabbitMQ服务器,注意通信协议不是http,而是amqp协议
    Exchange:类似于交换机一样的存在,会根据我们的请求,转发给相应的消息队列,每个队列都可以绑定到Exchange上,这样Exchange就可以将数据转发给队列了,可以存在很多个,不同的Exchange类型可以用于实现不同消息的模式
    Queue:消息队列本体,生产者所有的消息都存放在消息队列中,等待消费者取出
    VirtualHost:有点类似于环境隔离,不同环境都可以单独配置一个VirtualHost,每个VirtualHost可以包含很多个Exchange和Queue,每个VirtualHost相互之间不影响

03.分类3
    生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上
    交换机(Exchange):和生产者建立连接并接收生产者的消息
    消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息
    队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互
    路由(Routes):交换机转发消息到队列的规则

1.2 [1]概念:13个

01.分类1
    a.消息队列(Message Queue)
        定义:一种用于异步传递数据的通信机制,允许应用程序通过队列交换消息,解耦生产者和消费者
        作用:提高系统的可靠性和伸缩性
    b.生产者(Producer)
        定义:发送消息到 RabbitMQ 的应用程序或服务
        作用:生成并发送消息到消息队列
    c.消费者(Consumer)
        定义:接收和处理来自 RabbitMQ 消息队列的消息的应用程序或服务
        作用:处理队列中的消息,执行任务

02.分类2
    a.交换机(Exchange)
        定义:负责接收生产者发送的消息,并根据路由规则将消息转发到一个或多个队列
        类型:
        Direct:精确匹配路由键的消息。消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式
        Fanout:将消息广播到所有绑定的队列。把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的
        Topic:根据通配符模式路由消息。通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
        Headers:根据消息头属性路由消息。不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
    b.队列(Queue)
        定义:存储消息的容器,消费者从队列中获取消息进行处理
        作用:确保消息的顺序性和持久性
    c.路由键(Routing Key)
        定义:用于将消息路由到特定队列的标识符
        作用:在交换机和队列之间进行消息路由
    d.绑定(Binding)
        定义:交换机和队列之间的连接关系,定义了如何将消息从交换机路由到队列
        作用:配置消息的路由规则

03.分类3
    a.消息确认(Acknowledgement)
        定义:消费者处理消息后发送的确认信号,通知 RabbitMQ 消息已被成功处理
        作用:确保消息不会丢失
    b.持久化(Persistence)
        定义:确保消息在 RabbitMQ 重启后仍然存在的机制
        作用:通过设置消息和队列的持久化选项,防止数据丢失
    c.死信队列(Dead-Letter Queue, DLQ)
        定义:用于存储无法成功处理的消息的队列
        作用:处理和分析错误消息,以便于排查和解决问题
    d.消费者确认(Consumer Acknowledgement)
        定义:确认消费者成功处理了消息后发送的信号
        作用:确保消息被处理后才从队列中删除

04.分类4
    a.集群(Cluster)
        定义:多个 RabbitMQ 节点组成的集群,共享消息队列数据,提高可用性和伸缩性
        作用:提供高可用性和负载均衡
    b.镜像队列(Mirrored Queue)
        定义:将消息队列的副本分布到多个节点上,以增强数据的持久性和高可用性
        作用:防止数据丢失,确保队列的高可用性

1.3 [1]特点:9个

01.特点
    1.可靠性
    2.灵活的路由
    3.消息集群
    4.高可用
    5.多种协议
    6.多语言客户端
    7.管理界面
    8.跟踪机制
    9.插件机制

1.4 [2]cluster

01.两种模式
    a.普通集群
        普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
        此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
        当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
        这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
    b.镜像集群
        它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。
        也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,
        这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
    c.节点类型
        RabbitMQ 中的节点类型有两种:
        RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
        Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息
        -----------------------------------------------------------------------------------------------------
        RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,
        必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,
        但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,
        或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。

02.搭建普通集群
    a.预备知识
        搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,
        我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。
        -----------------------------------------------------------------------------------------------------
        RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。
        可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。
        如果主机名 ping 不通,RabbitMQ 服务启动会失败
        如果我们是在不同的服务器上搭建 RabbitMQ 集群,我们将通过 Docker 的容器连接 link 来实现容器之间的访问
    b.开始搭建
        a.创建三个 RabbitMQ 容器:
            docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
            docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
            docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
            -------------------------------------------------------------------------------------------------
            三个节点现在就启动好了,注意在 mq02 和 mq03 中,分别使用了 --link 参数来实现容器连接,关于这个参数,
            另外还需要注意,mq03 容器中要既能够连接 mq01 也能够连接 mq02。
        b.接下来进入到 mq02 容器中,首先查看一下 hosts 文件,可以看到我们配置的容器连接已经生效了
            将来在 mq02 容器中,就可以通过 mylink01 或者 rabbit01 访问到 mq01 容器了。
        c.分别执行如下命令将 mq02 容器加入集群中:
            rabbitmqctl stop_app
            rabbitmqctl join_cluster rabbit@rabbit01
            rabbitmqctl start_app
        d.过相同的方式将 mq03 也加入到集群中
            rabbitmqctl stop_app
            rabbitmqctl join_cluster rabbit@rabbit01
            rabbitmqctl start_app
        e.输入如下命令我们可以查看集群的状态
            rabbitmqctl cluster_status
    c.代码测试
        a.applicaiton.properties
            spring.rabbitmq.addresses=localhost:5671,localhost:5672,localhost:5673
            spring.rabbitmq.username=guest
            spring.rabbitmq.password=guest
        b.简单的队列
            @Configuration
            public class RabbitConfig {
                public static final String MY_QUEUE_NAME = "my_queue_name";
                public static final String MY_EXCHANGE_NAME = "my_exchange_name";
                public static final String MY_ROUTING_KEY = "my_queue_name";

                @Bean
                Queue queue() {
                    return new Queue(MY_QUEUE_NAME, true, false, false);
                }

                @Bean
                DirectExchange directExchange() {
                    return new DirectExchange(MY_EXCHANGE_NAME, true, false);
                }

                @Bean
                Binding binding() {
                    return BindingBuilder.bind(queue())
                            .to(directExchange())
                            .with(MY_ROUTING_KEY);
                }
            }
        c.生产者
            @SpringBootTest
            class ProviderApplicationTests {

                @Autowired
                RabbitTemplate rabbitTemplate;

                @Test
                void contextLoads() {
                    rabbitTemplate.convertAndSend(null, RabbitConfig.MY_QUEUE_NAME, "hello 江南一点雨");
                }
            }
            -------------------------------------------------------------------------------------------------
            这条消息发送成功之后,在 RabbitMQ 的 Web 管理端,我们会看到三个 RabbitMQ 实例上都会显示有一条消息,
            但是实际上消息本身只存在于一个 RabbitMQ 实例。
        d.消费者
            @Component
            public class MsgReceiver {

                @RabbitListener(queues = RabbitConfig.MY_QUEUE_NAME)
                public void handleMsg(String msg) {
                    System.out.println("msg = " + msg);
                }
            }
            -------------------------------------------------------------------------------------------------
            当消息消费者启动成功后,这个方法中只收到一条消息,进一步验证了我们搭建的 RabbitMQ 集群是没问题的。

03.搭建镜像集群
    a.介绍
        所谓的镜像集群模式并不需要额外搭建,只需要我们将队列配置为镜像队列即可。
        这个配置可以通过网页配置,也可以通过命令行配置,我们分别来看。
    b.方式1:网页配置镜像队列
        点击 Admin 选项卡,然后点击右边的 Policies,再点击 Add/update a policy4
        -----------------------------------------------------------------------------------------------------
        接下来添加一个策略
        各参数含义如下:
        Name: policy 的名称。
        Pattern: queue 的匹配模式(正则表达式)。
        Definition:镜像定义,主要有三个参数:ha-mode, ha-params, ha-sync-mode。
        ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
        ha-params:ha-mode 模式需要用到的参数。
        ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
        priority 为可选参数,表示 policy 的优先级。
        -----------------------------------------------------------------------------------------------------
        配置完成后,点击下面的 add/update policy 按钮,完成策略的添加,
        -----------------------------------------------------------------------------------------------------
        添加完成后,我们可以进行一个简单的测试。
        首先确认三个 RabbitMQ 都启动了,然后用上面的 provider 向消息队列发送一条消息。
        发完之后关闭 mq01 实例。
        接下来启动 consumer,此时发现 consumer 可以完成消息的消费(注意和前面的反向测试区分),这就说明镜像队列已经搭建成功了。
    c.方式2:命令行配置镜像队列
        rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
        rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

1.5 [2]restapi

01.开启REST API
    a.概念
        REST(Representational State Transfer)是一种 Web 软件架构风格,它是一种风格,而不是标准,匹配或兼容这种架构风格的的网络服务称为 REST 服务。
        REST 服务简洁并且有层次,它通常基于 HTTP、URI、XML 以及 HTML 这些现有的广泛流行的协议和标准。在 REST 中,资源是由 URI 来指定,对资源的增删改查操作可以通过 HTTP 协议提供的 GET、POST、PUT、DELETE 等方法实现。
    b.开启 Web 管理页面
        a.方式1:安装 RabbitMQ 的时候,直接选择 rabbitmq:3-management 镜像,安装命令如下:
            docker run -d --rm --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
            这样安装好的 RabbitMQ 就可以直接使用 Web 管理页面了。
        b.方式2:安装的时候就选择正常的普通镜像 rabbitmq:3,安装命令如下:
            docker run -d --hostname my-rabbit --name some-rabbit2 -p 5673:5672 -p 25672:15672 rabbitmq:3
            这个安装好之后,需要我们进入到容器中,然后手动开启 Web 管理插件,命令如下:
            ---------------------------------------------------------------------------------------------
            docker exec -it some-rabbit2 /bin/bash
            rabbitmq-plugins enable rabbitmq_management
            -------------------------------------------------------------------------------------------------
            第一条命令是进入到容器中,第二条命令开启 Web 管理插件

02.操作
    a.查看虚拟主机 myvh 下 hello-queue 队列的数据统计
        curl -i -u guest:guest http://localhost:15672/api/queues/myvh/hello-queue
    b.在 /myvh 虚拟主机下创建一个名为 javaboy-queue 的队列
        curl -i -u guest:guest -XPUT -H "Content-Type:application/json" -d '{"auto_delete":false,"durable":true}' http://localhost:15672/api/queues/myvh/javaboy-queue
    c.查看当前连接信息
        curl -i -u guest:guest http://localhost:15672/api/connections
    d.查看当前用户信息
        curl -i -u guest:guest http://localhost:15672/api/users
    e.建一个名为 zhangsan,密码是 123 ,角色是 administrator 的用户
        curl -i -u guest:guest -H "{Content-Type:application/json}" -d '{"password":"123","tags":"administrator"}' -XPUT http://localhost:15672/api/users/zhangsan
    f.将名为 zhangsan 的用户设置到名为 myvh 的 vhost 下
        curl -i -u guest:guest -H "{Content-Type:application/json}" -d '{"configure":".*","write":".*","read":".*"}' -XPUT http://localhost:15672/api/permissions/myvh/zhangsan

1.6 [2]rabbitmqadmin

01.rabbitmqadmin
    a.介绍
        我们自己平时做练习,一般都会开启 RabbitMQ 的 Web 管理页面,然而在生产环境下,经常是没有 Web 管理页面的,只能通过 CLI 命令去管理 MQ。
        其实呀,Web 管理页面虽然友好,但是很多时候没有 CLI 快捷,而且通过 CLI 命令行的操作,我们可以做更多的定制,例如将关键信息查出来后提供给集中的监控系统以触发报警。
        直接操作 CLI 命令行有点麻烦,RabbitMQ 提供了 CLI 管理工具 rabbitmqadmin ,其实就是基于 RabbitMQ 的 HTTP API,用 Python 写的一个脚本。因为 REST API 手动写请求还是挺麻烦的,这些脚本刚好替我们简化了这个操作,让这个事情变得更加简单了。
    b.安装
        如果我们创建 RabbitMQ 容器的时候使用的是 rabbitmq:3-management 镜像,那么默认情况下,rabbitmqadmin 就是安装好的。
        首先确认你的设备上安装了 Python,这是最基本的,因为 rabbitmqadmin 这个工具就是 Python 脚本。
        然后开启 RabbitMQ 的 Web 管理页面,然后输入如下地址(我的管理页面度那口映射为 25672):
        http://localhost:25672/cli/index.html
        -----------------------------------------------------------------------------------------------------
        在打开的页面中就可以看到 rabbitmqadmin 的下载链接。将 rabbitmqadmin 下载下来后,然后赋予其可执行权限即可:
        chmod +x rabbitmqadmin
        -----------------------------------------------------------------------------------------------------
        下载后的 rabbitmqadmin 我们可以直接用记事本打开,里边其实就是一堆 Python 脚本。
    c.总结
        这套流程操作下来还是挺麻烦的,所以,我建议大家直接使用 rabbitmq:3-management 镜像,一步到位。

02.功能
    列出 exchanges, queues, bindings, vhosts, users, permissions, connections and channels。
    创建和删除 exchanges, queues, bindings, vhosts, users and permissions。
    发布和获取消息,以及消息详情。
    关闭连接和清空队列。
    导入导出配置。

03.列出各种信息
    a.查看所有交换机
        python rabbitmqadmin list exchanges
    b.查看所有队列
        python rabbitmqadmin list queues
    c.查看所有 Binding
        python rabbitmqadmin list bindings
    d.查看所有虚拟主机
        python rabbitmqadmin list vhosts
    e.查看所有用户信息
        python rabbitmqadmin list users
    f.查看所有权限信息
        python rabbitmqadmin list permissions
    g.查看所有连接信息
        python rabbitmqadmin list connections
    h.查看所有通道信息
        python rabbitmqadmin list channels

04.一个完整的例子
    a.步骤1
        a.创建一个名为 javaboy-exchange 的交换机
            python rabbitmqadmin declare exchange name=javaboy-exchange durable=true auto_delete=false type=direct
            -------------------------------------------------------------------------------------------------
            declare exchange:这是创建一个交换机的命令。
            name=javaboy-exchange:指定交换机的名称为 javaboy-exchange。
            durable=true:表示这个交换机会在 RabbitMQ 服务器重启后仍然存在(持久化)。
            auto_delete=false:设置为 false 表示当没有队列绑定到该交换机时,交换机不会自动删除。
            type=direct:指定交换机的类型为 direct,即交换机会根据消息的 routing_key 精确匹配到绑定的队列。
        b.查看交换机
            python rabbitmqadmin list exchanges
    b.步骤2
        a.创建一个名为 javaboy-queue 的队列
            python rabbitmqadmin declare queue name=javaboy-queue durable=true auto_delete=false
            -------------------------------------------------------------------------------------------------
            declare queue:这是创建一个队列的命令。
            name=javaboy-queue:指定队列的名称为 javaboy-queue。
            durable=true:表示该队列会在 RabbitMQ 服务器重启后仍然存在(持久化队列)。
            auto_delete=false:设置为 false,表示当队列不再被使用时,不会自动删除
        b.查看队列
            python rabbitmqadmin list queues
    c.步骤3
        a.创建一个 Binding,将交换机和消息队列绑定起来
            python rabbitmqadmin declare binding source=javaboy-exchange destination=javaboy-queue routing_key=javaboy-routing
            -------------------------------------------------------------------------------------------------
            declare binding:这是为交换机和队列创建绑定关系的命令。
            source=javaboy-exchange:指定绑定的源头是交换机 javaboy-exchange。
            destination=javaboy-queue:指定绑定的目的地是队列 javaboy-queue。
            routing_key=javaboy-routing:指定绑定时的路由键为 javaboy-routing,表示只有携带这个路由键的消息会被交换机发送到这个队列中。
        b.查看绑定关系
            python rabbitmqadmin list bindings
    d.步骤4
        a.发布一条消息
            python rabbitmqadmin publish routing_key=javaboy-queue payload="hello javaboy"
            -------------------------------------------------------------------------------------------------
            publish:这是向队列或交换机发送消息的命令。
            routing_key=javaboy-queue:指定发送消息的路由键为 javaboy-queue,这意味着消息会根据绑定关系发送到与此路由键匹配的队列中。
            payload="hello javaboy":指定消息的内容为 "hello javaboy"。
        b.查看消息,不消费
            python rabbitmqadmin get queue=javaboy-queue
            -------------------------------------------------------------------------------------------------
            get:这是从队列中获取消息的命令。
            queue=javaboy-queue:指定从队列 javaboy-queue 中查看消息。
            默认情况下,get 命令不会消费消息(即不会从队列中移除消息),只会返回消息的内容以供查看。
        c.查看消息,并消费
            python rabbitmqadmin get queue=javaboy-queue requeue=false
        d.删除队列中的所有消息
            python rabbitmqadmin purge queue name=javaboy-queue
            -------------------------------------------------------------------------------------------------
            purge queue:这是清空队列中所有消息的命令。
            name=javaboy-queue:指定要清空的队列是 javaboy-queue。
    e.附说明
        a.查看 交换机、队列、绑定
            打开 RabbitMQ 管理界面,地址为 http://localhost:15672。
            登录后,点击左侧导航中的 Queues 或 Exchanges。
            在 Queues 页面,选择你感兴趣的队列,进入详情页后,可以看到与该队列绑定的所有交换机和路由键。
            在 Exchanges 页面,选择对应的交换机,同样可以查看其绑定的队列和路由键信息。
        b.查看 消费消息
            打开 RabbitMQ 管理界面 http://localhost:15672。
            登录后,点击左侧导航栏中的 Queues。
            找到 javaboy-queue 队列并点击它。
            在队列的详情页中,找到 Get messages 部分。
            选择你要消费的消息数量,点击 Get Message(s) 按钮。可以选择是否移除消息(即是否 ack 消息),如果选择 Auto-ack,消息将会被移除。

05.命令汇总
    python rabbitmqadmin list users                                                                             | 查看所有用户 User
    python rabbitmqadmin list users name                                                                        | 查看所有用户名 Username
    python rabbitmqadmin list users tags                                                                        | 查看所有用户角色
    python rabbitmqadmin list vhosts                                                                            | 查看所有虚拟主机
    python rabbitmqadmin list connections                                                                       | 查看所有连接
    python rabbitmqadmin list exchanges                                                                         | 查看所有路由 Exchange
    python rabbitmqadmin list bindings                                                                          | 查看所有路由与队列的关系绑定 Binding
    python rabbitmqadmin list permissions                                                                       | 查看所有角色的权限 Permission
    python rabbitmqadmin list channels                                                                          | 查看所有通道 Channel
    python rabbitmqadmin list consumers                                                                         | 查看所有消费者 Consumer
    python rabbitmqadmin list queues                                                                            | 查看所有消息队列 Queue
    python rabbitmqadmin list nodes                                                                             | 查看所有节点 Node
    python rabbitmqadmin show overview                                                                          | 概览 Overview
    python rabbitmqadmin list bindings source destination_type destination properties_key                       | 查看所有路由与队列的关系绑定的详细信息 Binding
    python rabbitmqadmin declare queue name=test durable=true                                                   | 定义一个队列queue,durable=true代表持久化打开。
    python rabbitmqadmin declare exchange name=my.fanout type=fanout                                            | 定义一个Fanout路由
    python rabbitmqadmin declare exchange name=my.direct type=direct                                            | 定义一个Direct路由
    python rabbitmqadmin declare exchange name=my.topic type=topic                                              | 定义一个Topic路由
    python rabbitmqadmin declare binding source=my.fanout destination=test routing_key=first                    | 定义 binding
    python rabbitmqadmin publish routing_key=test payload="hello world"                                         | 发布一条消息
    python rabbitmqadmin publish routing_key=my.test exchange=my.topic payload="hello world"                    | 使用路由转发消息
    python rabbitmqadmin get queue=test requeue=true                                                            | 查看消息,不消费
    python rabbitmqadmin get queue=test requeue=false                                                           | 查看消息,并消费
    python rabbitmqadmin purge queue name=test                                                                  | 删除队列中的所有消息
    python rabbitmqadmin delete queue name=hello                                                                | 删除消息队列 Queue
    python rabbitmqadmin delete user name=test                                                                  | 删除用户 User
    python rabbitmqadmin delete exchange name=test                                                              | 删除路由器 Exchange
    python rabbitmqadmin delete binding source='kk' destination_type=queue destination=test properties_key=test | 删除路由器与消息队列的关系绑定 Binding
    python rabbitmqadmin -f raw_json list users                                                                 | raw_json 格式化输出
    python rabbitmqadmin -f long list users                                                                     | 格式化输出
    python rabbitmqadmin -f pretty_json list users                                                              | pretty_json 格式化输出
    python rabbitmqadmin -f kvp list users                                                                      | 格式化输出
    python rabbitmqadmin -f tsv list users                                                                      | 格式化输出
    python rabbitmqadmin -f table list users                                                                    | table 格式化输出
    python rabbitmqadmin -f bash list users                                                                     | bash 格式化输出

1.7 [2]用户、权限、VirtualHost

01.用户管理
    a.说明
        因为 vhost 通常跟用户一起出现,所以这里我也顺便说下 user 的相关操作。
        添加一个用户名为 javaboy,密码为 123 的用户,方式如下:
        rabbitmqctl add_user javaboy 123
    b.示例
        通过如下命令可以修改用户密码(将 javaboy 的密码改为 123456):
        rabbitmqctl change_password javaboy 123456
        -----------------------------------------------------------------------------------------------------
        通过如下命令可以验证用户密码:
        rabbitmqctl authenticate_user javaboy 123456
        -----------------------------------------------------------------------------------------------------
        通过如下命令可以查看当前的所有用户:
        rabbitmqctl list_users
        -----------------------------------------------------------------------------------------------------
        给用户设置角色的命令如下(给 javaboy 设置 administrator 角色):
        rabbitmqctl set_user_tags javaboy administrator
        -----------------------------------------------------------------------------------------------------
        最后,删除一个用户的命令如下:
        rabbitmqctl delete_user javaboy

02.VirtualHost
    a.多租户
        我们安装一个 RabbitMQ 服务器,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
        本质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突。
        我们并不需要特别的去看待 vhost,他就跟普通的物理 RabbitMQ 一样,不同的 vhost 能够提供逻辑上的分离,确保不同的应用消息队列能够安全独立运行。
        要我来说,我们该怎么看待 vhost 和 RabbitMQ 的关系呢?RabbitMQ 相当于一个 Excel 文件,而 vhost 则是 Excel 文件中的一个个 sheet,我们所有的操作都是在某一个 sheet 上进行操作。
    b.方式1:命令
        因为松哥这里的 RabbitMQ 是用 docker 安装的,所以我们首先进入到 docker 容器中:
        docker exec -it some-rabbit /bin/bash
        -----------------------------------------------------------------------------------------------------
        然后执行如下命令创建一个名为 /myvh 的 vhost:
        rabbitmqctl add_vhost myvh
        -----------------------------------------------------------------------------------------------------
        然后通过如下命令可以查看已有的 vhost:
        rabbitmqctl list_vhosts
        -----------------------------------------------------------------------------------------------------
        可以通过如下命令删除一个 vhost:
        rabbitmqctl delete_vhost myvh
        当删除一个 vhost 的时候,与这个 vhost 相关的消息队列、交换机以及绑定关系等,统统都会被删除。
        -----------------------------------------------------------------------------------------------------
        给一个用户设置 vhost:
        rabbitmqctl set_permissions -p myvh guest ".*" ".*" ".*"
        前面参数都好说,最后面三个 ".*" 含义分别如下:
        用户在所有资源上都拥有可配置权限(创建/删除消息队列、创建/删除交换机等)。
        用户在所有资源上都拥有写权限(发消息)。
        用户在所有资源上都拥有读权限(消息消费,清空队列等)。
        -----------------------------------------------------------------------------------------------------
        禁止一个用户访问某个 vhost:
        rabbitmqctl clear_permissions -p myvh guest
    c.方式2:Web页面
        在 admin 选项卡中,点击右边的 Virtual Hosts,
        然后点击下边的 Add a new virtual host ,可以添加一个新的 vhost
        进入到某一个 vhost 之后,可以修改其权限以及删除一个 vhost

03.权限系统
    a.概念
        不管我们是通过网页还是通过命令行工具创建用户对象,刚创建好的用户对象都是没法直接使用的,
        需要我们首先把这个用户置于某一个 vhost 之下,然后再赋予其权限,有了权限,这个用户才可以正常使用。
    b.概念
        在这套 ACL 风格的权限管理系统中,允许非常多细粒度的权限控制,可以为不同用户分别设置读、写以及配置等权限。
        这里涉及到三种不同的权限:
        读:和消息消费有关的所有操作,包括清除整个队列的消息。
        写:发布消息。
        配置:消息队列、交换机等的创建和删除。
    c.方式1:命令
        a.权限操作命令
            rabbitmqctl set_permissions [-p vhosts] {user} {conf} {write} {read}
            -------------------------------------------------------------------------------------------------
            这里有几个参数:
            [-p vhost]:授予用户访问权限的 vhost 名称,如果不写默认为 /。
            user:用户名。
            conf:用户在哪些资源上拥有可配置权限(支持正则表达式)。
            write:用户在哪些资源上拥有写权限(支持正则表达式)。
            read:用户在哪些资源上拥有读权限(支持正则表达式)。
        b.假设我们有一个名为 zhangsan 的用户,我们希望该用户在 myvh 虚拟主机下具备所有权限,那么我们的操作命令如下:
            rabbitmqctl set_permissions -p myvh zhangsan ".*" ".*" ".*"
            执行如下命令可以验证授权是否成功:
            rabbitmqctl -p myvh list_permissions
            -------------------------------------------------------------------------------------------------
            在上面的授权命令中,我们用的都是 ".*",松哥再额外说下这个通配符:
            ".*":这个表示匹配所有的交换机和队列。
            "javaboy-.*":这个表示匹配名字以 javaboy- 开头的交换机和队列。
            "":这个表示不匹配任何队列与交换机(如果想撤销用户的权限可以使用这个)。
        c.我们可以使用如下命令来移除某一个用户在某一个 vhost 上的权限,例如移除 zhangsan 在 myvh 上的所有权限
            rabbitmqctl clear_permissions -p myvh zhangsan
            执行完成后,我们可以通过 rabbitmqctl -p myvh list_permissions 命令来查看执行结果是否生效,最终执行效果如下:
            -------------------------------------------------------------------------------------------------
            如果一个用户在多个 vhost 上都有对应的权限,按照上面的 rabbitmqctl -p myvh list_permissions 命令
            只能查看一个 vhost 上的权限,此时我们可以通过如下命令来查看 lisi 在所有 vhost 上的权限:
            rabbitmqctl list_user_permissions lisi
    d.方式2:Web页面
        可以设置权限,也可以清除权限

1.8 [3]核心组件:5个

01.核心组件
    Producer(生产者): 发送消息到 RabbitMQ
    Exchange(交换机): 接收来自生产者的消息,并根据路由规则将其路由到一个或多个队列
    Queue(队列): 存储消息,直到被消费者消费
    Binding(绑定): 定义交换机与队列之间的路由规则
    Consumer(消费者): 从队列中接收并处理消息

1.9 [3]数据格式:3个

01.数据格式
    a.文本消息: JSON、XML、纯文本
        示例: {"user":"Alice","action":"login"}
    b.二进制数据: 图片、视频、文件
        示例: JPEG 图片文件的字节流
    c.序列化对象: 使用 Protocol Buffers、Thrift 等序列化的对象
        示例: Java 对象序列化后的字节数组

1.10 [3]投递过程:6步

01.投递过程
    1.客户端连接到消息队列服务器,打开一个 channel
    2.客户端声明一个 exchange,并设置相关属性
    3.客户端声明一个 queue,并设置相关属性
    4.客户端使用 routing key,在 exchange 和 queue 之间建立好绑定关系
    5.客户端投递消息到 exchange
    6.客户端从指定的 queue 中消费信息

1.11 [3]消息过期:4种

00.总结
    01.第1种:单条消息过期
        体现(生产者)
        // 设置消息的过期时间为10000毫秒(10秒)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .expiration("10000") // 设置过期时间
            .build();
        channel.basicPublish("", "simple-queue1", props, message.getBytes());
    02.第2种:队列消息过期
        体现(生产者)
        // 设置消息过期时间为 10000 毫秒(10秒)
        channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});
    03.第3种:特殊情况
        还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
        这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
        是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)
    04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
        消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
        消息过期
        队列达到最大长度
        -----------------------------------------------------------------------------------------------------
        步骤如下:
        1: 创建连接工厂,设置连接属性
        2: 从连接工厂中获取
        3: 从连接中打开通道channel
        4.1: 声明死信交换机、死信队列、绑定死信队列和死信交换机
        4.2: 声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
        5: 通过channel发送消息
        -----------------------------------------------------------------------------------------------------
        体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定

00.TTL
    a.概念
        默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,
        如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
        -----------------------------------------------------------------------------------------------------
        TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,
        那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信,
    b.TTL 的设置有两种不同的方式:
        方式1:在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。
        方式2:在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。
        -----------------------------------------------------------------------------------------------------
        那如果两个都设置了呢?
        以时间短的为准。
    c.当我们设置了消息有效期后,消息过期了就会被从队列中删除了,但是两种方式对应的删除时机有一些差异:
        a.第一种方式
            当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,
            队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,
            有的话就直接删除。
        b.第二种方式
            当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,
            因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,
            当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除。

01.第1种:单条消息过期
    a.application.properties
        spring.rabbitmq.host=127.0.0.1
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.virtual-host=/
    b.消息队列
        @Configuration
        public class QueueConfig {

            public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo";
            public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo";
            public static final String HELLO_ROUTING_KEY = "hello_routing_key";

            @Bean
            Queue queue() {
                return new Queue(JAVABOY_QUEUE_DEMO, true, false, false);
            }

            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false);
            }

            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue())
                        .to(directExchange())
                        .with(HELLO_ROUTING_KEY);
            }
        }
        -------------------------------------------------------------------------------------------------
        这个配置类主要干了三件事:配置消息队列、配置交换机以及将两者绑定在一起。
        首先配置一个消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。
        配置一个 DirectExchange 交换机。
        将交换机和队列绑定到一起。
    c.生产者
        @RestController
        public class HelloController {
            @Autowired
            RabbitTemplate rabbitTemplate;

            @GetMapping("/hello")
            public void hello() {
                Message message = MessageBuilder.withBody("hello javaboy".getBytes())
                        .setExpiration("10000")
                        .build();
                rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
            }
        }
        -------------------------------------------------------------------------------------------------
        在创建 Message 对象的时候我们可以设置消息的过期时间,这里设置消息的过期时间为 10 秒。
    d.测试
        接下来我们启动项目,进行消息发送测试。当消息发送成功之后,由于没有消费者,所以这条消息并不会被消费。
        打开 RabbitMQ 管理页面,点击到 Queues 选项卡,10s 之后,我们会发现消息已经不见了:
        单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可。

02.第2种:队列消息过期
    a.application.properties
        spring.rabbitmq.host=127.0.0.1
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.virtual-host=/
    b.给队列设置消息过期时间
        @Configuration
        public class QueueConfig {

            public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo";
            public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo";
            public static final String HELLO_ROUTING_KEY = "hello_routing_key";

            @Bean
            Queue queue() {
                Map<String, Object> args = new HashMap<>();
                args.put("x-message-ttl", 10000);
                return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
            }

            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false);
            }

            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue())
                        .to(directExchange())
                        .with(HELLO_ROUTING_KEY);
            }
        }
        -------------------------------------------------------------------------------------------------
        给队列设置消息过期时间
        @Bean
        Queue queue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 10000);
            return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
        }
    c.生产者
        @RestController
        public class HelloController {
            @Autowired
            RabbitTemplate rabbitTemplate;

            @GetMapping("/hello")
            public void hello() {
                Message message = MessageBuilder.withBody("hello javaboy".getBytes())
                        .build();
                rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
            }
        }
    d.测试
        可以看到,消息正常发送即可,不用设置消息过期时间。
        OK,启动项目,发送一条消息进行测试。查看 RabbitMQ 管理页面,如下:
        可以看到,消息队列的 Features 属性为 D 和 TTL,D 表示消息队列中消息持久化,TTL 则表示消息会过期。
        10s 之后刷新页面,发现消息数量已经恢复为 0。
        这就是给消息队列设置消息过期时间,一旦设置了,所有进入到该队列的消息都有一个过期时间了。

03.第3种:特殊情况
    还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
    这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
    是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)

04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
    a.介绍
        a.概念
            有小伙伴不禁要问,被删除的消息去哪了?真的被删除了吗?非也非也!这就涉及到死信队列了,接下来我们来看看死信队列。
            -------------------------------------------------------------------------------------------------
            死信交换机,Dead-Letter-Exchange 即 DLX。
            死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?
        b.一般消息变成死信消息有如下几种情况:
            消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
            消息过期
            队列达到最大长度
        c.总结
            当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
            DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,
            当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
        d.死信队列
            绑定了死信交换机的队列就是死信队列
    b.代码实现
        a.application.properties
            spring.rabbitmq.host=127.0.0.1
            spring.rabbitmq.port=5672
            spring.rabbitmq.username=guest
            spring.rabbitmq.password=guest
            spring.rabbitmq.virtual-host=/
        b.创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起。
            这其实跟普通的交换机,普通的消息队列没啥两样
            -------------------------------------------------------------------------------------------------
            public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
            public static final String DLX_QUEUE_NAME = "dlx_queue_name";
            public static final String DLX_ROUTING_KEY = "dlx_routing_key";

            /**
             * 配置死信交换机
             *
             * @return
             */
            @Bean
            DirectExchange dlxDirectExchange() {
                return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
            }
            /**
             * 配置死信队列
             * @return
             */
            @Bean
            Queue dlxQueue() {
                return new Queue(DLX_QUEUE_NAME);
            }
            /**
             * 绑定死信队列和死信交换机
             * @return
             */
            @Bean
            Binding dlxBinding() {
                return BindingBuilder.bind(dlxQueue())
                        .to(dlxDirectExchange())
                        .with(DLX_ROUTING_KEY);
            }
            -------------------------------------------------------------------------------------------------
            为消息队列配置死信交换机
            @Bean
            Queue queue() {
                Map<String, Object> args = new HashMap<>();
                //设置消息过期时间
                args.put("x-message-ttl", 0);
                //设置死信交换机
                args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
                //设置死信 routing_key
                args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
                return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
            }
            -------------------------------------------------------------------------------------------------
            就两个参数:
            x-dead-letter-exchange:配置死信交换机。
            x-dead-letter-routing-key:配置死信 routing_key。
            -------------------------------------------------------------------------------------------------
            将来发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,
            就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
        c.消费者(死信消息队列的消费和普通消息队列的消费并无二致)
            @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
            public void dlxHandle(String msg) {
                System.out.println("dlx msg = " + msg);
            }

1.12 [3]延迟队列:2种

00.介绍
    a.场景
        定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,
        这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,
        向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,
    b.场景
        因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如:
        在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
        我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
        公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
        安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
        用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
    c.在 RabbitMQ 上实现定时任务有两种方式:
        方式1:利用 RabbitMQ 自带的消息过期和死信队列机制,实现定时任务。(DLX)
        方式2:使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。

01.方式1:利用 RabbitMQ 自带的消息过期和死信队列机制,实现定时任务,即DLX 实现延迟队列
    a.介绍
        DLX 实现延迟队列
        延迟队列实现的思路也很简单,就是上篇文章我们所说的 DLX(死信交换机)+TTL(消息超时时间)
        -----------------------------------------------------------------------------------------------------
        我们可以把死信队列就当成延迟队列,也就是延迟队列的实现思路
        假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,
        同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,
        那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,
        此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
    b.代码实现
        a.application.properties
            spring.rabbitmq.host=localhost
            spring.rabbitmq.username=guest
            spring.rabbitmq.password=guest
            spring.rabbitmq.port=5672
        b.配置两个消息队列:一个普通队列,一个死信队列
            @Configuration
            public class QueueConfig {
                public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
                public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
                public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
                public static final String DLX_QUEUE_NAME = "dlx_queue_name";
                public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
                public static final String DLX_ROUTING_KEY = "dlx_routing_key";

                /**
                 * 死信队列
                 * @return
                 */
                @Bean
                Queue dlxQueue() {
                    return new Queue(DLX_QUEUE_NAME, true, false, false);
                }

                /**
                 * 死信交换机
                 * @return
                 */
                @Bean
                DirectExchange dlxExchange() {
                    return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
                }

                /**
                 * 绑定死信队列和死信交换机
                 * @return
                 */
                @Bean
                Binding dlxBinding() {
                    return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                            .with(DLX_ROUTING_KEY);
                }

                /**
                 * 普通消息队列
                 * @return
                 */
                @Bean
                Queue javaboyQueue() {
                    Map<String, Object> args = new HashMap<>();
                    //设置消息过期时间
                    args.put("x-message-ttl", 1000*10);
                    //设置死信交换机
                    args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
                    //设置死信 routing_key
                    args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
                    return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args);
                }

                /**
                 * 普通交换机
                 * @return
                 */
                @Bean
                DirectExchange javaboyExchange() {
                    return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
                }

                /**
                 * 绑定普通队列和与之对应的交换机
                 * @return
                 */
                @Bean
                Binding javaboyBinding() {
                    return BindingBuilder.bind(javaboyQueue())
                            .to(javaboyExchange())
                            .with(JAVABOY_ROUTING_KEY);
                }
            }
            -------------------------------------------------------------------------------------------------
            配置可以分为两组,第一组配置死信队列,第二组配置普通队列。每一组都由消息队列、消息交换机以及 Binding 三者组成。
            配置消息队列时,为消息队列指定死信队列,
            配置队列中的消息过期时间时,默认的时间单位时毫秒。
        c.为死信队列配置一个消费者
            @Component
            public class DlxConsumer {
                private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

                @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
                public void handle(String msg) {
                    logger.info(msg);
                }
            }
        d.生产者
            @SpringBootTest
            class DelayQueueApplicationTests {

                @Autowired
                RabbitTemplate rabbitTemplate;

                @Test
                void contextLoads() {
                    System.out.println(new Date());
                    rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!");
                }

            }

02.方式2:使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
    a.安装插件
        下载 rabbitmq_delayed_message_exchange 插件
        -----------------------------------------------------------------------------------------------------
        下载完成后在命令行执行如下命令将下载文件拷贝到 Docker 容器中去:
        docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez some-rabbit:/plugins
        这里第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置。
        -----------------------------------------------------------------------------------------------------
        再执行如下命令进入到 RabbitMQ 容器中:
        docker exec -it some-rabbit /bin/bash
        进入到容器之后,执行如下命令启用插件:
        rabbitmq-plugins enable rabbitmq_delayed_message_exchange
        启用成功之后,还可以通过如下命令查看所有安装的插件,看看是否有我们刚刚安装过的插件,如下:
        rabbitmq-plugins list
    b.代码实现
        a.application.properties
            spring.rabbitmq.host=localhost
            spring.rabbitmq.password=guest
            spring.rabbitmq.username=guest
            spring.rabbitmq.virtual-host=/
        b.配置类
            @Configuration
            public class RabbitConfig {
                public static final String QUEUE_NAME = "javaboy_delay_queue";
                public static final String EXCHANGE_NAME = "javaboy_delay_exchange";
                public static final String EXCHANGE_TYPE = "x-delayed-message";

                @Bean
                Queue queue() {
                    return new Queue(QUEUE_NAME, true, false, false);
                }

                @Bean
                CustomExchange customExchange() {
                    Map<String, Object> args = new HashMap<>();
                    args.put("x-delayed-type", "direct");
                    return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
                }

                @Bean
                Binding binding() {
                    return BindingBuilder.bind(queue())
                            .to(customExchange()).with(QUEUE_NAME).noargs();
                }
            }
            -------------------------------------------------------------------------------------------------
            这里主要是交换机的定义有所不同,小伙伴们需要注意。
            这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:
                交换机名称。
                交换机类型,这个地方是固定的。
                交换机是否持久化。
                如果没有队列绑定到交换机,交换机是否删除。
                其他参数。
            最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,
            用了哪种类型,将来交换机分发消息就按哪种方式来。
        c.消费者
            @Component
            public class MsgReceiver {
                private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
                @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
                public void handleMsg(String msg) {
                    logger.info("handleMsg,{}",msg);
                }
            }
        d.生产者
            @SpringBootTest
            class MqDelayedMsgDemoApplicationTests {

                @Autowired
                RabbitTemplate rabbitTemplate;
                @Test
                void contextLoads() throws UnsupportedEncodingException {
                    Message msg = MessageBuilder.withBody(("hello 江南一点雨"+new Date()).getBytes("UTF-8")).setHeader("x-delay", 3000).build();
                    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg);
                }

            }
            -------------------------------------------------------------------------------------------------
            在消息头中设置消息的延迟时间。

2 消费:官方

2.1 [1]Hello World

00.介绍
    a.概念
        咦?这个咋没有交换机?
        这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者
    b.说明
        默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上
        当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上
        例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收

01.代码实现
    a.队列
        @Configuration
        public class HelloWorldConfig {

            public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";

            @Bean
            Queue queue1() {
                return new Queue(HELLO_WORLD_QUEUE_NAME);
            }
        }
    b.生产者
        @SpringBootTest
        class RabbitmqdemoApplicationTests {

            @Autowired
            RabbitTemplate rabbitTemplate;

            @Test
            void contextLoads() {
                rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
            }
        }
    c.消费者
        @Component
        public class HelloWorldConsumer {
            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
            public void receive(String msg) {
                System.out.println("msg = " + msg);
            }
        }

2.2 [2]Work queues

00.介绍
    a.概念
        一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者
    b.说明
        一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中
        消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息

01.代码实现
    a.队列
        @Configuration
        public class HelloWorldConfig {

            public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";

            @Bean
            Queue queue1() {
                return new Queue(HELLO_WORLD_QUEUE_NAME);
            }
        }
    b.生产者
        @SpringBootTest
        class RabbitmqdemoApplicationTests {

            @Autowired
            RabbitTemplate rabbitTemplate;

            @Test
            void contextLoads() {
                for (int i = 0; i < 10; i++) {
                    rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
                }
            }

        }
    c.消费者
        @Component
        public class HelloWorldConsumer {
            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
            public void receive(String msg) {
                System.out.println("receive = " + msg);
            }

            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
            public void receive2(String msg) {
                System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
            }
        }
    d.说明1
        可以看到,第二个消费者我配置了 concurrency 为 10,此时
        对于第二个消费者,将会同时存在 10 个子线程去消费消息
        -----------------------------------------------------------------------------------------------------
        启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者
        消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异)
        消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)
    e.说明2
        消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息
        配置手动 ack 的方式如下:
        spring.rabbitmq.listener.simple.acknowledge-mode=manual
        -----------------------------------------------------------------------------------------------------
        消费者代码如下:
        @Component
        public class HelloWorldConsumer {
            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
            public void receive(Message message,Channel channel) throws IOException {
                System.out.println("receive="+message.getPayload());
                channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
            }

            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
            public void receive2(Message message, Channel channel) throws IOException {
                System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());
                channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
            }
        }
        -----------------------------------------------------------------------------------------------------
        此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息
        这就是 Work queues 这种情况

2.3 [3]Publish/Subscribe

00.介绍
    a.概念
        一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机
        每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的
        需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失
        这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力
    b.有四种交换机可供选择
        direct 直连交换机:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式
        fanout 扇形交换机:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的
        topic 主题交换机:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
        headers 头交换机:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了

01.direct:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式
    a.介绍
        DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
        当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
        例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
    b.DirectExchange交换机
        @Configuration
        public class RabbitDirectConfig {
            public final static String DIRECTNAME = "javaboy-direct";
            @Bean
            Queue queue() {
                return new Queue("hello-queue");
            }

            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(DIRECTNAME, true, false);
            }

            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
            }
        }
        -----------------------------------------------------------------------------------------------------
        首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除
        创建一个Binding对象将Exchange和Queue绑定在一起
        DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可
    c.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void directTest() {
                rabbitTemplate.convertAndSend("hello-queue", "hello direct!");
            }
        }
        -----------------------------------------------------------------------------------------------------
        在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送
    d.消费者
        @Component
        public class DirectReceiver {
            @RabbitListener(queues = "hello-queue")
            public void handler1(String msg) {
                System.out.println("DirectReceiver:" + msg);
            }
        }

02.fanout:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的
    a.介绍
        FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上
        在这种策略中,routingkey 将不起任何作用
    b.FanoutExchange交换机
        @Configuration
        public class RabbitFanoutConfig {
            public final static String FANOUTNAME = "sang-fanout";
            @Bean
            FanoutExchange fanoutExchange() {
                return new FanoutExchange(FANOUTNAME, true, false);
            }
            @Bean
            Queue queueOne() {
                return new Queue("queue-one");
            }
            @Bean
            Queue queueTwo() {
                return new Queue("queue-two");
            }
            @Bean
            Binding bindingOne() {
                return BindingBuilder.bind(queueOne()).to(fanoutExchange());
            }
            @Bean
            Binding bindingTwo() {
                return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
            }
        }
    c.创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上
        @Component
        public class FanoutReceiver {
            @RabbitListener(queues = "queue-one")
            public void handler1(String message) {
                System.out.println("FanoutReceiver:handler1:" + message);
            }

            @RabbitListener(queues = "queue-two")
            public void handler2(String message) {
                System.out.println("FanoutReceiver:handler2:" + message);
            }
        }
    d.两个消费者
        @Component
        public class FanoutReceiver {
            @RabbitListener(queues = "queue-one")
            public void handler1(String message) {
                System.out.println("FanoutReceiver:handler1:" + message);
            }
            @RabbitListener(queues = "queue-two")
            public void handler2(String message) {
                System.out.println("FanoutReceiver:handler2:" + message);
            }
        }
    e.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void fanoutTest() {
                rabbitTemplate
                .convertAndSend(RabbitFanoutConfig.FANOUTNAME,
                        null, "hello fanout!");
            }
        }
        -----------------------------------------------------------------------------------------------------
        注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null

03.topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
    a.介绍
        TopicExchange 是比较复杂但是也比较灵活的一种路由策略,
        在 TopicExchange 中,Queue 通过 routingkey 绑定到TopicExchange 上
        当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上
    b.TopicExchange交换机
        @Configuration
        public class RabbitTopicConfig {
            public final static String TOPICNAME = "sang-topic";
            @Bean
            TopicExchange topicExchange() {
                return new TopicExchange(TOPICNAME, true, false);
            }
            @Bean
            Queue xiaomi() {
                return new Queue("xiaomi");
            }
            @Bean
            Queue huawei() {
                return new Queue("huawei");
            }
            @Bean
            Queue phone() {
                return new Queue("phone");
            }
            @Bean
            Binding xiaomiBinding() {
                return BindingBuilder.bind(xiaomi()).to(topicExchange())
                        .with("xiaomi.#");
            }
            @Bean
            Binding huaweiBinding() {
                return BindingBuilder.bind(huawei()).to(topicExchange())
                        .with("huawei.#");
            }
            @Bean
            Binding phoneBinding() {
                return BindingBuilder.bind(phone()).to(topicExchange())
                        .with("#.phone.#");
            }
        }
        -----------------------------------------------------------------------------------------------------
        首先创建 TopicExchange,参数和前面的一致
        然后创建三个 Queue
        第一个 Queue 用来存储和 “xiaomi” 有关的消息
        第二个 Queue 用来存储和 “huawei” 有关的消息
        第三个 Queue 用来存储和 “phone” 有关的消息
        -----------------------------------------------------------------------------------------------------
        将三个 Queue 分别绑定到 TopicExchange 上
        第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以“xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上
        第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上
        第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上
    c.三个消费者
        @Component
        public class TopicReceiver {
            @RabbitListener(queues = "phone")
            public void handler1(String message) {
                System.out.println("PhoneReceiver:" + message);
            }
            @RabbitListener(queues = "xiaomi")
            public void handler2(String message) {
                System.out.println("XiaoMiReceiver:"+message);
            }
            @RabbitListener(queues = "huawei")
            public void handler3(String message) {
                System.out.println("HuaWeiReceiver:"+message);
            }
        }
    d.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void topicTest() {
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻..");
            }
        }
        -----------------------------------------------------------------------------------------------------
        第一条消息将被路由到名称为 “xiaomi” 的 Queue 上
        第二条消息将被路由到名为 “huawei” 的 Queue 上
        第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上
        第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上
        最后一条消息则将被路由到名为 “phone” 的 Queue 上

04.headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
    a.介绍
        HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
    b.HeadersExchange交换机
        @Configuration
        public class RabbitHeaderConfig {
            public final static String HEADERNAME = "javaboy-header";
            @Bean
            HeadersExchange headersExchange() {
                return new HeadersExchange(HEADERNAME, true, false);
            }
            @Bean
            Queue queueName() {
                return new Queue("name-queue");
            }
            @Bean
            Queue queueAge() {
                return new Queue("age-queue");
            }
            @Bean
            Binding bindingName() {
                Map<String, Object> map = new HashMap<>();
                map.put("name", "sang");
                return BindingBuilder.bind(queueName())
                        .to(headersExchange()).whereAny(map).match();
            }
            @Bean
            Binding bindingAge() {
                return BindingBuilder.bind(queueAge())
                        .to(headersExchange()).where("age").exists();
            }
        }
        -----------------------------------------------------------------------------------------------------
        ,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value
        就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配
        whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含
        age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上
    c.创建两个消息消费者
        @Component
        public class HeaderReceiver {
            @RabbitListener(queues = "name-queue")
            public void handler1(byte[] msg) {
                System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length));
            }
            @RabbitListener(queues = "age-queue")
            public void handler2(byte[] msg) {
                System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length));
            }
        }
    d.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void headerTest() {
                Message nameMsg = MessageBuilder
                        .withBody("hello header! name-queue".getBytes())
                        .setHeader("name", "sang").build();
                Message ageMsg = MessageBuilder
                        .withBody("hello header! age-queue".getBytes())
                        .setHeader("age", "99").build();
                rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
                rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
            }
        }
        -----------------------------------------------------------------------------------------------------
        这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去

2.4 [4]Routing

00.介绍
    a.概念
        一个生产者,一个交换机,两个队列,两个消费者
        生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可
    b.说明
        这个就是按照 routing key 去路由消息

2.5 [5]Topics

00.介绍
    a.概念
        一个生产者,一个交换机,两个队列,两个消费者
        生产者创建 Topic 的 Exchange 并且绑定到队列中
        这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写
    b.说明
        通过 * 和 # 关键字,对指定 RoutingKey 内容

2.6 [6]RPC

00.介绍
    a.概念
        关于 RabbitMQ 实现 RPC 调用,有的小伙伴可能会有一些误解,心想这还不简单?搞两个消息队列 queue_1 和 queue_2
        首先客户端发送消息到 queue_1 上,服务端监听 queue_1 上的消息,收到之后进行处理;处理完成后
        服务端发送消息到 queue_2 队列上,然后客户端监听 queue_2 队列上的消息,这样就知道服务端的处理结果了
    b.说明
        这种方式不是不可以,就是有点麻烦!RabbitMQ 中提供了现成的方案可以直接使用,非常方便。接下来我们就一起来学习下

01.架构图
    首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字
    Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中
    Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了

02.生产者
    a.application.properties
        spring.rabbitmq.host=localhost
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.publisher-confirm-type=correlated
        spring.rabbitmq.publisher-returns=true
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-confirm-type=correlated
        首先是配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id
        只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-returns=true
        最后一行配置则是开启发送失败退回
    b.配置类
        @Configuration
        public class RabbitConfig {

            public static final String RPC_QUEUE1 = "queue_1";
            public static final String RPC_QUEUE2 = "queue_2";
            public static final String RPC_EXCHANGE = "rpc_exchange";

            /**
             * 设置消息发送RPC队列
             */
            @Bean
            Queue msgQueue() {
                return new Queue(RPC_QUEUE1);
            }

            /**
             * 设置返回队列
             */
            @Bean
            Queue replyQueue() {
                return new Queue(RPC_QUEUE2);
            }

            /**
             * 设置交换机
             */
            @Bean
            TopicExchange exchange() {
                return new TopicExchange(RPC_EXCHANGE);
            }

            /**
             * 请求队列和交换器绑定
             */
            @Bean
            Binding msgBinding() {
                return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
            }

            /**
             * 返回队列和交换器绑定
             */
            @Bean
            Binding replyBinding() {
                return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
            }


            /**
             * 使用 RabbitTemplate发送和接收消息
             * 并设置回调队列地址
             */
            @Bean
            RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                RabbitTemplate template = new RabbitTemplate(connectionFactory);
                template.setReplyAddress(RPC_QUEUE2);
                template.setReplyTimeout(6000);
                return template;
            }


            /**
             * 给返回队列设置监听器
             */
            @Bean
            SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                container.setConnectionFactory(connectionFactory);
                container.setQueueNames(RPC_QUEUE2);
                container.setMessageListener(rabbitTemplate(connectionFactory));
                return container;
            }
        }
        -----------------------------------------------------------------------------------------------------
        配置了消息发送队列 msgQueue 和消息返回队列 replyQueue,然后将这两个队列和消息交换机进行绑定
    c.生产者
        @RestController
        public class RpcClientController {

            private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class);

            @Autowired
            private RabbitTemplate rabbitTemplate;

            @GetMapping("/send")
            public String send(String message) {
                // 创建消息对象
                Message newMessage = MessageBuilder.withBody(message.getBytes()).build();

                logger.info("client send:{}", newMessage);

                //客户端发送消息
                Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);

                String response = "";
                if (result != null) {
                    // 获取已发送的消息的 correlationId
                    String correlationId = newMessage.getMessageProperties().getCorrelationId();
                    logger.info("correlationId:{}", correlationId);

                    // 获取响应头信息
                    HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();

                    // 获取 server 返回的消息 id
                    String msgId = (String) headers.get("spring_returned_message_correlation");

                    if (msgId.equals(correlationId)) {
                        response = new String(result.getBody());
                        logger.info("client receive:{}", response);
                    }
                }
                return response;
            }
        }
        -----------------------------------------------------------------------------------------------------
        消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息
        服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,这个就是消息发送时候的
        correlation_id,通过消息发送时候的 correlation_id 以及返回消息头中的 spring_returned_message_correlation
        字段值,我们就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的
        -----------------------------------------------------------------------------------------------------
        这就是整个客户端的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够
        。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id
        这样就无法将返回的消息内容和发送的消息内容关联起来

03.消费者
    a.application.properties
        spring.rabbitmq.host=localhost
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.publisher-confirm-type=correlated
        spring.rabbitmq.publisher-returns=true
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-confirm-type=correlated
        首先是配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id
        只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-returns=true
        最后一行配置则是开启发送失败退回
    b.配置类
        @Configuration
        public class RabbitConfig {

            public static final String RPC_QUEUE1 = "queue_1";
            public static final String RPC_QUEUE2 = "queue_2";
            public static final String RPC_EXCHANGE = "rpc_exchange";

            /**
             * 配置消息发送队列
             */
            @Bean
            Queue msgQueue() {
                return new Queue(RPC_QUEUE1);
            }

            /**
             * 设置返回队列
             */
            @Bean
            Queue replyQueue() {
                return new Queue(RPC_QUEUE2);
            }

            /**
             * 设置交换机
             */
            @Bean
            TopicExchange exchange() {
                return new TopicExchange(RPC_EXCHANGE);
            }

            /**
             * 请求队列和交换器绑定
             */
            @Bean
            Binding msgBinding() {
                return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
            }

            /**
             * 返回队列和交换器绑定
             */
            @Bean
            Binding replyBinding() {
                return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
            }
        }
    c.消费者
        @Component
        public class RpcServerController {
            private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class);
            @Autowired
            private RabbitTemplate rabbitTemplate;

            @RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
            public void process(Message msg) {
                logger.info("server receive : {}",msg.toString());
                Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build();
                CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
                rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
            }
        }
        -----------------------------------------------------------------------------------------------------
        这里的逻辑就比较简单了
        服务端首先收到消息并打印出来
        服务端提取出原消息中的 correlation_id
        服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数

2.7 [7]Publisher Confirms

00.RabbitMQ消息发送机制
    a.概念
        RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上
        然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费
    b.说明
        大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认
        消息成功到达 Exchange
        消息成功到达 Queue
        如果能确认这两步,那么我们就可以认为消息发送成功了
        -----------------------------------------------------------------------------------------------------
        如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息
        多次重试之后,如果消息还是不能到达,则可能就需要人工介入了
    c.说明
        经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了
        1.确认消息到达 Exchange
        2.确认消息到达 Queue
        3.开启定时任务,定时投递那些发送失败的消息
        上面提出的三个步骤,第三步需要我们自己实现,前两步 RabbitMQ 则有现成的解决方案
        -----------------------------------------------------------------------------------------------------
        如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案
        开启事务机制
        发送方确认机制
        -----------------------------------------------------------------------------------------------------
        这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报错

3 消费:整理

3.1 汇总:3个

00.汇总
    a.说明
        一个生产者,一个交换机,两个队列,两个消费者
        一个队列中的每条消息只会被一个消费者消费
        但同一个队列可以有多个消费者,消息会在这些消费者之间轮询分发
    b.分类
        a.Simple简单队列
            最简单的收发模式
        b.Work工作队列
            资源的竞争
        c.publish/subscribe发布订阅
            共享资源
            direct路由模式,直连交换机,依赖routingkey
            fanout发布订阅,扇形交换机,不依赖routingkey
            topic主题模式,主题交换机,依赖routingkey
            headers x-mactch模式,头交换机,不依赖routingkey

01.Simple简单队列:最简单的收发模式
    a.说明
        一个生产者,一个消费者,一个队列(一个默认的交换机)
    b.过程
        消息产生消息,将消息放入队列
        消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后
        自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失
        这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)

02.Work工作队列:资源的竞争
    a.方式
        方式1:轮询模式
        方式2:公平分发(能者多劳)
    b.说明
        一个生产者,两个消费者,一个队列(一个默认的交换机)
    c.过程
        消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费
        C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息
        (隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用
        可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)

03.publish/subscribe发布订阅:共享资源
    a.背景
        a.说明
            不管是简单队列模型还是工作队列模式只用到了 RabbitMQ 中的 Queue 队列,
            消息直接发送到队列中,消费者直接从队列中获取消息。
            -------------------------------------------------------------------------------------------------
            现在我们学习另一种消息的处理方式,消息生产者将消息发送到 Exchange 交换机中,
            再由交换机将消息投递到 Queue队列中,交换机和队列的关系是多对多的关系,
            表示一个交换机可以通过一定的规矩将消息投递到多个队列中,同样一个队列也可以接收多个交换机投递的消息
            -------------------------------------------------------------------------------------------------
            一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,
            每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
            需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,
            这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,
            -------------------------------------------------------------------------------------------------
            每个消费者监听自己的队列;
            生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
        b.Direct消息流转过程
            1.生产者向Exchange发送消息
            2.队列使用路由密钥绑定到Exchange
            3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange
            4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列
            5.订阅队列的使用者接收消息并进行处理
    b.direct路由模式,直连交换机,依赖routingkey
        a.说明
            消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中
            基于完全匹配、单播的模式
            -------------------------------------------------------------------------------------------------
            DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上
            当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上
            例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收
            -------------------------------------------------------------------------------------------------
            处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配
            如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa
            也不会转发dog.123,只会转发test。
        b.Direct 消息流转过程
            1.生产者向Exchange发送消息
            2.队列使用路由密钥绑定到Exchange
            3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange
            4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列
            5.订阅队列的使用者接收消息并进行处理
        c.组成
            一个生产者,一个交换机,两个队列,两个消费者
        d.特点
            Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式
    c.fanout发布订阅,扇形交换机,不依赖routingkey
        a.说明
            把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的
            -------------------------------------------------------------------------------------------------
            FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上
            在这种策略中,routingkey 将不起任何作用
            -------------------------------------------------------------------------------------------------
            不处理路由键。你只需要简单的将队列绑定到交换机上
            一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
            很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
        b.组成
            一个生产者,一个交换机,三个队列,三个消费者
        c.特点
            发布与订阅模式,是一种广播机制,它是没有路由key的模式
    d.topic主题模式,主题交换机,依赖routingkey
        a.说明
            通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
            -------------------------------------------------------------------------------------------------
            星号井号代表通配符
            星号代表多个单词,井号代表一个单词
            路由功能添加模糊匹配
            消息产生者产生消息,把消息交给交换机
            交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
            -------------------------------------------------------------------------------------------------
            消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
            根据业务功能定义路由字符串
            从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中
            业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误
            -------------------------------------------------------------------------------------------------
            将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。
            因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs
        b.Topic中的路由键设置规则
            1.Topic Exchange中的路由关键字必须包含零个或多个由.点分隔的单词,例如health.education
            2.Topic Exchange中的路由键通常称为路由模式。
            3.路由键允许只包含 星号(*)和 井号 (#)的正则表达式组成
            4.星号(*****)表示正好允许一个字。
            5.同样,井号(#)表示允许的单词数为零或更多。
            6.点号(.)表示–单词定界符。多个关键术语用点定界符分隔。
            7.如果路由模式为**health.\***,则意味着以第一个单词为首的路由键运行状况发送的任何消息都将到达队列。例如,health.education将到达此队列,但sports.health将不起作用
        c.Topic中的消息流转过程
            1.一个Queue队列通过路由键(P)绑定到 Exchange
            2.Producer 将带有P路由键(K)的消息发送到 Topic Exchange
            3.如果P与K匹配,则消息被传递到队列。路由密钥匹配的确定如下所述
            4.订阅队列的使用者将收到消息
        d.组成
            一个生产者,一个交换机,两个队列,两个消费者
        e.特点
            Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式
            生产者创建Topic的exchange并且绑定到队列中,绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx 去别写。* -》代表xxx,# -》代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底时什么
            简单的说:* 表示一个,# 表示 0个 或者 多个
    e.headers x-mactch模式,头交换机,不依赖routingkey
        a.说明
            不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
            -------------------------------------------------------------------------------------------------
            Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的
            Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对
            接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息
            匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义
            all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了
            fanout,direct,topic exchange的routingKey都需要要字符串形式的
            而headers exchange则没有这个要求,因为键值对的值可以是任何类型
        b.Headers中消息流转过程
            1.一个或多个队列使用标头属性(H)绑定(链接)到标头交换
            2.生产者将带有标头属性(MH)的消息发送到此Exchange
            3.如果MH与H匹配,则消息将转发到队列
            4.监听队列的使用者接收消息并对其进行处理

3.2 [1]简单队列

01.Simple简单队列
    a.概念
        如果把使用 RabbitMQ 进行消息发送的过程比喻成邮寄邮件。
        那么简单队列的场景是,只有一个邮箱、一个邮局、一个投递员,消息通过 RabbitMQ 进行一对一发送,发送过程最简单。
        -------------------------------------------------------------------------------------------------
        一个生产者,一个消费者,一个队列(一个默认的交换机)
        -------------------------------------------------------------------------------------------------
        消息产生消息,将消息放入队列
        消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,
        自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,
        这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
    b.一个生产者,一个消费者,一个队列(一个默认的交换机)
        a.生产者
            package com.ruoyi.rabbitmq1.demo01;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    channel.queueDeclare("simple-queue1", false, false, false, null);
                    // 5: 准备发送消息的内容
                    String message = "你好,消息队列!!!";
                    // 6: 发送消息给队列queue1
                    /*
                     * @params1: 交换机exchange
                     * @params2: 队列名称、路由key(routing)
                     * @params3: 属性配置
                     * @params4: 发送消息的内容
                     **/
                    // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
                    channel.basicPublish("", "simple-queue1", null, message.getBytes());
                    System.out.println("消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo01;

            import com.rabbitmq.client.*;
            import java.io.IOException;

            public class Consumer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    /*
                     * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                     * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                     */
                    try (Connection connection = connectionFactory.newConnection("消费者");
                         Channel channel = connection.createChannel()){

                        // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
                        //channel.queueDeclare("queue1", false, false, false, null);

                        // 接收消息,监听对应的队列名即可
                        /*
                         *  @params1: queue 队列的名称
                         *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                         *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                         *  @params4: cancelCallback 消费失败回调
                         * */
                        channel.basicConsume("simple-queue1", true, new DeliverCallback() {
                            @Override
                            public void handle(String consumerTag, Delivery delivery) throws IOException {
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                            }
                        }, new CancelCallback() {
                            @Override
                            public void handle(String s)  {
                                System.out.println("接受失败了...");
                            }
                        });

                        // 让程序停止,好接收消费
                        System.out.println("开始接受消息");
                        System.in.read();
                    }

                }
            }
        c.运行
            先运行消费者,然后运行生产者。最后消费者控制台输出:
            开始接受消息
            收到消息是你好,消息队列!!!

3.3 [2]工作队列

01.Work工作队列
    a.概念
        在前面我们学了Simple Queue 模型,Simple Queue 模型消息生产者和消费者是一 一对应的,
        消息由生产者发送到队列并由消费者消费。假设有一种消息,消息生产者一次性发送了10条消息,
        消费者消费一条消息执行耗时1秒,如果使用简单队列模式总共耗时10秒;
        如果使用工作队列模式有3个消费者共同消费这些消息理想情况下只需要3秒就可以处理完这些消息。
        -----------------------------------------------------------------------------------------------------
        方式1:轮询模式
        方式2:公平分发(能者多劳)
        -----------------------------------------------------------------------------------------------------
        一个生产者,两个消费者,一个队列(一个默认的交换机)
        -----------------------------------------------------------------------------------------------------
        消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。
        C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息
        (隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,
        可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
    b.轮询模式
        a.生产者
            package com.ruoyi.rabbitmq1.demo02.demo01;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    channel.queueDeclare("work-queue1", false, false, false, null);

                    // 5: 准备发送消息的内容
                    // 6: 发送消息给队列queue1
                    /*
                     * @params1: 交换机exchange
                     * @params2: 队列名称、路由key(routing)
                     * @params3: 属性配置
                     * @params4: 发送消息的内容
                     **/
                    for (int i = 1; i <= 20; i++) {
                        channel.basicPublish("", "work-queue1", null, ("work-轮询模式:"+ i).getBytes());
                    }
                    System.out.println("消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            a.消费者1
                package com.ruoyi.rabbitmq1.demo02.demo01;

                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;

                public class Consumer1 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");

                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者1");
                        Channel channel = connection.createChannel();

                        // 接收消息,监听对应的队列名即可
                        channel.basicConsume("work-queue1", true, (consumerTag, delivery) ->
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8")),
                                consumerTag  -> {
                        });

                    }
                }
            b.消费者2
                package com.ruoyi.rabbitmq1.demo02.demo01;

                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;

                public class Consumer2 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");

                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者2");
                        Channel channel = connection.createChannel();

                        // 接收消息,监听对应的队列名即可
                        channel.basicConsume("work-queue1", true, (consumerTag, delivery) ->
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8")),
                                consumerTag  -> {
                        });

                    }
                }
        c.运行
            先运行消费者,然后运行生产者。最后消费者控制台输出:
            消费者1:
            收到消息是work-轮询模式:1
            收到消息是work-轮询模式:3
            收到消息是work-轮询模式:5
            收到消息是work-轮询模式:7
            收到消息是work-轮询模式:9
            收到消息是work-轮询模式:11
            收到消息是work-轮询模式:13
            收到消息是work-轮询模式:15
            收到消息是work-轮询模式:17
            收到消息是work-轮询模式:19
            消费者2:
            收到消息是work-轮询模式:2
            收到消息是work-轮询模式:4
            收到消息是work-轮询模式:6
            收到消息是work-轮询模式:8
            收到消息是work-轮询模式:10
            收到消息是work-轮询模式:12
            收到消息是work-轮询模式:14
            收到消息是work-轮询模式:16
            收到消息是work-轮询模式:18
            收到消息是work-轮询模式:20
    c.公平分发(能者多劳)
        a.生产者
            package com.ruoyi.rabbitmq1.demo02.demo02;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    channel.queueDeclare("work-queue2", false, false, false, null);

                    // 5: 准备发送消息的内容
                    // 6: 发送消息给队列queue1
                    /*
                     * @params1: 交换机exchange
                     * @params2: 队列名称、路由key(routing)
                     * @params3: 属性配置
                     * @params4: 发送消息的内容
                     **/
                    for (int i = 1; i <= 20; i++) {
                        channel.basicPublish("", "work-queue2", null, ("work-公平分发:"+ i).getBytes());
                    }
                    System.out.println("消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            a.消费者1
                package com.ruoyi.rabbitmq1.demo02.demo02;

                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;

                import com.rabbitmq.client.*;
                import java.util.concurrent.TimeUnit;

                public class Consumer1 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");

                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者1");
                        Channel channel = connection.createChannel();

                        // 同一时刻服务器只会发送一条消息给消费者
                        channel.basicQos(1);

                        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                            try {
                                // 加一个睡眠,模拟消费者1消费慢
                                TimeUnit.MILLISECONDS.sleep(1000);
                                System.out.println("Work1-收到消息是" + new String(delivery.getBody(), "UTF-8"));
                                // 确认消息消费,参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        };

                        // 接收消息,开启手动确认消费消息机制,设置autoAck为false,防止消息一下子都进入消费者
                        channel.basicConsume("work-queue2", false, deliverCallback,consumerTag  -> {
                        });

                    }
                }
            b.消费者2
                package com.ruoyi.rabbitmq1.demo02.demo02;

                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;
                import com.rabbitmq.client.*;
                import java.util.concurrent.TimeUnit;

                public class Consumer2 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");

                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者2");
                        Channel channel = connection.createChannel();

                        // 同一时刻服务器只会发送一条消息给消费者
                        channel.basicQos(1);

                        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                            try {
                                // 加一个睡眠,模拟消费者1消费慢
                                TimeUnit.MILLISECONDS.sleep(100);
                                System.out.println("Work2-收到消息是" + new String(delivery.getBody(), "UTF-8"));
                                // 确认消息消费,参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        };

                        // 接收消息,开启确认机制,设置autoAck为false
                        channel.basicConsume("work-queue2", false, deliverCallback, consumerTag  -> {
                        });

                    }
                }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            消费者1:
            Work1-收到消息是work-公平分发:1
            Work1-收到消息是work-公平分发:2
            Work1-收到消息是work-公平分发:3
            Work1-收到消息是work-公平分发:4
            Work1-收到消息是work-公平分发:5
            Work1-收到消息是work-公平分发:6
            Work1-收到消息是work-公平分发:7
            Work1-收到消息是work-公平分发:8
            Work1-收到消息是work-公平分发:9
            Work1-收到消息是work-公平分发:10
            Work1-收到消息是work-公平分发:14
            消费者2:
            Work2-收到消息是work-公平分发:11
            Work2-收到消息是work-公平分发:12
            Work2-收到消息是work-公平分发:13
            Work2-收到消息是work-公平分发:15
            Work2-收到消息是work-公平分发:16
            Work2-收到消息是work-公平分发:17
            Work2-收到消息是work-公平分发:18
            Work2-收到消息是work-公平分发:19
            Work2-收到消息是work-公平分发:20

3.4 [3]发布订阅:4种

00.汇总
    a.direct,直连交换机
        消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配
        交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式
    b.fanout,扇形交换机
        把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的
    c.topic,主题交换机
        通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
    d.headers,头交换机
        不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了

01.direct路由模式,直连交换机,依赖routingkey
    a.概念
        消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中
        基于完全匹配、单播的模式
        -------------------------------------------------------------------------------------------------
        DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
        当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
        例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
        -------------------------------------------------------------------------------------------------
        处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
        如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa,
        也不会转发dog.123,只会转发test。
        -------------------------------------------------------------------------------------------------
        Direct 消息流转过程:
        1.生产者向Exchange发送消息。
        2.队列使用路由密钥绑定到Exchange。
        3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
        4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
        5.订阅队列的使用者接收消息并进行处理。
        -------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,两个队列,两个消费者
        特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo03;

            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();

                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
                    String exchangeName = "direct-exchange";

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机
                    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
                    // 4-2: 声明队列
                    channel.queueDeclare("direct-info", false, false, false, null);
                    channel.queueDeclare("direct-error", false, false, false, null);
                    // 4-3: 绑定交换机和队列
                    channel.queueBind("direct-info", exchangeName, "info");
                    channel.queueBind("direct-error", exchangeName, "error");

                    // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                    channel.basicPublish(exchangeName, "info", null, "你好,消息队列!".getBytes("UTF-8"));
                    channel.basicPublish(exchangeName, "error", null, "你好,消息队列!".getBytes("UTF-8"));

                    System.out.println("消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo03;

            import com.rabbitmq.client.CancelCallback;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import com.rabbitmq.client.DeliverCallback;
            import com.rabbitmq.client.Delivery;
            import java.io.IOException;

            public class Consumer {

                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();

                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();

                         // 接收消息,监听对应的队列名即可
                        /*
                         *  @params1: queue 队列的名称
                         *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                         *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                         *  @params4: cancelCallback 消费失败回调
                         * */
                        // 3:定义接受消息的回调
                        channel.basicConsume(queueName, true, new DeliverCallback() {
                            @Override
                            public void handle(String consumerTag, Delivery delivery) throws IOException {
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                            }
                        }, new CancelCallback() {
                            @Override
                            public void handle(String s)  {
                                System.out.println("接受失败了...");
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.out.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };

                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "direct-info").start();
                    new Thread(runnable, "direct-error").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            direct-error:收到消息是:你好,消息队列!
            direct-info:收到消息是:你好,消息队列!

02.fanout发布订阅,扇形交换机,不依赖routingkey
    a.概念
        把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
        -------------------------------------------------------------------------------------------------
        FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,
        在这种策略中,routingkey 将不起任何作用
        -------------------------------------------------------------------------------------------------
        不处理路由键。你只需要简单的将队列绑定到交换机上。
        一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
        很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
        -------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,三个队列,三个消费者
        特点:发布与订阅模式,是一种广播机制,它是没有路由key的模式。
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo04;

            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();

                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
                    String exchangeName = "fanout-exchange";

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机(也可通过web页面创建)
                    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
                    // 4-2: 声明队列
                    channel.queueDeclare("fanout-queue1", false, false, false, null);
                    channel.queueDeclare("fanout-queue2", false, false, false, null);
                    channel.queueDeclare("fanout-queue3", false, false, false, null);
                    // 4-3: 绑定交换机和队列
                    channel.queueBind("fanout-queue1", exchangeName, "");
                    channel.queueBind("fanout-queue2", exchangeName, "");
                    channel.queueBind("fanout-queue3", exchangeName, "");

                    // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                    channel.basicPublish(exchangeName, "", null, "你好,消息队列!".getBytes("UTF-8"));
                    System.out.println("消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo04;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import com.rabbitmq.client.DeliverCallback;

            public class Consumer {

                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();

                        // 6: 定义接受消息的回调
                        DeliverCallback deliverCallback = (consumerTag, delivery) ->
                                System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                        channel.basicConsume(queueName, true, deliverCallback, consumerTag  -> {
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.out.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };

                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "fanout-queue1").start();
                    new Thread(runnable, "fanout-queue2").start();
                    new Thread(runnable, "fanout-queue3").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            fanout-queue3:收到消息是:你好,消息队列!
            fanout-queue1:收到消息是:你好,消息队列!
            fanout-queue2:收到消息是:你好,消息队列!

03.topic主题模式,主题交换机,依赖routingkey
    a.概念
        通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
        -------------------------------------------------------------------------------------------------
        星号井号代表通配符
        星号代表多个单词,井号代表一个单词
        路由功能添加模糊匹配
        消息产生者产生消息,把消息交给交换机
        交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
        -------------------------------------------------------------------------------------------------
        消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
        根据业务功能定义路由字符串
        从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
        业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
        -------------------------------------------------------------------------------------------------
        将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。
        因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs
        -------------------------------------------------------------------------------------------------
        Topic 中的路由键设置规则:
        1.Topic Exchange中的路由关键字必须包含零个或多个由 . 点分隔的单词,例如health.education。
        2.Topic Exchange中的路由键通常称为路由模式。
        3.路由键允许只包含 星号(*)和 井号 (#)的正则表达式组成
        4.星号(*****)表示正好允许一个字。
        5.同样,井号(#)表示允许的单词数为零或更多。
        6.点号(.)表示–单词定界符。多个关键术语用点定界符分隔。
        7.如果路由模式为**health.\***,则意味着以第一个单词为首的路由键运行状况发送的任何消息都将到达队列。例如,health.education将到达此队列,但sports.health将不起作用。
        -------------------------------------------------------------------------------------------------
        Topic 中的消息流转过程:
        1.一个Queue队列通过路由键(P)绑定到 Exchange。
        2.Producer 将带有P路由键(K)的消息发送到 Topic Exchange。
        3.如果P与K匹配,则消息被传递到队列。路由密钥匹配的确定如下所述。
        4.订阅队列的使用者将收到消息。
        -------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,两个队列,两个消费者
        特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
        生产者创建Topic的exchange并且绑定到队列中,绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx 去别写。* -》代表xxx,# -》代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底时什么
        简单的说:* 表示一个,# 表示 0个 或者 多个
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo05;

            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            import java.io.IOException;
            import java.util.concurrent.TimeoutException;

            public class Producer {
                public static void main(String[] args) throws IOException, TimeoutException {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 创建连接,获取通道
                    Connection connection = connectionFactory.newConnection("生产者");
                    Channel channel = connection.createChannel();

                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
                    String exchangeName = "topic-exchange";

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机(也可通过web页面创建)
                    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);

                    // 4-2: 声明队列
                    channel.queueDeclare("topic-queue1", false, false, false, null);
                    channel.queueDeclare("topic-queue2", false, false, false, null);
                    channel.queueDeclare("topic-queue3", false, false, false, null);

                    // 4-3: 绑定交换机和队列
                    channel.queueBind("topic-queue1", exchangeName, "#.order");
                    channel.queueBind("topic-queue2", exchangeName, "*.user");

                    // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                    // topic-queue1、topic-queue2、topic-queue3
                    channel.basicPublish(exchangeName, "com.course.order", null, "routingKey:#.order".getBytes("UTF-8"));
                    // topic-queue3
                    channel.basicPublish(exchangeName, "com.order.user", null, "routingKey:#.user".getBytes("UTF-8"));

                    System.out.println("消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo05;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Consumer {

                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();

                        // 6: 定义接受消息的回调
                        channel.basicConsume(queueName, true,
                                (consumerTag, delivery) -> System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")),
                                consumerTag  -> {
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.err.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };

                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "topic-queue1").start();
                    new Thread(runnable, "topic-queue2").start();
                    new Thread(runnable, "topic-queue3").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            topic-queue1:收到消息是:routingKey:#.order

04.headers x-mactch模式,头交换机,不依赖routingkey
    a.概念
        不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
        -------------------------------------------------------------------------------------------------
        Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。
        Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,
        接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。
        匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。
        all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。
        fanout,direct,topic exchange的routingKey都需要要字符串形式的,
        而headers exchange则没有这个要求,因为键值对的值可以是任何类型
        -------------------------------------------------------------------------------------------------
        Headers 中消息流转过程:
        1.一个或多个队列使用标头属性(H)绑定(链接)到标头交换。
        2.生产者将带有标头属性(MH)的消息发送到此Exchange。
        3.如果MH与H匹配,则消息将转发到队列。
        4.监听队列的使用者接收消息并对其进行处理。
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo06;

            import com.rabbitmq.client.*;

            import java.util.HashMap;
            import java.util.Map;

            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    // 2: 创建连接,获取通道
                    Connection connection = connectionFactory.newConnection("生产者");
                    Channel channel = connection.createChannel();

                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机
                    channel.exchangeDeclare("header-exchange", BuiltinExchangeType.HEADERS);
                    // 4-2: 声明队列
                    channel.queueDeclare("header-queue-noe", false, false, false, null);
                    channel.queueDeclare("header-queue-two", false, false, false, null);
                    // 4-3: 绑定交换机和队列并加上配置header匹配规则
                    Map<String, Object> oneArgs = new HashMap<>();
                    // 匹配其中一个key/value 就能匹配成功
                    oneArgs.put("x-match", "any");
                    oneArgs.put("h1", "Header1");
                    oneArgs.put("h2", "Header2");
                    channel.queueBind("header-queue-noe", "header-exchange", "", oneArgs);
                    Map<String, Object> twoArgs = new HashMap<>();
                    // 必须匹配所有key/value 才能匹配成功
                    twoArgs.put("x-match", "all");
                    twoArgs.put("h1", "Header1");
                    twoArgs.put("h2", "Header2");
                    channel.queueBind("header-queue-two", "header-exchange", "", twoArgs);

                    /**
                     * 定义需要发送的消息和属性
                     */
                    Map<String, Object> headerMap = new HashMap<>();
                    headerMap.put("h1", "Header1");
                    headerMap.put("h3", "Header3");

                    AMQP.BasicProperties basicPropertiesOne = new AMQP.BasicProperties()
                            .builder().headers(headerMap).build();
                    channel.basicPublish("header-exchange", "", basicPropertiesOne, "Header Exchange example 1".getBytes("UTF-8"));
                    System.out.println("h1,h3:消息发送成功!");

                    headerMap.put("h2", "Header2");
                    AMQP.BasicProperties basicPropertiesTwo = new AMQP.BasicProperties()
                            .builder().headers(headerMap).build();
                    channel.basicPublish("header-exchange", "", basicPropertiesTwo, "Header Exchange example 2".getBytes("UTF-8"));
                    System.out.println("h1,h2,h3:消息发送成功!");

                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo06;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class Consumer {

                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();

                        // 6: 定义接受消息的回调
                        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
                            System.out.println(queueName + ":收到消息是: " + new String(message.getBody()));
                        }), consumerTag -> {
                            System.out.println(consumerTag);
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.out.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };

                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "header-queue-noe").start();
                    new Thread(runnable, "header-queue-two").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            header-queue-noe:收到消息是: Header Exchange example 1
            header-queue-two:收到消息是: Header Exchange example 2
            header-queue-noe:收到消息是: Header Exchange example 2

4 使用

4.1 汇总:7个

01.单条消息过期
    体现(生产者)
    // 设置消息的过期时间为10000毫秒(10秒)
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .expiration("10000") // 设置过期时间
        .build();
    channel.basicPublish("", "simple-queue1", props, message.getBytes());

02.第2种:队列消息过期
    体现(生产者)
    // 设置消息过期时间为 10000 毫秒(10秒)
    channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});

03.第3种:特殊情况
    还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
    这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
    是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)

04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
    消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
    消息过期
    队列达到最大长度
    -----------------------------------------------------------------------------------------------------
    步骤如下:
    1: 创建连接工厂,设置连接属性
    2: 从连接工厂中获取
    3: 从连接中打开通道channel
    4.1: 声明死信交换机、死信队列、绑定死信队列和死信交换机
    4.2: 声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
    5: 通过channel发送消息
    -----------------------------------------------------------------------------------------------------
    体现(生产者)
    // 4.1
    // 声明死信交换机
    channel.exchangeDeclare("dlx-exchange", "direct");
    // 声明死信队列
    channel.queueDeclare("dlx-queue", true, false, false, null);
    // 绑定死信队列和死信交换机
    channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

    // 4.2
    // 声明普通交换机
    // 使用默认 (AMQP Default)
    // 声明普通队列,设置TTL和DLX
    Map<String, Object> map = new HashMap<>();
    map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
    map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机                       重
    map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key        重
    channel.queueDeclare("simple-queue3", true, false, false, map);
    // 绑定普通队列和普通交换机
    // 无需绑定

05.第5种:rabbitmq_delayed_message_exchange 插件 实现延迟队列
    // 4: 声明延迟交换机
    channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);      重
    channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
    channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

    // 5: 准备发送消息的内容
    String message = "你好,消息队列!!! 5000毫秒";
    // 设置延迟时间(例如,5000毫秒)
    int delay = 5000;
    // 6: 发送消息给延迟交换机
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
            .build();
    channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
    System.out.println("消息发送成功!");

06.第6种:发送可靠性?确认机制、事务 / 消费可靠性?确认机制、幂等性
    a.方式1:通过AMQP提供的事务机制实现
        -- 开启事务
        channel.txSelect();
        -- 提交事务
        channel.txCommit();
        -- 回滚事务
        channel.txRollback();
    b.方式2:消息的发送者确认模式
        a.说明
            消息的发送者确认模式的使用和事务类似,也是通过channel进行发送确认的,
            但该模式和事务有着本质的区别就是发送消息丢失的时候不会像事务一样停止发送消息,
            而是补发消息直到消息发送到对方确认为止。
        b.三种实现方式
            a.方法1
                channel.waitForConfirms()普通发送消息确认模式
                该模式是不断的等待接收方来返回消息,如果有返回消息则说明消息发送成功
                -----------------------------------------------------------------------------------------
                boolean flag= channel.waitForConfirms();会堵塞线程等带服务器来返回确认消息。
                可以为这个函数指定一个毫秒值用于等待服务器的确认超时时间。
                如果抛出异常表示服务器出了问题,需要补发消息。
                无论是返回false还是抛出异常都有可能消息发送成功或者没有发送成功。
                补发消息可以将消息缓存到Redis中稍后使用定时任务来补发,或者使用递归的方法来补发
            b.方法2
                channel.waitForConfirmsOrDie()函数批量确认模式
                -----------------------------------------------------------------------------------------
                channel.waitForConfirmsOrDie();
                该函数会同时向服务中确认之前当前通道中发送消息是否已经全部写入成功,该函数没有返回值,
                如果服务器中没有一条消息能够发送成功或者向服务器发送确认时服务不可访问都被认定为消息发送失败。
                可能消息发送成功,也可能消息没发送成功
                -----------------------------------------------------------------------------------------
                channel.waitForConfirmsOrDie();
                也可以指定一个毫秒值来用于等带服务器的确认时间,如果超过这个时间就抛出异常,表示确认失败需要补发
                -----------------------------------------------------------------------------------------
                注意:
                批量确认消息比普通确认要快,但是如果一但出现了消息补发的情况,就不能确定是哪条消息需要补发,
                所以就会将本次发送的所有消息进行补发。
            c.方法3
                channel.addConfirmListener()异步监听发送确认模式。需要new ConfirmListener()来实现里面的回调函数
                public void handleAck(long l, boolean b) throws IOException
                public void handleNack(long l, boolean b) throws IOException

07.第7种:Stream流
    a.说明
        a.概念
            abbitMQ 从 v3.9 版本开始引入了一个名为 RabbitMQ Streams 的新功能
            它允许消息以高吞吐量和低延迟的方式通过流的形式进行传递
        b.特点
            RabbitMQ Streams 专为处理大量数据和实时数据流设计,它与传统的 RabbitMQ 消息队列模型有所不同,主要特点包括:
            1.高吞吐量:RabbitMQ Streams 可以处理数百万条消息的高并发流式数据传输,适用于实时分析和大数据处理场景。
            2.持久化流:消息可以以流的形式持久化,这意味着即使消费者处理较慢,数据也不会丢失。
            3.多消费者读取:同一条消息可以被多个消费者读取,而不会影响消息的传递和持久化。
            4.按消息索引读取:消费者可以根据消息的索引选择从特定位置开始读取消息,而不像传统的队列那样必须按顺序处理。
            5.消费分区:类似于 Kafka,RabbitMQ Streams 也支持分区机制,帮助提高吞吐量和并行性。
    b.代码实现
        a.准备
            a.安装RabbitMQ并启用stream插件
                rabbitmq-plugins enable rabbitmq_stream
            b.依赖
                <dependency>
                    <groupId>com.rabbitmq</groupId>
                    <artifactId>stream-client</artifactId>
                    <version>0.5.1</version>
                </dependency>
            c.在RabbitMQ中创建Stream队列
                rabbitmqadmin declare queue name=my-stream durable=true type=stream

4.2 [1]单条消息过期

01.单条消息过期
    a.生产者
        package com.ruoyi.rabbitmq2.demo01;

        import com.rabbitmq.client.AMQP;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;

        public class Producer {

            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();

                // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                /*
                 *  申明队列:如果队列不存在会自动创建。
                 *  注意:
                 *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                 *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                 *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                 * */
                channel.queueDeclare("simple-queue1", true, false, false, null);
                // 5: 准备发送消息的内容
                String message = "你好,消息队列!!!";
                // 6: 发送消息给队列queue1
                /*
                 * @params1: 交换机exchange
                 * @params2: 队列名称、路由key(routing)
                 * @params3: 属性配置
                 * @params4: 发送消息的内容
                 **/
                // 设置消息的过期时间为10000毫秒(10秒)
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .expiration("10000") // 设置过期时间
                    .build();
                channel.basicPublish("", "simple-queue1", props, message.getBytes());
                System.out.println("消息发送成功!");

                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    b.消费者
        package com.ruoyi.rabbitmq2.demo01;

        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;

        public class Consumer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                     Channel channel = connection.createChannel()){

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
                    //channel.queueDeclare("queue1", false, false, false, null);

                    // 接收消息,监听对应的队列名即可
                    /*
                     *  @params1: queue 队列的名称
                     *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                     *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                     *  @params4: cancelCallback 消费失败回调
                     * */
                    channel.basicConsume("simple-queue1", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s)  {
                            System.out.println("接受失败了...");
                        }
                    });

                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }

            }
        }
    c.体现(生产者)
        // 设置消息的过期时间为10000毫秒(10秒)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .expiration("10000") // 设置过期时间
            .build();
        channel.basicPublish("", "simple-queue1", props, message.getBytes());

4.3 [2]队列消息过期

01.队列消息过期
    a.生产者
        package com.ruoyi.rabbitmq2.demo02;

        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import java.util.HashMap;

        public class Producer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();

                // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                /*
                 *  申明队列:如果队列不存在会自动创建。
                 *  注意:
                 *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                 *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                 *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                 * */
                // 设置消息过期时间为 10000 毫秒(10秒)
                channel.queueDeclare("simple-queue2", true, false, false,
                    new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});
                // 5: 准备发送消息的内容
                String message = "你好,消息队列!!!";
                // 6: 发送消息给队列queue1
                /*
                 * @params1: 交换机exchange
                 * @params2: 队列名称、路由key(routing)
                 * @params3: 属性配置
                 * @params4: 发送消息的内容
                 **/
                // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
                channel.basicPublish("", "simple-queue2", null, message.getBytes());
                System.out.println("消息发送成功!");

                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    b.消费者
        package com.ruoyi.rabbitmq2.demo02;

        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;

        public class Consumer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                     Channel channel = connection.createChannel()){

                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
                    //channel.queueDeclare("queue1", false, false, false, null);

                    // 接收消息,监听对应的队列名即可
                    /*
                     *  @params1: queue 队列的名称
                     *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                     *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                     *  @params4: cancelCallback 消费失败回调
                     * */
                    channel.basicConsume("simple-queue2", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s)  {
                            System.out.println("接受失败了...");
                        }
                    });

                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }

            }
        }
    c.体现(生产者)
        // 设置消息过期时间为 10000 毫秒(10秒)
        channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});

4.4 [3]特殊情况

01.特殊情况
    还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
    这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
    是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)

4.5 [4]延迟队列:DLX+TTL

01.DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
    a.生产者
        package com.ruoyi.rabbitmq2.demo03;

        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import java.util.HashMap;
        import java.util.Map;

        public class Producer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();

                // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                /*
                 *  申明队列:如果队列不存在会自动创建。
                 *  注意:
                 *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                 *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                 *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                 * */
                // 4.1
                // 声明死信交换机
                channel.exchangeDeclare("dlx-exchange", "direct");
                // 声明死信队列
                channel.queueDeclare("dlx-queue", true, false, false, null);
                // 绑定死信队列和死信交换机
                channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

                // 4.2
                // 声明普通交换机
                // 使用默认 (AMQP Default)
                // 声明普通队列,设置TTL和DLX
                Map<String, Object> map = new HashMap<>();
                map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
                map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
                map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key
                channel.queueDeclare("simple-queue3", true, false, false, map);
                // 绑定普通队列和普通交换机
                // 无需绑定

                // 5.通过channel发送消息
                // @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                channel.basicPublish("", "simple-queue3", null, "你好,消息队列 dlx!!!".getBytes());
                System.out.println("消息发送成功!");

                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    b.消费者
        package com.ruoyi.rabbitmq2.demo03;

        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;

        public class Consumer {

            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                    Channel channel = connection.createChannel()) {
                    // 接收死信队列的消息
                    channel.basicConsume("dlx-queue", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到死信队列消息: " + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s) {
                            System.out.println("接受失败了...");
                        }
                    });

                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }

            }
        }
    c.体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定

4.6 [5]延迟队列:插件

01.rabbitmq_delayed_message_exchange 插件 实现延迟队列
    a.安装
        下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
        放入文件夹 C:\software\RabbitMQ\rabbitmq_server-3.8.9\plugins
        启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
        查看插件 rabbitmq-plugins list
    b.生产者
        package com.ruoyi.rabbitmq2.demo04;

        import com.rabbitmq.client.AMQP;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import java.util.HashMap;

        public class Producer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();

                // 4: 声明延迟交换机
                channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);
                channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
                channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

                // 5: 准备发送消息的内容
                String message = "你好,消息队列!!! 5000毫秒";
                // 设置延迟时间(例如,5000毫秒)
                int delay = 5000;
                // 6: 发送消息给延迟交换机
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                        .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                        .build();
                channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
                System.out.println("消息发送成功!");

                // 5: 准备发送消息的内容
                String message2 = "你好,消息队列!!! 10000毫秒";
                // 设置延迟时间(例如,10000毫秒)
                int delay2 = 10000;
                // 6: 发送消息给延迟交换机
                AMQP.BasicProperties props2 = new AMQP.BasicProperties.Builder()
                        .headers(new HashMap<String, Object>(){{put("x-delay", delay2);}})
                        .build();
                channel.basicPublish("delayed_exchange", "", props2, message2.getBytes()); // 修改为使用空的路由键
                System.out.println("消息发送成功!");

                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    c.消费者
        package com.ruoyi.rabbitmq2.demo04;

        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;

        public class Consumer {

            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");

                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                    Channel channel = connection.createChannel()) {
                    // 接收消息,监听对应的队列名即可
                    /*
                     *  @params1: queue 队列的名称
                     *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                     *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                     *  @params4: cancelCallback 消费失败回调
                     * */
                    channel.basicConsume("delayed_queue", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s) {
                            System.out.println("接受失败了...");
                        }
                    });

                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }

            }
        }
    d.体现(生产者)
        // 4: 声明延迟交换机
        channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);
        channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
        channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

        // 5: 准备发送消息的内容
        String message = "你好,消息队列!!! 5000毫秒";
        // 设置延迟时间(例如,5000毫秒)
        int delay = 5000;
        // 6: 发送消息给延迟交换机
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                .build();
        channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
        System.out.println("消息发送成功!");

4.7 [6]发送可靠性

01.发送可靠性?确认机制、事务 / 消费可靠性?确认机制、幂等性
    a.方式1:通过AMQP提供的事务机制实现
        a.说明
            启动事务,启动事务以后所有写入队列中的消息,必须显示调用事务的txCommit()函数来提交事务,获取txRollback()回滚事务
            注意:如果调用channel.txSelect()函数来开启事务,那么必须显示的调用事务的提交函数,否则消息不会进入到消息队列中
            注意:回滚事务必须在channel关闭之前
            -- 开启事务
            channel.txSelect();
            -- 提交事务
            channel.txCommit();
            -- 回滚事务
            channel.txRollback();
        b.生产者
            package com.ruoyi.rabbitmq2.demo05;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import java.io.IOException;

            public class Producer {

                public static void main(String[] args) {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    Connection connection = null;
                    Channel channel = null;
                    try {
                        connection = connectionFactory.newConnection();
                        channel = connection.createChannel();

                        // 创建队列
                        channel.queueDeclare("transactionQueue", true, false, false, null);
                        // 创建交换机
                        channel.exchangeDeclare("directTransactionExchange", "direct", true);
                        // 创建绑定关系
                        channel.queueBind("transactionQueue", "directTransactionExchange", "transactionRoutingKey");

                        // 开启事务
                        channel.txSelect();

                        // 发送消息
                        String message = "事务测试消息";
                        channel.basicPublish("directTransactionExchange", "transactionRoutingKey", null, message.getBytes("utf-8"));

                        // 提交事务
                        channel.txCommit();
                        System.out.println("消息发送成功");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if (channel != null) {
                            try {
                                // 回滚事务
                                channel.txRollback();
                                channel.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        c.消费者
            package com.ruoyi.rabbitmq2.demo05;

            import com.rabbitmq.client.AMQP;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import com.rabbitmq.client.DefaultConsumer;
            import com.rabbitmq.client.Envelope;
            import java.io.IOException;

            public class Consumer {

                public static void main(String[] args) {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");

                    Connection connection = null;
                    Channel channel = null;
                    try {
                        connection = connectionFactory.newConnection();
                        channel = connection.createChannel();

                        // 创建队列
                        channel.queueDeclare("transactionQueue", true, false, false, null);
                        // 创建交换机
                        channel.exchangeDeclare("directTransactionExchange", "direct", true);
                        // 创建绑定关系
                        channel.queueBind("transactionQueue", "directTransactionExchange", "transactionRoutingKey");

                        // 开启事务
                        channel.txSelect();

                        // 获取消息
                        channel.basicConsume("transactionQueue", true, "", new DefaultConsumer(channel) {
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                String message = new String(body);
                                System.out.println(message);
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
    b.方式2:消息的发送者确认模式
        a.说明
            消息的发送者确认模式的使用和事务类似,也是通过channel进行发送确认的,
            但该模式和事务有着本质的区别就是发送消息丢失的时候不会像事务一样停止发送消息,
            而是补发消息直到消息发送到对方确认为止。
        b.三种实现方式
            a.方法1
                channel.waitForConfirms()普通发送消息确认模式
                该模式是不断的等待接收方来返回消息,如果有返回消息则说明消息发送成功
                ---------------------------------------------------------------------------------------------
                boolean flag= channel.waitForConfirms();会堵塞线程等带服务器来返回确认消息。
                可以为这个函数指定一个毫秒值用于等待服务器的确认超时时间。
                如果抛出异常表示服务器出了问题,需要补发消息。
                无论是返回false还是抛出异常都有可能消息发送成功或者没有发送成功。
                补发消息可以将消息缓存到Redis中稍后使用定时任务来补发,或者使用递归的方法来补发
            b.方法2
                channel.waitForConfirmsOrDie()函数批量确认模式
                ---------------------------------------------------------------------------------------------
                channel.waitForConfirmsOrDie();
                该函数会同时向服务中确认之前当前通道中发送消息是否已经全部写入成功,该函数没有返回值,
                如果服务器中没有一条消息能够发送成功或者向服务器发送确认时服务不可访问都被认定为消息发送失败。
                可能消息发送成功,也可能消息没发送成功
                ---------------------------------------------------------------------------------------------
                channel.waitForConfirmsOrDie();
                也可以指定一个毫秒值来用于等带服务器的确认时间,如果超过这个时间就抛出异常,表示确认失败需要补发
                ---------------------------------------------------------------------------------------------
                注意:
                批量确认消息比普通确认要快,但是如果一但出现了消息补发的情况,就不能确定是哪条消息需要补发,
                所以就会将本次发送的所有消息进行补发。
            c.方法3
                channel.addConfirmListener()异步监听发送确认模式。需要new ConfirmListener()来实现里面的回调函数
                public void handleAck(long l, boolean b) throws IOException
                public void handleNack(long l, boolean b) throws IOException

4.8 [7]Stream流

01.Stream流
    a.说明
        a.概念
            abbitMQ 从 v3.9 版本开始引入了一个名为 RabbitMQ Streams 的新功能
            它允许消息以高吞吐量和低延迟的方式通过流的形式进行传递
        b.特点
            RabbitMQ Streams 专为处理大量数据和实时数据流设计,它与传统的 RabbitMQ 消息队列模型有所不同,主要特点包括:
            1.高吞吐量:RabbitMQ Streams 可以处理数百万条消息的高并发流式数据传输,适用于实时分析和大数据处理场景。
            2.持久化流:消息可以以流的形式持久化,这意味着即使消费者处理较慢,数据也不会丢失。
            3.多消费者读取:同一条消息可以被多个消费者读取,而不会影响消息的传递和持久化。
            4.按消息索引读取:消费者可以根据消息的索引选择从特定位置开始读取消息,而不像传统的队列那样必须按顺序处理。
            5.消费分区:类似于 Kafka,RabbitMQ Streams 也支持分区机制,帮助提高吞吐量和并行性。
    b.代码实现
        a.准备
            a.安装RabbitMQ并启用stream插件
                rabbitmq-plugins enable rabbitmq_stream
            b.依赖
                <dependency>
                    <groupId>com.rabbitmq</groupId>
                    <artifactId>stream-client</artifactId>
                    <version>0.5.1</version>
                </dependency>
            c.在RabbitMQ中创建Stream队列
                rabbitmqadmin declare queue name=my-stream durable=true type=stream
        b.生产者
            import com.rabbitmq.stream.Environment;
            import com.rabbitmq.stream.Producer;
            import com.rabbitmq.stream.ProducerBuilder;
            import com.rabbitmq.stream.Message;
            import com.rabbitmq.stream.MessageBuilder;

            public class StreamProducer {
                public static void main(String[] args) {
                    // 创建一个环境(连接到RabbitMQ的Stream服务)
                    Environment environment = Environment.builder()
                            .host("localhost")  // RabbitMQ 服务器地址
                            .port(5552)         // Stream 插件的默认端口
                            .build();

                    // 创建生产者并指定Stream队列
                    Producer producer = environment.producerBuilder()
                            .stream("my-stream") // 指定队列名称
                            .build();

                    // 构建消息
                    Message message = MessageBuilder
                            .withBody("Hello, RabbitMQ Stream!".getBytes()) // 消息内容
                            .build();

                    // 发送消息
                    producer.send(message, confirmationStatus -> {
                        if (confirmationStatus.isConfirmed()) {
                            System.out.println("消息已确认发送成功!");
                        } else {
                            System.out.println("消息发送失败!");
                        }
                    });

                    // 关闭生产者
                    producer.close();
                    environment.close();
                }
            }
        c.消费者
            import com.rabbitmq.stream.Environment;
            import com.rabbitmq.stream.Consumer;
            import com.rabbitmq.stream.ConsumerBuilder;

            public class StreamConsumer {
                public static void main(String[] args) {
                    // 创建一个环境(连接到RabbitMQ的Stream服务)
                    Environment environment = Environment.builder()
                            .host("localhost")  // RabbitMQ 服务器地址
                            .port(5552)         // Stream 插件的默认端口
                            .build();

                    // 创建消费者并指定Stream队列
                    Consumer consumer = environment.consumerBuilder()
                            .stream("my-stream") // 指定队列名称
                            .messageHandler((context, message) -> {
                                String body = new String(message.getBodyAsBinary());
                                System.out.println("收到消息: " + body);
                            })
                            .build();

                    // 消费者会持续监听队列消息
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        consumer.close();
                        environment.close();
                    }));
                }
            }