1 基础

1.1 [1]定义

01.消息队列,是分布式系统中重要的组件。
    1.主要解决应用耦合,异步消息,流量削锋等问题。
    2.可实现高性能,高可用,可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

02.主流消息队列
    Kafka
    RabbitMQ
    RocketMQ ,老版本是MetaQ
    ActiveMQ ,目前用的人越来越少

03.优点
    应用解耦:系统间通过消息通信,不用关心其他系统的处理
    异步处理:相比于传统的串行、并行方式,提高了系统吞吐量
    流量削锋:可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求
    消息通讯:消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
    日志处理:解决大量日志传输

04.缺点
    a.系统可用性降低
        系统引入的外部依赖越多,越容易挂掉。
        本来你就是 A 系统调用 BCD 三个系统的接口就好了,本来 ABCD 四个系统好好的,没啥问题
        你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?所以,消息队列一定要做好高可用
    b.系统复杂度提高。
        主要需要多考虑,1)消息怎么不重复消息。2)消息怎么保证不丢失。3)需要消息顺序的业务场景,怎么处理
    c.一致性问题
        A 系统处理完了直接返回成功了,人都以为你这个请求就成功了。
        但是问题是,要是 B、C。D 三个系统那里,B、D 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了
        当然,这不仅仅是 MQ 的问题,引入 RPC 之后本身就存在这样的问题。如果我们在使用 MQ 时,一定要达到数据的最终一致性
        即,C 系统最终执行完成

1.2 [1]组成:3个

01.生产者(Producer)
    负责产生消息

02.消费者(Consumer)
    负责消费消息

03.消息代理(Message Broker):负责存储消息和转发消息两件事情。其中,转发消息分为推送和拉取两种方式
    拉取(Pull),是指 Consumer 主动从 Message Broker 获取消息
    推送(Push),是指 Message Broker 主动将 Consumer 感兴趣的消息推送给 Consumer

1.3 [2]通信模式:4种

01.点对点通讯
    点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构

02.多点广播
    MQ适用于不同类型的应用
    其中重要的,也是正在发展中的是"多点广播"应用,即能够将消息发送到多个目标站点(Destination List)
    可以使用一条MQ指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ不仅提供了多点广播的功能
    而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时
    MQ将消息的一个复制版本和该系统上接收者的名单发送到目标MQ系统
    目标MQ系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量

03.发布/订阅(Publish/Subscribe)模式
    发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,
    用户或应用程序可以根据主题或内容接收到所需要的消息,发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散
    发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发
    在MQ家族产品中,MQ Event Broker是专门用于使用发布/订阅技术进行数据通讯的产品
    它支持基于队列和直接基于TCP/IP两种方式的发布和订阅

04.集群(Cluster)
    为了简化点对点通讯模式中的系统配置,MQ提供 Cluster 的解决方案
    集群类似于一个 域(Domain) ,集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道
    而是采用 Cluster 通道与其它成员通讯,从而大大简化了系统配置
    此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时
    其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性

1.4 [2]消费语义:3种

00.汇总
    消息至多被消费一次(At most once),最多一次
    消息至少被消费一次(At least once),最少一次
    消息仅被消费一次(Exactly once),有且仅有一次

01.最多一次(At Most Once)
    a.消息是否会重复
        否
    b.消息是否会丢失
        是
    c.优势
        生产端发送消息后不用等待和处理服务端响应,消息发送速度快
        适合对消息丢失不敏感的场景
    d.劣势
        网络或服务端问题可能导致消息丢失
    e.适用场景
        消息系统吞吐量大且对消息丢失不敏感的场景,例如日志收集、用户行为跟踪等

02.最少一次(At Least Once)
    a.消息是否会重复
        是
    b.消息是否会丢失
        否
    c.优势
        确保消息不会丢失,通过重试机制保证消息至少被处理一次
    d.劣势
        生产端发送消息后需要等待和处理服务端响应,可能导致吞吐量较低
        可能会有重复的消息,需要在消费端去重
    e.适用场景
        消息系统吞吐量一般,但绝不能丢消息的场景,对于重复消息不敏感

03.有且仅有一次(Exactly Once)
    a.消息是否会重复
        否
    b.消息是否会丢失
        否
    c.优势
        消息不重复且不丢失,提供很高的消息可靠性
    d.劣势
        吞吐量较低,通常需要复杂的机制来保证消息的唯一性和可靠性
    e.适用场景
        对消息可靠性要求很高的场景,同时可以容忍较小的吞吐量

1.5 [2]执行阶段:3个

01.消息生产阶段
    生产者将消息发送到Kafka集群的过程

02.消息存储阶段
    消息在Kafka集群中存储的过程

03.消息消费阶段
    消息堆积

1.6 [2]存储方式:3种

00.总结
    如果从易于实现和快速集成来看,文件系统 > 分布式 KV 存储 > 关系型数据库 DB,但是性能会下降很多

01.分布式KV存储
    这类 MQ 一般会采用诸如 LevelDB 、RocksDB 和 Redis 来作为消息持久化的方式
    由于分布式缓存的读写能力要优于 DB ,所以在对消息的读写能力要求都不是比较高的情况下,采用这种方式倒也不失为一种可以替代的设计方案
    消息存储于分布式 KV 需要解决的问题在于如何保证 MQ 整体的可靠性

02.文件系统
    目前业界较为常用的几款产品(RocketMQ / Kafka / RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化
    (刷盘一般可以分为异步刷盘和同步刷盘两种模式)

03.关系型数据库DB
    Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用 JDBC 的方式来做消息持久化,通过简单的 XML 配置信息即可实现JDBC消息存储

1.7 [2]投递方式:push/pull

01.push
    优点:及时性
    缺点:受限于消费者的消费能力,可能造成消息的堆积,Broker 会不断给消费者发送不能处理的消息

02.pull
    优点:主动权掌握在消费方,可以根据自己的消息速度进行消息拉取
    缺点:消费方不知道什么时候可以获取的最新的消息,会有消息延迟和忙等

1.8 [3]区别:kafka、rabbitmq

01.架构设计
    a.Kafka
        分布式日志系统:Kafka是一个分布式流处理平台,设计用于处理高吞吐量的实时数据流。它将数据存储在分区日志中,支持水平扩展
        持久化存储:Kafka将消息持久化到磁盘,并通过配置保留策略(如时间或大小)来管理数据生命周期
        消费组:Kafka使用消费组来管理消息消费,确保每条消息只被消费组中的一个消费者处理
    b.RabbitMQ
        消息代理:RabbitMQ是一个消息代理,设计用于可靠的消息传递和复杂的路由。它使用交换机和队列来路由和存储消息
        内存和磁盘存储:RabbitMQ可以将消息存储在内存中或持久化到磁盘,支持灵活的存储策略
        灵活的路由:RabbitMQ支持多种交换机类型(如直连、主题、扇出),提供复杂的消息路由能力

02.消息传递模型
    a.Kafka
        发布-订阅模型:Kafka的主题支持发布-订阅模型,生产者将消息发布到主题,消费者订阅主题以接收消息
        顺序保证:Kafka在同一个分区内保证消息的顺序性,消费者可以按照生产者发送的顺序消费消息
    b.RabbitMQ
        点对点和发布-订阅模型:RabbitMQ支持点对点和发布-订阅模型,通过交换机和队列实现消息路由
        确认机制:RabbitMQ提供消息确认机制,确保消息被成功消费

03.性能和扩展性
    a.Kafka
        高吞吐量:Kafka设计用于处理高吞吐量的数据流,适合大规模数据处理和实时分析
        水平扩展:Kafka通过增加分区和代理节点实现水平扩展,支持大规模集群部署
    b.RabbitMQ
        低延迟:RabbitMQ适合低延迟的消息传递,支持复杂的消息路由和可靠性保证
        集群模式:RabbitMQ支持集群模式,通过镜像队列和分布式节点实现高可用性

04.使用场景
    a.Kafka
        适用于日志聚合、流处理、实时分析、大数据集成等场景
        常用于需要高吞吐量和持久化存储的场景
    b.RabbitMQ
        适用于任务队列、即时通信、复杂路由、微服务通信等场景
        常用于需要低延迟和灵活路由的场景

1.9 [3]事务:保证数据一致性

01.事务
    1.生产者产生消息,发送一条半事务消息到MQ服务器
    2.MQ收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态
    3.MQ服务器返回ACK确认到生产者,此时MQ不会触发消息推送事件
    4.生产者执行本地事务
    5.如果本地事务执行成功,即commit执行结果到MQ服务器;如果执行失败,发送rollback
    6.如果是正常的commit,MQ服务器更新消息状态为可发送;如果是rollback,即删除消息
    7.如果消息状态更新为可发送,则MQ服务器会push消息给消费者。消费者消费完就回ACK
    8.如果MQ服务器长时间没有收到生产者的commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态

1.10 [3]流程:一条普通的MQ消息,从产生到被消费

01.流程
    1.生产者产生消息,发送带MQ服务器
    2.MQ收到消息后,将消息持久化到存储系统
    3.MQ服务器返回ACk到生产者
    4.MQ服务器把消息push给消费者
    5.消费者消费完消息,响应ACK
    6.MQ服务器收到ACK,认为消息消费成功,即在存储中删除消息

2 场景

2.1 汇总:5个

01.应用解耦(核心)
    系统间通过消息通信,不用关心其他系统的处理

02.异步处理(核心)
    相比于传统的串行、并行方式,提高了系统吞吐量

03.流量削锋
    可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求

04.消息通讯
    消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等

05.日志处理
    解决大量日志传输

2.2 [1]应用解耦

01.传统模式
    缺点比较明显,系统间耦合性太强。系统 A 在代码中直接调用系统 B 和系统 C 的代码
    如果将来 D 系统接入,系统 A 还需要修改代码,过于麻烦!并且,万一系统 A、B、C 万一还改接口,还要持续跟进

02.消息队列
    将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统 A 不需要做任何修改
    所以,有了消息队列之后,从主动调用的方式,变成了消息的订阅发布( 或者说,事件的发布和监听 ),从而解耦

03.实际场景
    用户支付订单完成后,系统需要给用户发红包、增加积分等等行为,就可以通过这样的方式进行解耦

2.3 [2]异步处理

01.传统模式
    A 系统需要串行逐个同步调用系统 B、C、D,这其中会有很多问题
    如果每个系统调用执行是 200ms ,那么这个逻辑就要执行 600ms ,非常慢
    如果任一一个系统调用异常报错,那么整个逻辑就报错了
    如果任一一个系统调用超时,那么整个逻辑就超时了

02.消息队列
    通过发送 3 条 MQ 消息,通过 Consumer 消费,从而异步、并行调用系统 B、C、D
    因为发送 MQ 消息是比较快的,假设每个操作 2 ms ,那么这个逻辑只要执行 6 ms ,非常快
    当然,有胖友会有,可能发送 MQ 消息会失败。当然,这个是会存在的,此时可以异步重试
    当然,可能异步重试的过程中,JVM 进程挂了,此时又需要其他的机制来保证
    不过,相比串行逐个同步调用系统 B、C、D 来说,出错的几率会低很多很多
    另外,使用消息队列进行异步处理,会有一个前提,返回的结果不依赖于处理的结果

2.4 [3]流量消峰

01.传统模式
    对于大多数系统,一定会有访问量的波峰和波谷。比较明显的,就是我们经常使用的美团外卖,又或者被人诟病的小米秒杀
    如果在并发量大的时间,所有的请求直接打到数据库,造成数据库直接挂掉

02.消息队列
    通过将请求先转发到消息队列中
    然后,系统 A 慢慢的按照数据库能处理的并发量,从消息队列中逐步拉取消息进行消费
    在生产中,这个短暂的高峰期积压是允许的,相比把数据库打挂来说
    当然,可能有胖友说,访问量这么大,不会把消息队列给打挂么?
    相比来说,消息队列的性能会比数据库性能更好,并且,横向的扩展能力更强

2.5 [4]消息通信

01.消息通讯
    消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯
    比如实现:1.IM 聊天;2.点对点消息队列;3.面向物联网的MQTT

2.6 [5]日志处理

01.日志处理,是指将消息队列用在日志处理中,比如 Kafka 的应用,解决大量日志传输的问题
    日志采集客户端,负责日志数据采集,定时批量写入 Kafka 队列
    Kafka 消息队列,负责日志数据的接收,存储和转发
    日志处理应用:订阅并消费 Kafka 队列中的日志数据

02.ELK+Kafka日志方案
    Kafka:接收用户日志的消息队列
    Logstash:对接 Kafka 写入的日志,做日志解析,统一成 JSON 输出给 Elasticsearch 中
    Elasticsearch:实时日志分析服务的核心技术,一个 schemaless ,实时的数据存储服务,通过 index 组织数据,兼具强大的搜索和统计功能
    Kibana:基于 Elasticsearch 的数据可视化组件,超强的数据可视化能力是众多公司选择 ELK stack 的重要原因

3 消息顺序性

3.1 汇总

00.通用
    a.原因
        1.一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误
        2.一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误
    b.解决
        1.解决消费顺序的问题,通常就是一个队列只有一个消费者
        2.解决一个quque一个consumer异步处理的顺序问题;一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理

01.RocketMQ或RabbitMQ
    a.图示
        特性                       RocketMQ                          RabbitMQ
        全局顺序性                 通过单队列 + 单消费者实现           通过单队列 + 单消费者实现
        分区顺序性(局部顺序性)    通过 MessageQueueSelector 实现     通过将消息发送到特定队列实现
        消息重试对顺序的影响        可能破坏顺序,需要额外处理          可能破坏顺序,需要额外处理
        并发消费对顺序的影响        需要确保每个队列只有一个消费者      需要确保每个队列只有一个消费者
    b.说明
        1.单一队列顺序消费:将所有需要按序处理的消息发送到同一个队列中,然后只使用一个消费者来消费队列中的消息。这样可以确保消息按照发送的顺序被消费,但会造成系统的可扩展性和性能瓶颈
        2.多个队列顺序消费:根据业务逻辑将消息分发到多个队列中,每个队列对应一个消费者。消费者按照队列的顺序依次消费消息,并在消费完成后发送确认消息,这样可以达到大部分情况下的顺序处理要求
        3.消息标识和重排序:在消息的属性中添加一个消息标识,消费者在处理消息时,先根据标识进行排序,然后再进行处理。这种方式可以实现基于消息标识的顺序处理,但会增加一定的处理开销
        4.基于时间窗口的顺序处理:在生产者端根据时间戳将消息分发到不同的队列中,消费者按照队列的顺序依次消费消息。这种方式可以实现基于时间窗口的顺序处理,但对于消息的时间戳要求比较高

02.Kafka
    a.图示
        顺序性类型       实现方式                          优点                     缺点
        分区级别顺序性   使用分区路由 + 单线程消费          性能较高,Kafka 原生支持  仅限于分区内,无法保证全局顺序性
        全局顺序性       单分区方案或单消费者方案           顺序性严格               性能较低,无法充分利用 Kafka 的并行能力
        消息重试顺序性   阻塞消费、死信队列或顺序重试机制    保证顺序性               可能影响吞吐量或增加复杂性
    b.说明
        1.单分区主题
        2.使用相同的键(Key)
        3.消费者的顺序处理
    c.重点
        跨分区顺序:无顺序保证  Kafka 默认不支持全局顺序性
        分区内顺序:保证顺序性  Kafka 天然支持分区级别的顺序性。只需要确保:生产者(将同一业务逻辑的消息发送到同一个分区)、消费者(每个分区由一个消费者线程顺序消费)
        解决方案:通过控制分区数、消息键分配和消费者处理策略可以增强顺序控制

03.MySQL表
    1.利用 MySQL 的事务和排序能力,严格保证消息的顺序性
    2.通过状态字段(如 status)来标记消息的消费状态,避免重复消费
    3.结合事务和行锁机制,防止并发消费导致的顺序错

3.2 产生原因

01.分区或队列的并行消费
    消息被分散到多个分区或队列中,消费者并行消费,可能导致顺序错乱

02.消费者的并发处理
    即使消息在同一个队列中,多个消费者并发处理也可能导致顺序错乱

03.消息重试机制
    如果某条消息消费失败并重试,可能会导致后续消息被提前消费

3.3 Kafka:分区机制+单消费者线程

00.汇总
    Kafka天然支持单分区级别的顺序性,默认不支持全局顺序性
    生产者端:【分区机制、分区键】确保相关消息发送到同一个分区
    消费者端:【每个分区只有一个线程处理】

01.Kafka
    a.定义
        消息顺序性:Kafka保证在同一个分区内,消息是按照生产者发送的顺序进行存储和消费的
    b.原理
        分区机制:Kafka通过分区机制保证消息在同一个分区内的顺序性。生产者可以通过指定分区键来确保相关消息被发送到同一个分区
        单消费者线程:在消费时,确保每个分区只有一个消费者线程处理,以保证消息的消费顺序
    c.常用API
        KafkaProducer:用于发送消息
        KafkaConsumer:用于消费消息
    d.使用步骤
        在生产者端,使用分区键确保相关消息被发送到同一个分区
        在消费者端,确保每个分区只有一个消费者线程处理
    e.代码示例
        a.生产者代码
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            String key = "order123"; // 分区键,确保同一订单的消息发送到同一个分区
            ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, "Order Created");

            producer.send(record);
            producer.close();
        b.消费者代码
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("topic"));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // 处理消息,保证每个分区只有一个线程处理
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            } finally {
                consumer.close();
            }

02.Kafka
    a.总结
        a.说明
            Kafka 中的消息顺序取决于分区级别。在特定的分区内,消息是以严格的顺序存储和消费的,但跨分区时顺序无法保证
            kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是分区之间的消息是不保证有序的
        b.重点
            分区内顺序:保证顺序性
            跨分区顺序:无顺序保证
            解决方案:通过控制分区数、消息键分配和消费者处理策略可以增强顺序控制
    b.背景
        a.分区级别的顺序性
            Kafka 保证同一个分区(Partition)内的消息是按照生产顺序存储的
            消费者在消费同一个分区时,消息是按照存储顺序读取的
        b.全局顺序性
            Kafka 不保证全局顺序性,因为消息会被分布到多个分区中,不同分区之间的消息顺序无法保证
        c.并发消费的影响
            如果多个消费者并发消费同一个分区,可能会导致消息顺序被破坏
        d.消息重试的影响:
            如果某条消息消费失败并重试,可能会导致后续消息被提前消费,从而破坏顺序性
    c.Kafka消息顺序的行为
        a.分区内的顺序
            顺序性:在同一个分区内,消息按照它们发送的顺序存储和消费。生产者发送的消息会被追加到分区的末尾,因此同一分区内的消息顺序是保证的
            消费顺序:消费者从分区读取消息时,会按照消息的偏移量(offset)顺序进行,因此保证消息的消费顺序与发送顺序一致
        b.跨分区的顺序
            无顺序保证:如果一个主题有多个分区,消息可能会分布到不同的分区中,因此跨分区的消息顺序无法保证
            消息顺序混乱:在多分区的场景中,消费者可能会从不同的分区并发读取消息,这会导致全局的消息顺序不再严格
    d.如何确保消息顺序?
        a.单分区主题
            使用只有一个分区的主题,这样所有消息都进入同一个分区,从而保证顺序。但是,这种方式限制了 Kafka 的吞吐量和并行处理能力,不适用于高吞吐需求的场景
        b.使用相同的键(Key)
            当使用带有键的消息时,Kafka 的默认分区器会根据键进行哈希,确保相同键的消息进入同一个分区。这可以确保对于相同键的消息顺序性
            例如,订单系统可以使用订单 ID 作为键,这样同一个订单的所有操作消息都会进入同一个分区,从而保证该订单相关消息的顺序
        c.消费者的顺序处理
            在消费端确保顺序也很关键。即使分区内消息是顺序的,如果消费者并行处理,顺序仍然可能被打乱。因此,确保单线程或顺序处理是必要的
    e.如何确保消息顺序?
        a.Kafka天然支持单分区级别的顺序性
            生产者端:通过指定分区(Partition)或使用自定义分区器(Partitioner),将同一业务逻辑的消息路由到同一个分区
            消费者端:确保每个分区只有一个消费者线程消费
        b.Kafka默认不支持全局顺序性
            a.单分区方案:
                将所有消息发送到同一个分区
                由于 Kafka 保证分区内的顺序性,因此可以实现全局顺序性
                缺点:单分区会限制 Kafka 的吞吐量,无法充分利用 Kafka 的分布式能力
            b.单消费者方案
                即使消息分布在多个分区中,使用单个消费者线程按顺序消费所有分区的消息
                缺点:无法并行消费,性能较低
            c.实现步骤
                单分区方案:在生产者端,所有消息发送到同一个分区(指定分区号)
                单消费者方案:在消费者端,使用单线程消费所有分区的消息,并根据消息的时间戳或业务字段进行排序
    f.消息重试的顺序性问题(如果某条消息消费失败并重试,可能会导致后续消息被提前消费,从而破坏顺序性)
        a.阻塞式消费
            如果某条消息消费失败,阻塞当前线程,直到消费成功
            缺点:可能导致消费停滞,影响吞吐量
        b.死信队列(DLQ)
            如果某条消息消费失败,将其发送到死信队列(Dead Letter Queue),继续消费后续消息
            缺点:需要额外处理死信队列中的消息
        c.顺序重试机制
            将消费失败的消息重新插入到 Kafka 中,并确保其重新消费的顺序

3.4 RabbitMQ:队列机制+单消费者线程

00.汇总
    RabbitMQ的队列内顺序是天然保证的,因为RabbitMQ的队列是FIFO(先进先出)
    生产者端:【队列机制】确保消息按照顺序发送到队列
    消费者端:【每个队列只有一个线程处理】

01.RabbitMQ
    a.定义
        消息顺序性:RabbitMQ保证在同一个队列内,消息是按照生产者发送的顺序进行存储和消费的
    b.原理
        队列机制:RabbitMQ通过队列机制保证消息的顺序性。生产者发送的消息按照顺序进入队列,消费者按照顺序消费
        单消费者线程:在消费时,确保每个队列只有一个消费者线程处理,以保证消息的消费顺序
    c.常用API
        Channel:用于发送和消费消息
    d.使用步骤
        在生产者端,确保消息按照顺序发送到队列
        在消费者端,确保每个队列只有一个消费者线程处理
    e.代码示例
        a.生产者代码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                String message = "Order Created";
                channel.basicPublish("", "queue_name", null, message.getBytes());
            }
        b.消费者代码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    // 处理消息,保证每个队列只有一个线程处理
                    System.out.println("Received: " + message);
                };
                channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> {});
            }

02.RabbitMQ
    a.消息顺序性
        全局顺序:所有消息按照生产顺序被消费
        队列内顺序(局部顺序):同一个队列内的消息按照生产顺序被消费
    b.保证顺序性的实现
        a.全局顺序
            RabbitMQ的全局顺序需要确保所有消息发送到同一个队列,并且只有一个消费者消费该队列
            在生产者端,所有消息发送到同一个队列
            在消费者端,设置单线程消费,确保消息按顺序处理
        b.队列内顺序(局部顺序)
            RabbitMQ 的队列内顺序是天然保证的,因为 RabbitMQ 的队列是 FIFO(先进先出)
            同一个业务逻辑的消息发送到同一个队列
            消费者单线程消费该队列
    c.三种方式
        a.单一队列
            描述: 保证所有消息通过单一队列传递,消费者按顺序处理队列中的消息
            优点: 简单直接,消息按发送顺序消费
            缺点: 单一队列可能成为性能瓶颈,无法横向扩展
        b.Publisher Confirms
            描述: 使用 Publisher Confirms 机制,确保消息按顺序发布,并在消费者确认消费后再发布下一条消息
            优点: 提高消息投递的可靠性
            缺点: 增加了发布消息的复杂度
        c.使用分区键 (Routing Key)
            描述: 根据消息的某个属性(如用户 ID)进行分区,通过不同的 Routing Key 发送到不同的队列中,确保同一分区内的消息顺序
            优点: 可以一定程度上提高系统的并发能力
            缺点: 需要合理设计分区策略,确保负载均衡
    d.原理
        拆分多个 Queue,每个 Queue一个 Consumer
        或者就一个 Queue 但是对应一个 Consumer,然后这个 Consumer 内部用内存队列做排队,然后分发给底层不同的 Worker 来处理

3.5 MySQL表:字段+事务+行锁

01.总结
    1.利用 MySQL 的事务和排序能力,严格保证消息的顺序性
    2.通过状态字段(如 status)来标记消息的消费状态,避免重复消费
    3.结合事务和行锁机制,防止并发消费导致的顺序错乱

02.思路
    a.核心
        将消息存储到 MySQL 表中,利用数据库的事务和排序能力来保证消息的顺序性
        消费者从 MySQL 表中按顺序读取消息并处理,确保消费顺序与生产顺序一致
    b.场景
        消息队列本身无法很好地保证顺序性
        消息的生产和消费速率较低,MySQL 的性能可以满足需求
        对顺序性要求非常高,不能容忍任何顺序错乱
    c.优点
        利用 MySQL 的事务和排序能力,顺序性可以得到严格保证
        消息的持久化能力更强,数据不会因为消息队列的丢失而丢失
    d.缺点
        性能较低,MySQL 的吞吐量远不如消息队列
        需要额外的开发和维护成本

03.实现步骤
    a.创建MySQL表
        CREATE TABLE message_queue (
            id BIGINT AUTO_INCREMENT PRIMARY KEY, -- 自增主键,用于保证顺序
            topic VARCHAR(255) NOT NULL,          -- 消息主题
            message_body TEXT NOT NULL,           -- 消息内容
            status TINYINT NOT NULL DEFAULT 0,    -- 消息状态:0-未消费,1-已消费
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- 消息创建时间
        );
        -----------------------------------------------------------------------------------------------------
        id:自增主键,用于保证消息的顺序
        topic:消息的主题,用于区分不同的业务场景
        message_body:消息的内容,存储实际的业务数据
        status:消息的消费状态,0 表示未消费,1 表示已消费
        create_time:消息的创建时间,用于记录消息的生成时间
    b.生产者将消息写入 MySQL 表:生产者在发送消息时,将消息插入到 MySQL 表中
        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;

        public class Producer {
            public static void main(String[] args) throws Exception {
                // 数据库连接
                String url = "jdbc:mysql://localhost:3306/test";
                String user = "root";
                String password = "password";
                Connection connection = DriverManager.getConnection(url, user, password);

                // 插入消息
                String sql = "INSERT INTO message_queue (topic, message_body) VALUES (?, ?)";
                PreparedStatement statement = connection.prepareStatement(sql);

                for (int i = 1; i <= 10; i++) {
                    statement.setString(1, "order_topic");
                    statement.setString(2, "Message " + i);
                    statement.executeUpdate();
                }

                statement.close();
                connection.close();
            }
        }
    c.消费者按顺序读取消息:消费者从 MySQL 表中按顺序读取未消费的消息,并更新消息状态为已消费
        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;

        public class Consumer {
            public static void main(String[] args) throws Exception {
                // 数据库连接
                String url = "jdbc:mysql://localhost:3306/test";
                String user = "root";
                String password = "password";
                Connection connection = DriverManager.getConnection(url, user, password);

                while (true) {
                    // 查询未消费的消息,按 id 顺序读取
                    String selectSql = "SELECT id, message_body FROM message_queue WHERE status = 0 ORDER BY id ASC LIMIT 1";
                    PreparedStatement selectStatement = connection.prepareStatement(selectSql);
                    ResultSet resultSet = selectStatement.executeQuery();

                    if (resultSet.next()) {
                        long messageId = resultSet.getLong("id");
                        String messageBody = resultSet.getString("message_body");

                        // 处理消息
                        System.out.println("Processing message: " + messageBody);

                        // 更新消息状态为已消费
                        String updateSql = "UPDATE message_queue SET status = 1 WHERE id = ?";
                        PreparedStatement updateStatement = connection.prepareStatement(updateSql);
                        updateStatement.setLong(1, messageId);
                        updateStatement.executeUpdate();
                        updateStatement.close();
                    } else {
                        // 如果没有未消费的消息,休眠一段时间
                        Thread.sleep(1000);
                    }

                    resultSet.close();
                    selectStatement.close();
                }
            }
        }
    d.事务和并发控制
        事务:在消费者读取和更新消息状态时,使用事务来保证操作的原子性,避免并发消费导致的顺序错乱
        并发控制:可以通过数据库的行锁机制(如 FOR UPDATE)来防止多个消费者同时消费同一条消息
    e.示例(使用事务和行锁)
        START TRANSACTION;
        -- 查询未消费的消息并加锁
        SELECT id, message_body FROM message_queue WHERE status = 0 ORDER BY id ASC LIMIT 1 FOR UPDATE;
        -- 更新消息状态为已消费
        UPDATE message_queue SET status = 1 WHERE id = ?;
        COMMIT;

4 消息积压

4.1 判断

01.如何得知MQ出现了消息堆积?
    a.消息总共有三种状态
        已就绪、处理中、处理完毕
    b.结论
        消息堆积量(已就绪+ 处理中)来度量消费健康度,表示未处理完成的消息量

02.此时不能认为出现了「消息堆积」问题
    a.说明
        短时间内 MQ 接收到了大量消息,导致数量上达到了「报警阈值」
        而实际下游消费速度足够给力,大量消息依旧可以在短时间内都消费完毕。
    b.解决
        可以使用「延时时间」来度量消费健康度,可以涵盖所有业务场景:
        已就绪消息排队时间:表示拉取消息及时度
        消息处理延迟时间:表示业务处理完成及时度

4.2 原因:3个

00.确定哪一部分出现了「消息堆积」问题?
    定位问题的前提是上面的「监控和告警」系统已经搭建,这样才有数据对以下内容判断

01.生产端堆积:生产者发送速度 > Broker 处理能力。消息没有被及时发送
    生产者发送速率异常飙升(出现了热点,比如某个明星结婚)
    网络问题,导致消息发送延迟或失败,导致频繁重试,拉长发送时间

02.Broker堆积:Broker 存储能力不足。无法及时接收、存储、投递消息
    Broker 写入性能下降(如磁盘满、CPU 过载)
    Broker 节点故障或主从同步延迟
    Topic 队列数不足,导致消息写入/消费并行度低

03.消费端堆积:消费者处理速度 < Broker 投递速度。消费没有被及时消费
    a.消费者处理逻辑变慢
        如依赖的外部服务延迟增加,如代码内部死锁、外部的 RPC 调用、数据库读写等;没有使用批量消费加快消费速度
    b.消费者线程池配置不合理、消费者节点太少
        资源没给够。比如 99 买了个一年 2c2g 的服务器,拿来消费百万条消息,等服务器过期了也消费不完。本质是资源没给够
    c.消息处理失败导致重复投递
        如代码异常或消息格式错误

4.3 阶段:3个

00.汇总
    消息生产阶段
    消息存储阶段
    消息消费阶段(消息堆积)

01.Kafka
    a.消息生产阶段
        a.定义
            生产者将消息发送到Kafka集群的过程
        b.机制
            幂等性:通过配置enable.idempotence=true,确保每条消息在主题分区中只写入一次
            批量发送:通过linger.ms和batch.size配置,优化消息发送的批量处理
        c.常用API
            KafkaProducer:用于发送消息
        d.代码示例
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
            producer.send(record);
            producer.close();
    b.消息存储阶段
        a.定义
            消息在Kafka集群中存储的过程
        b.机制
            分区:消息存储在主题的分区中,分区是Kafka的并行处理单元
            副本机制:通过配置副本因子(replication factor),确保消息的高可用性
        c.常用API
            AdminClient:用于管理主题和分区
        d.代码示例
            // 创建主题时指定分区和副本因子
            NewTopic newTopic = new NewTopic("topic", 3, (short) 2);
            AdminClient adminClient = AdminClient.create(props);
            adminClient.createTopics(Collections.singletonList(newTopic));
    c.消息消费阶段(消息堆积)
        a.定义
            消费者从Kafka集群中拉取消息的过程
        b.机制
            消费组:消费者可以组成消费组,确保每条消息只被消费组中的一个消费者处理
            偏移量管理:通过自动或手动提交偏移量,管理消费进度
        c.常用API
            KafkaConsumer:用于消费消息
        d.代码示例
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息
                }
            }

02.RabbitMQ
    a.消息生产阶段
        a.定义
            生产者将消息发送到RabbitMQ的过程
        b.机制
            消息确认:生产者可以启用消息确认机制,确保消息被成功接收
        c.常用API
            Channel.basicPublish:用于发送消息
        d.代码示例
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.confirmSelect(); // 启用确认模式
                String message = "Hello World!";
                channel.basicPublish("", "queue_name", null, message.getBytes());
                channel.waitForConfirmsOrDie(); // 等待确认
            }
    b.消息存储阶段
        a.定义
            消息在RabbitMQ中存储的过程
        b.机制
            持久化:通过设置消息和队列的持久化属性,确保消息在RabbitMQ重启后不会丢失
        c.常用API
            Channel.queueDeclare:用于声明队列
        d.代码示例
            channel.queueDeclare("queue_name", true, false, false, null); // 队列持久化
   c.消息消费阶段(消息堆积)
        a.定义
            消费者从RabbitMQ中消费消息的过程
        b.机制
            手动确认:消费者可以手动确认消息,确保消息被成功处理
            消费限流:通过basicQos设置预取计数,限制消费者的消费速率
        c.常用API
            Channel.basicConsume:用于消费消息
        d.代码示例
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.basicQos(10); // 每次最多处理10条消息
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    // 处理消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
                };
                channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
            }

4.4 预防:4点

01.监控和告警
    及时感知「消息堆积」问题

02.保证消费者拥有满足常规流量的消费能力
    a.确定单点消费者能力和消费者数量,使消费者拥有合理的资源
        通过压测单个节点,并逐步调大线程的单个节点的线程数,观测节点的系统指标
        得到单个节点最优的消费线程数和消息吞吐量。并根据上下游链路的流量峰值
        计算出需要设置的节点数。节点数 = 流量峰值 / 单节点消息吞吐量
    b.提高单点消费性能
        可以通过调整消费者线程数、批量消费、排查耗时操作优化代码逻辑、异步处理耗时操作

03.需要具备应对突发流量的能力,当发现「消息堆积」时,及时对 MQ 和消费者「动态扩容」
    1.发现消息堆积,手动扩容
    2.Kubernetes HPA、云产商自动扩容

04.资源隔离,风险隔离。不能因为某个业务的「消息对接」导致整个系统不可用
    1.物理隔离:核心业务和非核心业务,分别用各自的 MQ
    2.逻辑隔离:在同一 MQ 中,通过 Namespace、Topic/Queue、Consumer Group 角度进行逻辑隔离

4.5 解决:2种

00.汇总
    a.批量消费
        RocketMQ提供的批处理
        本地缓冲聚合
    b.消费者负载均衡
        队列粒度
        消息粒度

01.批量消费
    a.场景
        a.说明
            通过批量消费,可以降低网络交互次数和带宽消耗,并聚合写操作
            比如每次都聚合 10 次写操作,可以将 10 次锁竞争变为 1 次锁竞争,文本传输的数据量也降低至 10%,那性能理论上可以提升 10 倍
        b.比如合并UPDATE操作,如下所示
            UPDATE t_video_count SET like_count = like_count + 1 WHERE video_id = 1;
            ... 8 条 ...
            UPDATE t_video_count SET like_count = like_count + 1 WHERE video_id = 1;
            -- 将以上 10 条聚合为 1 条
            UPDATE t_video_count SET like_count = like_count + 10 WHERE video_id = 1;
    b.RocketMQ提供的批处理
        a.说明
            消费者可以使用 setConsumeMessageBatchMaxSize和 MessageListenerConcurrently接口实现批量消费
            如果我们先开启消费者,再开启生产者。会发现消费者还是一条条拉取,而不是批量拉取,因为消费者并没有等待一会攒数据,刚发来的消息立马就消费完毕了(此时几乎等价 RPC 调用)
            所以可以先开启生产者,向 MQ 发送消息,攒一波消息,再开启消费者批量消费。或者生产者采用批量发送,一次性将消息都发送出去
        b.缺点
            存在问题
            重试粒度大。如果批量消费中某条消息处理失败,默认整个批次会重试
            消费过于及时,MQ 中没有足够多的消息给消费者批量拉取。不过现在出现了「消息堆积」,这就不是问题了
        c.生产者代码
            import org.apache.rocketmq.client.producer.DefaultMQProducer;
            import org.apache.rocketmq.client.producer.SendResult;
            import org.apache.rocketmq.common.message.Message;

            import java.util.ArrayList;
            import java.util.List;

            public class BatchMessageProducer {
                public static void main(String[] args) throws Exception {
                    DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
                    producer.setNamesrvAddr("localhost:9876");
                    producer.start();

                    try {
                        List<Message> messageList = new ArrayList<>();

                        // 创建多条消息
                        for (int i = 0; i < 10; i++) {
                            String content = "Batch message " + i;
                            Message msg = new Message(
                                    "BatchTopic",    // topic
                                    "TagA",         // tag
                                    content.getBytes() // 消息体
                            );
                            messageList.add(msg);
                        }

                        // 发送批量消息
                        SendResult sendResult = producer.send(messageList);
                        System.out.printf("Batch messages sent successfully, result: %s%n", sendResult);

                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        producer.shutdown();
                    }
                }
            }
        c.消费者代码
            import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
            import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
            import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
            import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
            import org.apache.rocketmq.common.message.MessageExt;
            import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

            import java.util.List;

            public class BatchMessageConsumer {
                public static void main(String[] args) throws Exception {
                    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumerGroup");
                    consumer.setNamesrvAddr("localhost:9876");
                    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

                    // 设置每次批量消费的最大消息数量
                    consumer.setConsumeMessageBatchMaxSize(10);
                    consumer.subscribe("BatchTopic", "*");
                    consumer.registerMessageListener(new MessageListenerConcurrently() {
                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                        ConsumeConcurrentlyContext context) {
                            System.out.printf("Batch consuming %d messages%n", msgs.size());

                            for (MessageExt msg : msgs) {
                                String content = new String(msg.getBody());
                                System.out.printf("Consumed message: %s%n", content);
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    });

                    consumer.start();
                    System.out.println("Batch message consumer started.");
                }
            }
    c.本地缓冲聚合
        a.说明
            先把数据往容器里丢,直接 ACK 返回给 MQ。等到满足一定的数量或一定的时间时
            再批量消费,执行实际的业务逻辑
        b.优点
            将耗时的业务逻辑后置(异步化),尽可能快的 ACK 给 MQ
            聚合多个写请求,通过批量写入减少写入次数,提高写入性能
            该实现方案不依赖特定的 MQ,具有通用性
        c.缺点
            一致性问题。如果聚合时宕机,内存的数据都将丢失。因为消息已经 ACK,MQ 不会重新投递。需要考虑异常崩溃时的一致性问题。常见的一致性解决方案就是使用定时任务对账
            额外的写入延迟,适用于对延迟不敏感的场景
        d.可以通过快手开源的 BufferTrigger 实现聚合操作
            a.依赖
                <dependency>
                    <groupId>com.github.phantomthief</groupId>
                    <artifactId>buffer-trigger</artifactId>
                    <version>0.2.21</version>
                </dependency>
            b.生产者代码
                public class LikeMessageProducer {
                    public static void main(String[] args) throws Exception {
                        DefaultMQProducer producer = new DefaultMQProducer("LikeProducerGroup");
                        producer.setNamesrvAddr("localhost:9876");
                        producer.start();

                        try {
                            // 模拟发送多条点赞消息
                            for (int i = 0; i < 10; i++) {
                                // 模拟对视频1和视频2的点赞
                                Long videoId = (i % 2) + 1L;
                                Message msg = new Message(
                                        "LikeTopic",
                                        "TagA",
                                        videoId.toString().getBytes()
                                );
                                producer.sendOneway(msg);
                                System.out.printf("Sent like message for video %d%n", videoId);
                                Thread.sleep(10);  // 短暂延迟,模拟真实场景
                            }
                        } finally {
                            producer.shutdown();
                        }
                    }
                }
            c.消费者代码
                public class LikeMessageConsumer {
                    private final DefaultMQPushConsumer consumer;
                    private final BufferTrigger<LikeOperation> bufferTrigger;

                    // 点赞操作实体类
                    private static class LikeOperation {
                        private final Long videoId;
                        private final int delta;

                        public LikeOperation(Long videoId, int delta) {
                            this.videoId = videoId;
                            this.delta = delta;
                        }

                        @Override
                        public boolean equals(Object o) {
                            if (this == o) return true;
                            if (o == null || getClass() != o.getClass()) return false;
                            LikeOperation that = (LikeOperation) o;
                            return Objects.equals(videoId, that.videoId);
                        }

                        @Override
                        public int hashCode() {
                            return Objects.hash(videoId);
                        }
                    }

                    public LikeMessageConsumer(String consumerGroup, String namesrvAddr, String topic) {
                        // 创建正确的 BufferTrigger
                        this.bufferTrigger = BufferTrigger.<LikeOperation, ConcurrentHashMap<LikeOperation, AtomicInteger>>simple()
                                .maxBufferCount(100)  // 设置最大缓冲数量
                                .interval(500, TimeUnit.MILLISECONDS)  // 设置时间间隔
                                .setContainer(ConcurrentHashMap::new, (map, operation) -> {
                                    // 使用 computeIfAbsent 确保线程安全地更新计数
                                    map.computeIfAbsent(operation, k -> new AtomicInteger())
                                            .addAndGet(operation.delta);
                                    return true;
                                })
                                .consumer(this::processLikeBatch)
                                .build();

                        // 创建消费者
                        this.consumer = new DefaultMQPushConsumer(consumerGroup);
                        consumer.setNamesrvAddr(namesrvAddr);
                        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

                        try {
                            consumer.subscribe(topic, "*");
                            consumer.registerMessageListener(new MessageListenerConcurrently() {
                                @Override
                                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                                ConsumeConcurrentlyContext context) {
                                    for (MessageExt msg : msgs) {
                                        try {
                                            // 假设消息体就是视频ID
                                            Long videoId = Long.parseLong(new String(msg.getBody()));
                                            // 将消息放入 BufferTrigger
                                            bufferTrigger.enqueue(new LikeOperation(videoId, 1));
                                        } catch (Exception e) {
                                            System.err.println("Error processing message: " + e.getMessage());
                                        }
                                    }
                                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                }
                            });
                        } catch (Exception e) {
                            throw new RuntimeException("Failed to initialize consumer", e);
                        }
                    }

                    // 批量处理点赞
                    private void processLikeBatch(ConcurrentHashMap<LikeOperation, AtomicInteger> aggregatedMap) {
                        if (aggregatedMap.isEmpty()) {
                            return;
                        }

                        // 处理聚合后的结果
                        for (Map.Entry<LikeOperation, AtomicInteger> entry : aggregatedMap.entrySet()) {
                            LikeOperation operation = entry.getKey();
                            int totalDelta = entry.getValue().get();

                            System.out.printf("[%s] 执行聚合更新: UPDATE t_video_count SET like_count = like_count + %d WHERE video_id = %d%n",
                                    getCurrentTime(), totalDelta, operation.videoId);

                            // TODO: 在这里添加实际的数据库更新代码
                            // jdbcTemplate.update(
                            //     "UPDATE t_video_count SET like_count = like_count + ? WHERE video_id = ?",
                            //     totalDelta, operation.videoId
                            // );
                        }
                    }

                    private String getCurrentTime() {
                        return java.time.LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                    }

                    public void start() throws Exception {
                        consumer.start();
                        System.out.println("Consumer started.");
                    }

                    public void shutdown() {
                        bufferTrigger.close();
                        consumer.shutdown();
                    }

                    public static void main(String[] args) throws Exception {
                        LikeMessageConsumer consumer = new LikeMessageConsumer(
                                "LikeConsumerGroup",
                                "localhost:9876",
                                "LikeTopic"
                        );
                        consumer.start();

                        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                            consumer.shutdown();
                            System.out.println("Consumer has been shutdown.");
                        }));
                    }
                }

02.消费者负载均衡
    a.场景
        添加节点并不是在所有情况下,都一定能提高消费的吞吐量的,需要考虑「消费者负载均衡」的影响
    b.队列粒度
        a.说明
            在 RocketMQ 4.x 中,默认采用「队列粒度」负载均衡
        b.特点
            每个队列仅被一个消费者消费,队列和消费者的关系为多对一
        c.说明
            主题中的三个队列 Queue1、Queue2、Queue3 被分配给消费者分组中的两个消费者
            每个队列只能分配给一个消费者消费。图中由于队列数 > 消费者数
            因此,消费者 A2 被分配了两个队列。若队列数 < 消费者数量,可能会出现部分消费者无绑定队列的情况
            这也就导致了当队列数 < 消费者数量时,继续添加消费者节点无法提高消费的吞吐量
    c.消息粒度
        a.说明
            在 RocketMQ 5.x 中,默认采用「消息粒度」负载均衡
        b.特点
            a.消费分摊更均衡
                对于传统队列级的负载均衡策略,如果队列数量和消费者数量不均衡
                则可能会出现部分消费者空闲,或部分消费者处理过多消息的情况
                消息粒度负载均衡策略无需关注消费者和队列的相对数量,能够更均匀地分摊消息
            b.对非对等消费者更友好
                在线上生产环境中,由于网络机房分区延迟、消费者物理资源规格不一致等原因
                消费者的处理能力可能会不一致,如果按照队列分配消息则可能出现部分消费者消息堆积、部分消费者空闲的情况
                消息粒度负载均衡策略按需分配,消费者处理任务更均衡
            c.队列分配运维更方便
                传统基于绑定队列的负载均衡策略必须保证队列数量大于等于消费者数量
                以免产生部分消费者获取不到队列出现空转的情况,而消息粒度负载均衡策略则无需关注队列数
        c.总结
            不需要保证队列数 >= 消费者数量,它们的数量对应关系是多对多的,可以是任意的相对数量
            所以,继续添加消费者节点可以继续提高消费的吞吐量

4.6 解决:提高配置

01.Kafka
    a.定义
        当消费者处理消息的速度低于生产者发送消息的速度时,消息会在Kafka的分区中积压
    b.原理
        增加消费者:通过增加消费者实例来提高消费能力
        优化消费者性能:调整消费者的处理逻辑,提高消费效率
        分区重分配:增加分区数量以提高并行消费能力
    c.常用API
        KafkaConsumer:用于消费消息
        AdminClient:用于管理Kafka集群(如增加分区)
    d.使用步骤
        监控消费延迟,识别积压问题
        增加消费者实例或优化消费者性能
        使用AdminClient增加分区数量(如有必要)
    e.代码示例
        // 增加消费者实例
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 优化处理逻辑
            }
        }

02.RabbitMQ
    a.定义
        当消费者处理消息的速度低于生产者发送消息的速度时,消息会在RabbitMQ的队列中积压
    b.原理
        增加消费者:通过增加消费者实例来提高消费能力
        优化消费者性能:调整消费者的处理逻辑,提高消费效率
        队列分片:将消息分配到多个队列中以提高并行消费能力
    c.常用API
        Channel:用于消费消息
        ConnectionFactory:用于创建连接和通道
    d.使用步骤
        监控队列长度,识别积压问题
        增加消费者实例或优化消费者性能
        使用多个队列进行分片
    e.代码示例
        // 增加消费者实例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.basicQos(10); // 每次最多处理10条消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 优化处理逻辑
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
            };
            channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
        }

4.7 原因:消费方崩溃

00.汇总
    a.Kafka
        手动提交偏移量
        通过【消息持久化、消费组重平衡、手动提交偏移量】来处理消息堆积问题,消费者在恢复上线后会从上次提交的偏移量继续消费
    b.RabbitMQ
        手动确认和预取计数、手动确认消息
        通过【消息持久化、手动确认、消费限流】来处理消息堆积问题,消费者在恢复上线后会从未确认的消息继续消费

01.Kafka
    a.定义
        消息堆积:由于消费方服务器崩溃,导致Kafka分区中的消息无法及时被消费,造成消息堆积
    b.原理
        消息持久化:Kafka将消息持久化到磁盘,确保消息在消费方服务器恢复后仍然可用
        消费组重平衡:当消费方服务器恢复上线后,Kafka会自动进行消费组重平衡,将分区重新分配给可用的消费者
        手动提交偏移量:消费者可以手动提交偏移量,确保在崩溃恢复后从正确的位置继续消费
    c.常用API
        KafkaConsumer:用于消费消息
        ConsumerConfig:用于配置消费者属性
        AdminClient:用于管理Kafka集群
    d.使用步骤
        配置消费者属性,启用手动提交偏移量
        使用KafkaConsumer消费消息,并手动提交偏移量
        当消费方服务器恢复上线后,Kafka会自动进行消费组重平衡,消费者从上次提交的偏移量继续消费
    e.消费者代码
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交偏移量

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // 手动提交偏移量
            }
        } finally {
            consumer.close();
        }

02.RabbitMQ
    a.定义
        消息堆积:由于消费方服务器崩溃,导致Kafka分区中的消息无法及时被消费,造成消息堆积
    b.原理
        消息持久化:RabbitMQ将消息持久化到磁盘,确保消息在消费方服务器恢复后仍然可用
        手动确认:消费者可以手动确认消息,确保在崩溃恢复后从未确认的消息继续消费
        消费限流:通过设置预取计数,限制消费者在确认消息之前可以接收的最大消息数
    c.常用API
        Channel:用于消费消息
        ConnectionFactory:用于创建连接和通道
        DeliverCallback:用于处理消息
    d.使用步骤
        创建Channel并设置手动确认和预取计数
        使用basicConsume方法消费消息,并手动确认消息
        当消费方服务器恢复上线后,RabbitMQ会将未确认的消息重新分配给消费者
    e.消费者代码
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.basicQos(10); // 每次最多处理10条消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 处理消息
                System.out.println("Received: " + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
            };
            channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
        } catch (Exception e) {
            e.printStackTrace();
        }

5 Kafka

5.1 [1]生产可靠:幂等性+确认机制

01.生产可靠性
    a.定义
        生产可靠性:确保生产者发送的消息能够被Kafka成功接收并持久化
    b.原理
        幂等性:通过配置生产者的幂等性(enable.idempotence=true),确保每条消息在主题分区中只写入一次
        确认机制:生产者可以配置消息确认级别(acks),确保消息被成功写入Kafka
    c.常用API
        KafkaProducer:用于发送消息
        ProducerConfig:用于配置生产者属性
    d.使用步骤
        配置生产者属性,启用幂等性和设置确认级别
        使用KafkaProducer发送消息,并处理发送结果
    e.代码示例
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置确认级别为"all"

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("Sent message to topic %s partition %d offset %d%n", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });

        producer.close();

5.2 [1]消费可靠:偏移量管理+重试机制

01.消费可靠性
    a.定义
        消费可靠性:确保消费者能够成功接收并处理Kafka中的消息
    b.原理
        偏移量管理:通过自动或手动提交偏移量,确保消费者在崩溃恢复后从正确的位置继续消费
        重试机制:消费者可以实现重试机制,确保消息处理失败时能够重新处理
    c.常用API
        KafkaConsumer:用于消费消息
        ConsumerConfig:用于配置消费者属性
    d.使用步骤
        配置消费者属性,选择自动或手动提交偏移量
        使用KafkaConsumer消费消息,并实现重试机制
    e.代码示例
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交偏移量

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // 处理消息
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                        consumer.commitSync(); // 手动提交偏移量
                    } catch (Exception e) {
                        // 实现重试机制
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
        }

5.3 [1]生产重复:幂等性+事务机制

01.生产时消息重复
    a.定义
        生产者在发送消息时可能会因为网络问题或重试机制导致消息被多次发送
    b.原理
        幂等性:Kafka提供了生产者幂等性配置(enable.idempotence=true),确保每条消息在主题分区中只写入一次
        事务:Kafka支持事务性生产者,允许将多个消息写入操作作为一个原子操作提交
    c.常用API
        KafkaProducer:用于发送消息
        ProducerConfig:用于配置生产者属性
    d.使用步骤
        配置生产者属性,启用幂等性
        使用KafkaProducer发送消息
    e.代码示例
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
        producer.send(record);
        producer.close();

5.4 [1]消费重复:偏移量

01.消费时消息重复
    a.定义
        消费者在消费消息时可能会因为网络问题或重试机制导致消息被多次消费
    b.原理
        自动提交偏移量:消费者自动提交偏移量,确保每条消息只消费一次
        手动提交偏移量:消费者手动提交偏移量,提供更精细的控制
    c.常用API
        KafkaConsumer:用于消费消息
        ConsumerConfig:用于配置消费者属性
    d.使用步骤
        配置消费者属性,选择自动或手动提交偏移量
        使用KafkaConsumer消费消息
    e.代码示例
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交偏移量

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
            }
            consumer.commitSync(); // 手动提交偏移量
        }

5.5 [1]数据一致性:汇总

00.汇总
    通过【幂等性】、【事务性生产者】、【消费组管理】来确保数据一致性
    生产者:【幂等性】、【事务】
    消费者:【消费组管理】

01.Kafka
    a.定义
        数据一致性:确保消息在生产者发送到Kafka、存储在Kafka以及消费者消费时保持一致性
    b.原理
        幂等性生产者:通过配置生产者的幂等性(enable.idempotence=true),确保每条消息在主题分区中只写入一次
        事务性生产者:Kafka支持事务性生产者,允许将多个消息写入操作作为一个原子操作提交,确保消息的一致性
        消费组管理:通过消费组管理,确保每条消息只被消费组中的一个消费者处理
    c.常用API
        KafkaProducer:用于发送消息
        KafkaConsumer:用于消费消息
        ProducerConfig:用于配置生产者属性
    d.使用步骤
        配置生产者属性,启用幂等性和事务支持
        使用KafkaProducer发送消息,并使用事务提交
        使用KafkaConsumer消费消息,确保消费组管理
    e.代码示例
        a.生产者代码
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id"); // 启用事务

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();

            try {
                producer.beginTransaction();
                ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
                producer.send(record);
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                e.printStackTrace();
            } finally {
                producer.close();
            }
        b.消费者代码
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("topic"));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // 处理消息
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            } finally {
                consumer.close();
            }

5.6 [2]消息过期:主题保留时间

00.汇总
    通过配置【主题保留时间】来处理消息过期问题,超过保留时间的消息会被自动删除

01.Kafka
    a.定义
        消息过期:Kafka中的消息在分区中存储超过配置的保留时间后会被自动删除
    b.原理
        日志段删除:Kafka通过配置主题的保留时间(retention.ms)来控制消息的过期时间。超过保留时间的消息所在的日志段会被自动删除
        配置保留策略:可以通过时间或日志段大小来配置消息的保留策略
    c.常用API
        AdminClient:用于管理Kafka集群和主题配置
    d.使用步骤
        使用AdminClient配置主题的保留时间
        Kafka会自动删除超过保留时间的消息
    e.代码示例
        a.配置主题保留时间
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient adminClient = AdminClient.create(props)) {
                Map<String, String> configs = new HashMap<>();
                configs.put("retention.ms", "604800000"); // 设置保留时间为7天(单位为毫秒)

                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my_topic");
                AlterConfigsResult result = adminClient.incrementalAlterConfigs(
                        Collections.singletonMap(resource, configs.entrySet().stream()
                                .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                                .collect(Collectors.toList())));

                result.all().get();
            }

5.7 [2]消费限流:max.poll.records属性

01.消费限流
    a.定义
        通过控制消费者每次拉取消息的数量来实现消费限流
    b.原理
        Kafka的消费者通过poll方法从服务器拉取消息,可以通过控制poll的频率和每次拉取的消息数量来实现限流
    c.常用API
        KafkaConsumer.poll(Duration):用于拉取消息
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG:配置每次poll拉取的最大消息数
    d.使用步骤
        配置消费者属性,设置max.poll.records以限制每次拉取的消息数量
        使用KafkaConsumer的poll方法拉取消息
    e.代码示例
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); // 每次拉取最多10条消息

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
            }
        }

5.8 [3]消费消息慢:5个

00.汇总
    a.Kafka消费慢的影响
        积压增加(Lag 增大),可能导致磁盘压力和数据丢失
        Rebalance 频繁,影响吞吐
        生产端可能受影响,导致写入失败
        影响 Kafka 服务器的磁盘、内存
    b.优化方向
        增加消费并发(扩展消费者实例)
        优化消费逻辑(减少单条消息处理时间)
        调整 Kafka 配置(增加 max.poll.interval.ms、优化 fetch.min.bytes 等)

01.分区滞后(Lag 增大)
    a.影响
        消费者消费速率低于生产速率时,consumer lag(积压消息数)会持续增加
        若消费积压过大,可能导致磁盘占用增加,甚至影响生产者写入性能
    b.查看 Lag
        关注 LAG 值,若持续增加,说明消费跟不上
        kafka-consumer-groups.sh --bootstrap-server <broker> --group <consumer-group> --describe

02.触发 Rebalance,影响吞吐
    a.影响
        组内消费者处理慢,可能会触发 Rebalance,导致消费暂停、吞吐下降
        心跳超时 (session.timeout.ms 默认 45s),如果超过,Kafka 认为消费者失效,会触发 Rebalance,导致组内所有消费者重新分配分区,进一步影响消费速率
    b.优化
        增加 max.poll.interval.ms(默认 5 分钟),例如:max.poll.interval.ms=900000  # 15 分钟
        让 Kafka 允许更长时间的消费,避免超时被踢出

03.可能触发数据过期或丢失
    a.影响
        如果消费速度长期跟不上,Kafka 可能因 log.retention.ms(默认 168 小时,即 7 天)删除未消费的数据,导致消息丢失
    b.查看 Topic 配置
        关注 retention.ms 和 log.segment.bytes 配置
        kafka-topics.sh --bootstrap-server <broker> --describe --topic <your-topic>
    c.优化
        增加 log.retention.ms:log.retention.ms=604800000  # 7 天
        若消息积压严重,可以适当调大,确保数据不会被提前删除

04.影响 Kafka 端的磁盘和内存
    a.影响
        磁盘占用:消费者消费慢,Kafka Broker 需要存储更多未消费的消息,导致磁盘压力大,甚至可能触发磁盘溢满
        页面缓存(Page Cache)失效:Kafka 依赖 OS Page Cache 提升读取性能,若消息长期积压,数据可能被 Page Cache 淘汰,读取效率降低
    b.优化
        扩展 Kafka 存储(增加磁盘或调整存储策略)
        优化消费逻辑,避免长时间阻塞

05.生产端可能受到影响
    a.影响
        如果生产速率过高,而消费跟不上,Kafka 可能因磁盘写满而导致生产者写入失败
        生产端会收到 kafka.common.errors.ProducerBlockedException 或 kafka.common.errors.TimeoutException 错误
    b.优化
        调整生产速率,确保消费端跟得上(可通过 acks=1 或 batch.size 控制)
        增加消费端并发度,优化消费逻辑

06.优化建议
    a.增加消费者实例数
        如果是单消费者多分区,增加 consumer-group 内的实例数,让多个消费者并行处理不同分区的数据
        示例:kafka-consumer-groups.sh --bootstrap-server <broker> --group <consumer-group> --describe
        若 CURRENT-OFFSET 和 LAG 持续增加,说明需要扩容
    b.优化消费逻辑
        减少单条消息处理时间:避免复杂计算、数据库阻塞(如使用批量插入)。使用异步处理(如 CompletableFuture)
        调大 fetch.min.bytes(减少网络请求):fetch.min.bytes=1048576  # 1MB
        调大 fetch.max.wait.ms(减少轮询压力):fetch.max.wait.ms=500  # 500ms
    c.调整 Kafka 配置
        增大 log.retention.ms,避免消息过期丢失:kafka-configs.sh --alter --entity-type topics --entity-name <topic> --add-config retention.ms=604800000
        提高 max.poll.records (一次拉取更多消息):max.poll.records=500

6 RabbitMQ

6.1 [1]生产可靠:消息确认+持久化

00.总结
    开启事务机制
    发送方确认机制
    失败重试
    消息的持久化

00.生产可靠性
    a.定义
        生产可靠性:确保生产者发送的消息能够被RabbitMQ成功接收并持久化
    b.原理
        消息确认:生产者可以启用消息确认机制,确保消息被成功接收
        持久化:通过设置消息和队列的持久化属性,确保消息在RabbitMQ重启后不会丢失
    c.常用API
        Channel:用于发送消息
        ConfirmListener:用于监听消息确认
    d.使用步骤
        创建Channel并启用确认模式
        发送消息并等待确认
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect(); // 启用确认模式
            String message = "Hello World!";
            channel.basicPublish("", "queue_name", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            channel.waitForConfirmsOrDie(); // 等待确认
        } catch (Exception e) {
            e.printStackTrace();
        }

01.开启事务机制
    a.提供一个事务管理器
        @Bean
        RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }
    b.生产者:添加事务注解、设置通信信道为事务模式
        @Service
        public class MsgService {
            @Autowired
            RabbitTemplate rabbitTemplate;

            @Transactional
            public void send() {
                rabbitTemplate.setChannelTransacted(true);
                rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
                int i = 1 / 0;
            }
        }
        -----------------------------------------------------------------------------------------------------
        发送消息的方法上添加 @Transactional 注解标记事务。
        调用 setChannelTransacted 方法设置为 true 开启事务模式。
        -----------------------------------------------------------------------------------------------------
        当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
        1.客户端发出请求,将信道设置为事务模式。
        2.服务端给出回复,同意将信道设置为事务模式。
        3.客户端发送消息。
        4.客户端提交事务。
        5.服务端给出响应,确认事务提交。
        -----------------------------------------------------------------------------------------------------
        上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。
        所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。
        我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
        -----------------------------------------------------------------------------------------------------
        所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,
        这种方式,性能要远远高于事务模式,一起来看下。

02.发送方确认机制
    a.application.properties 配置开启消息发送方确认机制
        spring.rabbitmq.publisher-confirm-type=correlated
        spring.rabbitmq.publisher-returns=true
        -----------------------------------------------------------------------------------------------------
        第一行是配置消息到达交换器的确认回调,第二行则是配置消息到达队列的回调。
        第一行属性的配置有三个取值:
        none:表示禁用发布确认模式,默认即此。
        correlated:表示成功发布消息到交换器后会触发的回调方法。
        simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
    b.生产者
        @Service
        public class MsgService {
            @Autowired
            RabbitTemplate rabbitTemplate;

            public void send() {
                rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
                int i = 1 / 0;
            }
        }
    c.开启两个监听
        @Configuration
        public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
            public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
            public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
            private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Bean
            Queue queue() {
                return new Queue(JAVABOY_QUEUE_NAME);
            }
            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(JAVABOY_EXCHANGE_NAME);
            }
            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue())
                        .to(directExchange())
                        .with(JAVABOY_QUEUE_NAME);
            }

            @PostConstruct
            public void initRabbitTemplate() {
                rabbitTemplate.setConfirmCallback(this);
                rabbitTemplate.setReturnsCallback(this);
            }

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    logger.info("{}:消息成功到达交换器",correlationData.getId());
                }else{
                    logger.error("{}:消息发送失败", correlationData.getId());
                }
            }

            @Override
            public void returnedMessage(ReturnedMessage returned) {
                logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
            }
        }
        -----------------------------------------------------------------------------------------------------
        关于这个配置类,我说如下几点:
        定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
        定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。
    d.单条消息处理,接下来我们对消息发送进行测试
        首先我们尝试将消息发送到一个不存在的交换机中,像下面这样:
        rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
        注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报错
        -----------------------------------------------------------------------------------------------------
        接下来我们给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
        注意此时第二个参数是一个字符串,不是变量。
        -----------------------------------------------------------------------------------------------------
        可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列(因为队列不存在)。
        这是一条消息的发送,我们再来看看消息的批量发送。
    e.消息批量处理
        如果是消息批量处理,那么发送成功的回调监听是一样的,这里不再赘述。
        这就是 publisher-confirm 模式。
        相比于事务,这种模式下的消息吞吐量会得到极大的提升。

03.失败重试
    a.概念
        失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
    b.自带重试机制
        前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。
        如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,
        这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
        spring.rabbitmq.template.retry.enabled=true
        spring.rabbitmq.template.retry.initial-interval=1000ms
        spring.rabbitmq.template.retry.max-attempts=10
        spring.rabbitmq.template.retry.max-interval=10000ms
        spring.rabbitmq.template.retry.multiplier=2
        -----------------------------------------------------------------------------------------------------
        从上往下配置含义依次是:
        开启重试机制。
        重试起始间隔时间。
        最大重试次数。
        最大重试间隔时间。
        间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
        -----------------------------------------------------------------------------------------------------
        配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。
    c.业务重试
        业务重试主要是针对消息没有到达交换器的情况。如果消息没有成功到达交换器,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!
        a.第一步
            首先创建一张表,用来记录发送到中间件上的消息,像下面这样:
            每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
            status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
            tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
            count:表示消息重试次数
        b.第二步
            在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
        c.第三步
            在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1
            (在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
        d.第四步
            另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,
            把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,
            表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
        e.总结
            当然这种思路有两个弊端:
            去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
            按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。

04.消息的持久化
    a.生产者
        a.做消息的持久化
            消息持久化不能保证完全不丢失消息,可以存在存储磁盘的时候还没有存储完成
            但是服务宕机了也会导致消息丢失,通过发布确定保证消息不丢失
        b.消息确定机制
            import com.rabbitmq.client.*;
            public class PersistentProducer {
                private final static String QUEUE_NAME = "persistent_queue";

                public static void main(String[] argv) throws Exception {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("localhost");
                    try (Connection connection = factory.newConnection();
                         Channel channel = connection.createChannel()) {

                        // 声明一个持久化队列
                        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                        // 启用生产者确认
                        channel.confirmSelect();

                        String message = "Persistent message with producer confirm!";
                        channel.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes());

                        // 检查消息是否成功发送
                        if (channel.waitForConfirms()) {
                            System.out.println("Message sent successfully!");
                        } else {
                            System.out.println("Message failed to send!");
                        }
                    }
                }
            }
            -------------------------------------------------------------------------------------------------
            channel.queueDeclare(QUEUE_NAME, true, false, false, null):队列是持久化的,确保 RabbitMQ 重启后队列不会丢失
            MessageProperties.PERSISTENT_TEXT_PLAIN:确保消息持久化存储
            channel.confirmSelect():启用生产者确认,确保消息成功送达 RabbitMQ
            channel.waitForConfirms():等待确认,如果生产者发送消息时发生失败,会捕获错误
    b.交换机
        a.选择合适的交换机类型
            direct、fanout、topic 和 headers,选择正确的类型来确保消息路由正确
        b.使用死信队列(Dead Letter Exchange, DLX)
            如果消息因某些原因无法被消费,可以将消息转发到死信队列进行进一步处理
        c.代码
            import com.rabbitmq.client.*;

            public class DirectExchangeProducer {
                private final static String EXCHANGE_NAME = "direct_logs";
                private final static String QUEUE_NAME = "persistent_queue";

                public static void main(String[] argv) throws Exception {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("localhost");
                    try (Connection connection = factory.newConnection();
                         Channel channel = connection.createChannel()) {

                        // 声明交换机和队列
                        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");

                        String message = "Hello, Direct Exchange!";
                        channel.basicPublish(EXCHANGE_NAME, "info",
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes());

                        System.out.println("Sent: " + message);
                    }
                }
            }

            public class DeadLetterConsumer {
                private final static String DLX_QUEUE = "dlx_queue";

                public static void main(String[] argv) throws Exception {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("localhost");
                    try (Connection connection = factory.newConnection();
                         Channel channel = connection.createChannel()) {

                        // 声明死信队列
                        channel.queueDeclare(DLX_QUEUE, true, false, false, null);

                        // 创建消费者回调
                        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                            String message = new String(delivery.getBody(), "UTF-8");
                            System.out.println("Dead Letter Queue Received: " + message);
                        };

                        // 设置死信队列消费者
                        channel.basicConsume(DLX_QUEUE, true, deliverCallback, consumerTag -> {});
                    }
                }
            }
    c.消费者
        a.说明
            进行手动应答
            消息重试
        b.代码
            import com.rabbitmq.client.*;

            public class AckConsumer {
                private final static String QUEUE_NAME = "persistent_queue";

                public static void main(String[] argv) throws Exception {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("localhost");
                    try (Connection connection = factory.newConnection();
                         Channel channel = connection.createChannel()) {

                        // 声明一个持久化队列
                        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                        // 创建一个消费者回调
                        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                            String message = new String(delivery.getBody(), "UTF-8");
                            System.out.println("Received: " + message);

                            try {
                                // 模拟消息处理
                                if (message.contains("error")) {
                                    throw new Exception("Error while processing message");
                                }

                                // 手动确认消息
                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                                System.out.println("Message processed and acknowledged");
                            } catch (Exception e) {
                                // 如果消息处理失败,可以将消息重新放回队列
                                System.out.println("Error processing message, requeueing: " + e.getMessage());
                                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                            }
                        };

                        // 设置手动确认
                        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
                    }
                }
            }
            -------------------------------------------------------------------------------------------------
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false):消息处理成功后确认消息
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true):如果消费失败,重新将消息投递到队列中,供其他消费者处理

6.2 [1]消费可靠:手动确认+消费限流

00.总结
    两种消费思路
    确保消费成功两种思路
    消息拒绝
    消息确认
    幂等性

00.消费可靠性
    a.定义
        消费可靠性:确保消费者能够成功接收并处理RabbitMQ中的消息
    b.原理
        手动确认:消费者可以手动确认消息,确保消息不会丢失
        消费限流:通过设置预取计数,限制消费者在确认消息之前可以接收的最大消息数
    c.常用API
        Channel:用于消费消息
        DeliverCallback:用于处理消息
    d.使用步骤
        创建Channel并设置手动确认和预取计数
        使用basicConsume方法消费消息,并手动确认
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.basicQos(10); // 每次最多处理10条消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 处理消息
                System.out.println("Received: " + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
            };
            channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
        } catch (Exception e) {
            e.printStackTrace();
        }

01.两种消费思路
    a.两种消费思路
        RabbitMQ 的消息消费,整体上来说有两种不同的思路:
        推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
        拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。
    b.推(push)
        通过 @RabbitListener 注解去标记消费者,如下:
        @Component
        public class ConsumerDemo {
            @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
            public void handle(String msg) {
                System.out.println("msg = " + msg);
            }
        }
        当监听的队列中有消息时,就会触发该方法。
    c.拉(pull)
        @Test
        public void test01() throws UnsupportedEncodingException {
            Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
            System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
        }
        -----------------------------------------------------------------------------------------------------
        调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,
        如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,
        可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,
        则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,
        3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
    d.总结
        如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。
        切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。

02.确保消费成功两种思路
    a.概念
        为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。
        当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
        -----------------------------------------------------------------------------------------------------
        当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
        当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
    b.在 RabbitMQ 的 web 管理页面
        Ready 表示待消费的消息数量。
        Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
        这是我们可以从 UI 层面观察消息的消费情况确认情况。
    c.当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
        待消费的消息
        已经投递给消费者,但是还没有被消费者确认的消息
        -----------------------------------------------------------------------------------------------------
        换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,
        当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。
        如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,
        那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
    d.总结
        综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。
        当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。

03.消息拒绝
    a.当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
        @Component
        public class ConsumerDemo {
            @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
            public void handle(Channel channel, Message message) {
                //获取消息编号
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                try {
                    //拒绝消息
                    channel.basicReject(deliveryTag, true);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    b.消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
        1.获取消息编号 deliveryTag。
        2.调用 basicReject 方法拒绝消息。
    c.总结
        调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。
        如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;
        如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
        需要注意的是,basicReject 方法一次只能拒绝一条消息。

04.消息确认
    a.自动确认
        在 Spring Boot 中,默认情况下,消息消费就是自动确认的
        -----------------------------------------------------------------------------------------------------
        @Component
        public class ConsumerDemo {
            @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
            public void handle2(String msg) {
                System.out.println("msg = " + msg);
                int i = 1 / 0;
            }
        }
        -----------------------------------------------------------------------------------------------------
        通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,
        默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待
        下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
    b.手动确认
        a.推模式手动确认
            a.application.properties
                要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:
                spring.rabbitmq.listener.simple.acknowledge-mode=manual
                这个配置表示将消息的确认模式改为手动确认。
            b.消费者
                @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
                public void handle3(Message message,Channel channel) {
                    long deliveryTag = message.getMessageProperties().getDeliveryTag();
                    try {
                        //消息消费的代码写到这里
                        String s = new String(message.getBody());
                        System.out.println("s = " + s);
                        //消费完成后,手动 ack
                        channel.basicAck(deliveryTag, false);
                    } catch (Exception e) {
                        //手动 nack
                        try {
                            channel.basicNack(deliveryTag, false, true);
                        } catch (IOException ex) {
                            ex.printStackTrace();
                        }
                    }
                }
                ---------------------------------------------------------------------------------------------
                将消费者要做的事情放到一个 try..catch 代码块中。
                如果消息正常消费成功,则执行 basicAck 完成确认。
                如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。
                ---------------------------------------------------------------------------------------------
                这里涉及到两个方法:
                basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
                basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。
        b.拉模式手动确认
            a.说明
                拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法
            b.原生办法
                public void receive2() {
                    Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
                    long deliveryTag = 0L;
                    try {
                        GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
                        deliveryTag = getResponse.getEnvelope().getDeliveryTag();
                        System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
                        channel.basicAck(deliveryTag, false);
                    } catch (IOException e) {
                        try {
                            channel.basicNack(deliveryTag, false, true);
                        } catch (IOException ex) {
                            ex.printStackTrace();
                        }
                    }
                }

05.幂等性
    a.场景
        消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ
        并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,
        这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次
        (参见四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?)。
        种种原因导致我们在消费消息时,一定要处理好幂等性问题。
    b.思路
        采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
        id-0(正在执行业务)
        id-1(执行业务成功)
        -----------------------------------------------------------------------------------------------------
        如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,
        如果 key 已经存在(说明之前有人消费过该消息),获取他的值,
        如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
        -----------------------------------------------------------------------------------------------------
        极端情况:第一个消费者在执行业务时,出现了死锁,
        在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。

6.3 [1]生产重复:手动确认机制

01.生产时消息重复
    a.定义
        生产者在发送消息时可能会因为网络问题或重试机制导致消息被多次发送
    b.原理
        RabbitMQ通过消息确认机制来解决生产时消息重复问题
        消息确认:生产者在发送消息后等待RabbitMQ的确认,确保消息被成功接收
    c.常用API
        Channel:用于发送消息
        ConfirmListener:用于监听消息确认
    d.使用步骤
        创建Channel并启用确认模式
        发送消息并等待确认
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect(); // 启用确认模式
            String message = "Hello World!";
            channel.basicPublish("", "queue_name", null, message.getBytes());
            channel.waitForConfirmsOrDie(); // 等待确认
        }

6.4 [1]消费重复:手动确认机制

01.消费时消息重复
    a.定义
        消费者在消费消息时可能会因为网络问题或重试机制导致消息被多次消费
    b.原理
        RabbitMQ通过手动确认机制来解决消费时消息重复问题
        消费者在处理完消息后手动确认,确保消息不会被重复消费
    c.常用API
        Channel:用于消费消息
        DeliverCallback:用于处理消息
    d.使用步骤
        创建Channel并设置手动确认
        消费消息并手动确认
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.basicQos(1); // 每次只处理一个消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 处理消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
            };
            channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
        }

6.5 [1]数据一致性:汇总

00.汇总
    通过【消息确认机制】、【持久化】、【手动确认】来确保数据一致性
    生产者:【确认模式】、【设置持久化属性】
    消费者:【手动确认消息】

01.RabbitMQ
    a.定义
        数据一致性:确保消息在生产者发送到RabbitMQ、存储在RabbitMQ以及消费者消费时保持一致性
    b.原理
        消息确认机制:生产者可以启用消息确认机制,确保消息被成功接收
        持久化:通过设置消息和队列的持久化属性,确保消息在RabbitMQ重启后不会丢失
        手动确认:消费者可以手动确认消息,确保消息不会丢失
    c.常用API
        Channel:用于发送和消费消息
        ConfirmListener:用于监听消息确认
    d.使用步骤
        创建Channel并启用确认模式和持久化属性
        发送消息并等待确认
        使用basicConsume方法消费消息,并手动确认
    e.代码示例
        a.生产者代码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.confirmSelect(); // 启用确认模式
                String message = "Hello World!";
                channel.basicPublish("", "queue_name", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                channel.waitForConfirmsOrDie(); // 等待确认
            } catch (Exception e) {
                e.printStackTrace();
            }
        b.消费者代码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    // 处理消息
                    System.out.println("Received: " + message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
                };
                channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
            } catch (Exception e) {
                e.printStackTrace();
            }

6.6 [2]消息过期:队列TTL、消息TTL

00.汇总
    通过设置【队列TTL、消息TTL】来处理消息过期问题,超过TTL的消息会被自动删除,并可以转发到死信队列进行后续处理

01.RabbitMQ
    a.定义
        消息过期:RabbitMQ中的消息可以通过设置TTL(Time-To-Live)来控制消息的过期时间,超过TTL的消息会被自动删除
    b.原理
        TTL设置:可以为队列或单个消息设置TTL,超过TTL的消息会被自动删除
        死信队列:过期的消息可以被转发到死信队列(DLX)进行后续处理
    c.常用API
        Channel:用于设置队列和消息的TTL
        QueueDeclare:用于声明队列并设置TTL
    d.使用步骤
        使用Channel设置队列或消息的TTL
        RabbitMQ会自动删除超过TTL的消息
    e.代码示例
        a.设置队列TTL
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                Map<String, Object> args = new HashMap<>();
                args.put("x-message-ttl", 60000); // 设置消息TTL为60秒

                channel.queueDeclare("my_queue", true, false, false, args);
                String message = "Hello World!";
                channel.basicPublish("", "my_queue", null, message.getBytes());
            }
        b.设置消息TTL
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .expiration("60000") // 设置消息TTL为60秒
                        .build();

                String message = "Hello World!";
                channel.basicPublish("", "my_queue", properties, message.getBytes());
            }

6.7 [2]消费限流:创建Channel+basicQos设置预取计数

01.消费限流
    a.定义
        通过限制消费者每次从队列中获取的消息数量来实现消费限流
    b.原理
        RabbitMQ通过basicQos方法设置预取计数(prefetch count),限制消费者在确认消息之前可以接收的最大消息数
    c.常用API
        Channel.basicQos(int prefetchCount):用于设置预取计数
        Channel.basicConsume:用于消费消息
    d.使用步骤
        创建Channel并使用basicQos设置预取计数
        使用basicConsume方法消费消息
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.basicQos(10); // 每次最多处理10条消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 处理消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动确认
            };
            channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> {});
        }

6.8 [3]消息丢失:3种

00.汇总
    问题主要            原因                          解决方案
    生产者消息丢失       网络故障、RabbitMQ 崩溃       开启 Confirm 模式、Mandatory 参数
    RabbitMQ内部丢失    未持久化队列或消息             开启持久化+Confirm模式
    消费者消息丢失       ACK机制错误                   手动ACK+死信队列
    消息重复消费         ACK 丢失、业务未幂等          手动 ACK+ 幂等处理

01.生产者消息丢失
    a.原因分析
        生产者在发送消息到 RabbitMQ 时,可能会因以下原因导致消息丢失
        1.网络故障:消息未能成功到达 RabbitMQ
        2.RabbitMQ 崩溃:生产者未确认消息是否成功送达
        3.生产者代码异常:消息未正确发送
    b.解决方案
        a.使用事务模式(不推荐)
            通过 channel.txSelect() 开启事务,channel.basicPublish() 发送消息,channel.txCommit() 提交事务
            缺点:事务模式会显著影响性能,因此不推荐在高并发场景下使用
        b.使用 Publisher Confirm 模式(推荐)
            生产者开启 confirm 模式,每次发送消息后等待 RabbitMQ 的确认
            示例代码:
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            channel.basicPublish("exchange", "routingKey", null, "message".getBytes());
            if (!channel.waitForConfirms()) {
                System.out.println("消息可能丢失");
            }
            优点:确保消息成功写入 RabbitMQ,性能优于事务模式
        c.使用 Mandatory 参数或备份交换机
            设置 mandatory=true,当消息无法被路由时,RabbitMQ 会将消息返回给生产者
            配置备份交换机,当消息无法投递时,存入备份队列,避免消息丢失

02.RabbitMQ内部消息丢失
    a.原因分析
        RabbitMQ 内部消息存储在内存或磁盘中,若未进行持久化,可能会导致消息丢失:
        1.队列未持久化:RabbitMQ 重启后,队列中的消息丢失
        2.消息未持久化:RabbitMQ 崩溃时,内存中的消息丢失
    b.解决方案
        a.开启队列持久化
            在声明队列时,设置 durable=true,确保 RabbitMQ 重启后队列不会丢失
            示例代码:
            boolean durable = true;
            channel.queueDeclare("queue", durable, false, false, null);
        b.开启消息持久化
            在发送消息时,设置 deliveryMode=2,确保消息持久化到磁盘
            示例代码:
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)  // 1:非持久化, 2:持久化
                .build();
            channel.basicPublish("exchange", "routingKey", properties, "message".getBytes());
        c.最佳实践
            结合队列持久化和消息持久化,并使用 Publisher Confirm 模式,确保消息不丢失

03.消费者消息丢失
    a.原因分析
        消费者在处理消息时,可能会因以下原因导致消息丢失
        1.消息未正确 ACK:RabbitMQ 误以为消息已被消费并删除,但实际上消费者未处理完毕
        2.消费者进程崩溃:消费者在处理消息时崩溃,导致消息未完成处理
    b.解决方案
        a.手动 ACK
            避免使用 autoAck=true,改为手动确认消息处理完毕后再发送 ACK
            示例代码:
            boolean autoAck = false;
            channel.basicConsume("queue", autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("Received: " + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        b.死信队列(DLX)处理异常消息
            当消息被拒绝(basicNack 或 basicReject)时,可以将其转入死信队列(DLX),避免消息直接丢失
            适用场景:处理消费者无法正常处理的消息,确保消息不会丢失

6.9 [3]消费重复:3种

00.汇总
    确保消息ACK成功
    消息去重(业务幂等性)
    RabbitMQ唯一消息ID

01.重复消费的原因
    a.消费者ACK丢失
        RabbitMQ 未收到 ACK,导致消息重新投递
    b.网络问题
        消费者 ACK 后,网络中断,RabbitMQ 未收到确认,重新投递
    c.业务逻辑未实现幂等性
        即使消息被重复投递,业务层仍需保证最终一致性

02.解决方案
    a.确保消息 ACK 成功
        在代码中确保消息处理完毕后再发送 ACK
        避免使用 autoAck=true,使用 basicAck 确保 RabbitMQ 收到确认
    b.消息去重(业务幂等性)
        a.数据库去重(适用于写操作)
            设计唯一约束,如 orderId 唯一
            消费时,先检查 orderId 是否已处理
        b.Redis 去重(适用于高并发场景)
            使用 SETNX 存储 msgId,若已存在,则丢弃
            示例代码:
            String msgId = getMessageId(message);
            if (redis.setnx(msgId, "1") == 0) {
                System.out.println("重复消息,丢弃");
                return;
            }
    c.RabbitMQ 唯一消息 ID
        使用 Message Deduplication 插件:让 RabbitMQ 自动去重
        在消息属性中增加唯一 ID,如 UUID,消费者根据唯一 ID 进行去重

7 消息队列

7.1 汇总:5个

01.list
    基于list结构模拟消息队列,LPUSH发布,RPOP拉取,BRPOP阻塞拉取消息

02.pubsub
    点对点消息模型,SUBSCRIBE订阅,UNSUBSCRIBE取消订阅

03.stream
    Redis 5.0版本新增,xadd 存入消息和 xread 循环阻塞读取消息
    使用 xadd 存入消息和 xread 循环阻塞读取消息的方式
    使用 Jedis 的 xreadGroup() 方法实现了消息的阻塞读取,并且使用此方法自带 noAck 参数,实现了消息的自动确认
    一个分组内的多个 consumer 会轮询收到消息队列的消息,并且不会出现一个消息被多个 consumer 读取的情况

04.Kafka
    a.基于分区的消息队列
        结构:Kafka使用主题(Topic)和分区(Partition)来组织消息。每个主题可以有多个分区,消息在分区内是有序的
        生产者:使用Producer API将消息发送到指定的主题和分区
        消费者:使用Consumer API从分区中拉取消息。消费者可以组成消费组(Consumer Group),同一消费组内的消费者会自动负载均衡分区
    b.发布-订阅模型
        模型:Kafka的主题支持发布-订阅模型,生产者将消息发布到主题,消费者订阅主题以接收消息
        消费组:每个消费组中的消费者共享一个订阅,确保每条消息只被消费组中的一个消费者处理
    c.消息确认与偏移量管理
        偏移量:Kafka使用偏移量(Offset)来跟踪每个消费者在分区中的消费进度
        自动提交:消费者可以配置自动提交偏移量,或者手动提交偏移量以实现精确控制
        幂等性与事务:Kafka支持幂等性生产者和事务性消息,以确保消息的精确一次交付

05.RabbitMQ
    a.基于队列的消息模型
        结构:RabbitMQ使用队列(Queue)来存储消息,生产者将消息发送到交换机(Exchange),交换机根据绑定规则将消息路由到一个或多个队列
        生产者:使用basicPublish方法将消息发送到交换机
        消费者:使用basicConsume方法从队列中消费消息
    b.发布-订阅模型
        模型:RabbitMQ支持多种交换机类型(如fanout、direct、topic、headers),实现不同的发布-订阅模型
        订阅:消费者通过绑定队列到交换机来订阅消息
    c.消息确认与消费限流
        消息确认:消费者可以使用手动确认(basicAck)来确认消息处理完成,确保消息不会丢失
        消费限流:通过basicQos设置预取计数,限制消费者在确认消息之前可以接收的最大消息数

7.2 [1]list:lpush/lpop

01.list,模拟消息队列
    a.概念
        使用 Redis 的 List 结构作为消息队列是一个简单且有效的选择
        因为 List 的底层实现是链表,操作元素时在头部和尾部的时间复杂度都是 O(1),非常适合用于消息队列的场景
    b.命令
        LPUSH:用于发布消息(将消息放入队列的头部)
        RPOP:用于拉取消息(从队列的尾部取出消息)。当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL
        BRPOP:用于阻塞拉取消息(当队列为空时,阻塞等待直到有新消息到来)
    c.优点
        利用 Redis 存储,不受 JVM 内存限制
        基于 Redis 的持久化机制,确保数据安全
        能够满足消息的有序性
    d.缺点
        不支持重复消费:一旦消费者拉取消息,该消息就会从 List 中删除,无法被其他消费者再次消费
        消息丢失:如果消费者拉取到消息后发生异常或宕机,则该消息会丢失,因为它已从 List 中删除

02.示例:LPUSH发布,RPOP拉取,BRPOP阻塞拉取消息
    a.代码
        package com.ruoyi.redis4.demo05;

        import redis.clients.jedis.Jedis;

        // list,基于list结构模拟消息队列
        public class Demo01 {
            private static final String QUEUE_NAME = "my_queue";
            private static final String PROCESSING_QUEUE_NAME = "processing_queue";

            public static void main(String[] args) {
                // 启动生产者线程
                Thread producerThread = new Thread(() -> {
                    try (Jedis jedis = new Jedis("localhost", 6379)) { // 连接到Redis
                        for (int i = 0; i < 10; i++) {
                            String message = "Message " + i;
                            jedis.rpush(QUEUE_NAME, message); // 入队
                            System.out.println("Produced: " + message);
                            // 模拟生产间隔
                            Thread.sleep(500);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // 恢复中断状态
                    }
                });

                // 启动消费者线程
                Thread consumerThread = new Thread(() -> {
                    try (Jedis jedis = new Jedis("localhost", 6379)) { // 连接到Redis
                        while (true) {
                            String message = jedis.lpop(QUEUE_NAME); // 从主队列出队
                            if (message != null) {
                                // 将消息放入“处理中”队列
                                jedis.rpush(PROCESSING_QUEUE_NAME, message);
                                System.out.println("Processing: " + message);

                                // 模拟消息处理
                                Thread.sleep(1000); // 模拟处理时间

                                // 处理完成,确认并从“处理中”队列中移除
                                jedis.lrem(PROCESSING_QUEUE_NAME, 1, message); // 从处理中队列移除
                                System.out.println("Consumed: " + message); // 处理完成
                            } else {
                                try {
                                    Thread.sleep(1000); // 如果没有消息,等待一段时间再尝试
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // 恢复中断状态
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

                // 启动线程
                producerThread.start();
                consumerThread.start();

                // 等待生产者线程结束
                try {
                    producerThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 恢复中断状态
                }
            }
        }
    b.说明
        生产者:将消息推送到Redis队列
        消费者:从队列中取出消息,并将其标记为“待处理”
                处理消息后,确认消息的处理状态
                处理完成后从“待处理”状态中移除该消息

7.3 [2]pub/sub:点对点消息模型

01.pub/sub,点对点消息模型
    a.概念
        PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel
        生产者向对应channel发送消息后,所有订阅者都能收到相关消息。对应channel发送消息后,所有订阅者都能收到相关消息
    b.命令
        SUBSCRIBE:订阅一个或多个频道
        UNSUBSCRIBE:取消订阅一个或多个频道
        PSUBSCRIBE:订阅一个或多个模式
        PUNSUBSCRIBE:取消订阅一个或多个模式
    c.缺点
        Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃
        而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了

02.示例:SUBSCRIBE订阅,UNSUBSCRIBE取消订阅
    a.代码
        package com.ruoyi.redis4.demo05;

        import redis.clients.jedis.Jedis;
        import redis.clients.jedis.JedisPubSub;

        // pubsub,点对点消息模型
        public class Demo02 {

            private static final String CHANNEL_NAME = "my_channel";

            public static void main(String[] args) {
                // 启动订阅者线程
                Thread subscriberThread = new Thread(() -> {
                    try (Jedis jedis = new Jedis("localhost", 6379)) { // 连接到Redis
                        JedisPubSub jedisPubSub = new JedisPubSub() {
                            @Override
                            public void onMessage(String channel, String message) {
                                System.out.println("Received message from " + channel + ": " + message); // 处理消息
                            }
                        };

                        System.out.println("Subscribed to channel: " + CHANNEL_NAME);
                        jedis.subscribe(jedisPubSub, CHANNEL_NAME); // 订阅频道
                    }
                });

                // 启动发布者线程
                Thread publisherThread = new Thread(() -> {
                    try (Jedis jedis = new Jedis("localhost", 6379)) { // 连接到Redis
                        for (int i = 0; i < 10; i++) {
                            String message = "Message " + i;
                            jedis.publish(CHANNEL_NAME, message); // 发布消息
                            System.out.println("Published: " + message);
                            // 模拟发布间隔
                            Thread.sleep(1000);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // 恢复中断状态
                    }
                });

                // 启动线程
                subscriberThread.start();
                publisherThread.start();

                // 等待发布者线程结束
                try {
                    publisherThread.join(); // 等待发布者完成
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 恢复中断状态
                }

                // 在发布者完成后,可以选择中断订阅者线程
                subscriberThread.interrupt(); // 中断订阅者,示例目的
            }
        }
    b.说明
        发布者:MessagePublisher类负责连接到Redis并通过PUBLISH命令将消息发布到指定频道中
               在这个示例中,它会发送10条消息,每条消息之间有1秒的间隔
        订阅者:MessageSubscriber类使用JedisPubSub类来实现对频道的订阅,并重写onMessage方法以处理接收到的消息
               在这个示例中,订阅者会持续运行,等待并处理发布到my_channel的消息

7.4 [3]stream:redis5.0新增

01.stream,比较完善的消息队列模型
    a.概念
        Redis Stream 是 Redis 5.0 版本新增加的数据结构
        Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能
        但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃
    b.组成
        Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)
        lastdeliveredid :游标,每个消费组会有个游标 lastdeliveredid,任意一个消费者读取了消息都会使游标 lastdeliveredid 往前移动
        pendingids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pendingids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)
    c.优点
        1.持久化存储:Stream中的消息可以被持久化存储,确保数据不会丢失,即使在Redis服务器重启后也能恢复消息
        2.有序性:消息按照产生顺序生成消息ID, 被添加到Stream中,并且可以按照指定的条件检索消息,保证了消息的有序性
        3.多播与分组消费:支持多个消费者同时消费同一流中的消息,并且可以将消费者组织成消费组,实现消息的分组消费
        4.消息确认机制:消费者可以通过XACK命令确认是否成功消费消息,保证消息至少背消费一次,确保消息不会被重复处理
        5.阻塞读取:消费者可以选择阻塞读取模式,当没有新消息时,消费者会等待直至新消息到达
        6.消息可回溯: 方便补数、特殊数据处理, 以及问题回溯查询
    d.消息队列
        XADD               添加消息到末尾
        XTRIM              对流进行修剪,限制长度
        XDEL               删除消息
        XLEN               获取流包含的元素数量,即消息长度
        XRANGE             获取消息列表,会自动过滤已经删除的消息
        XREVRANGE          反向获取消息列表,ID 从大到小
        XREAD              以阻塞或非阻塞方式获取消息列表
    e.消费者组
        XGROUP CREATE      创建消费者组
        XREADGROUP GROUP   读取消费者组中的消息
        XACK               将消息标记为"已处理"
        XGROUP SETID       为消费者组设置新的最后递送消息ID
        XGROUP DELCONSUMER 删除消费者
        XGROUP DESTROY     删除消费者组
        XPENDING           显示待处理消息的相关信息
        XCLAIM             转移消息的归属权
        XINFO              查看流和消费者组的相关信息
        XINFO GROUPS       打印消费者组的信息
        XINFO STREAM       打印流信息

02.示例:xadd 存入消息和 xread 循环阻塞读取消息
    a.代码
        package com.ruoyi.redis4.demo05;

        import com.google.gson.Gson;
        import java.util.AbstractMap;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import redis.clients.jedis.Jedis;
        import redis.clients.jedis.StreamEntry;
        import redis.clients.jedis.StreamEntryID;

        // stream,Redis 5.0版本新增,xadd 存入消息和 xread 循环阻塞读取消息
        public class Demo03 {
            private static final String _STREAM_KEY = "mq"; // 流 key
            private static final String _GROUP_NAME = "g1"; // 分组名称
            private static final String _CONSUMER_NAME = "c1"; // 消费者 1 的名称
            private static final String _CONSUMER2_NAME = "c2"; // 消费者 2 的名称
            public static void main(String[] args) {
                // 生产者
                producer();
                // 创建消费组
                createGroup(_STREAM_KEY, _GROUP_NAME);
                // 消费者 1
                new Thread(() -> consumer()).start();
                // 消费者 2
                new Thread(() -> consumer2()).start();
            }
            /**
             * 创建消费分组
             * @param stream    流 key
             * @param groupName 分组名称
             */
            public static void createGroup(String stream, String groupName) {
                Jedis jedis = JedisUtils.getJedis();
                jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
            }
            /**
             * 生产者
             */
            public static void producer() {
                Jedis jedis = JedisUtils.getJedis();
                // 添加消息 1
                Map<String, String> map = new HashMap<>();
                map.put("data", "redis");
                StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
                System.out.println("消息添加成功 ID:" + id);
                // 添加消息 2
                Map<String, String> map2 = new HashMap<>();
                map2.put("data", "java");
                StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
                System.out.println("消息添加成功 ID:" + id2);
            }
            /**
             * 消费者 1
             */
            public static void consumer() {
                Jedis jedis = JedisUtils.getJedis();
                // 消费消息
                while (true) {
                    // 读取消息
                    Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                            new StreamEntryID().UNRECEIVED_ENTRY);
                    // 阻塞读取一条消息(最大阻塞时间120s)
                    List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
                            120 * 1000, true, entry);
                    if (list != null && list.size() == 1) {
                        // 读取到消息
                        Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容
                        System.out.println("Consumer 1 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +
                                " 内容:" + new Gson().toJson(content));
                    }
                }
            }
            /**
             * 消费者 2
             */
            public static void consumer2() {
                Jedis jedis = JedisUtils.getJedis();
                // 消费消息
                while (true) {
                    // 读取消息
                    Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                            new StreamEntryID().UNRECEIVED_ENTRY);
                    // 阻塞读取一条消息(最大阻塞时间120s)
                    List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
                            120 * 1000, true, entry);
                    if (list != null && list.size() == 1) {
                        // 读取到消息
                        Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容
                        System.out.println("Consumer 2 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +
                                " 内容:" + new Gson().toJson(content));
                    }
                }
            }
        }
    b.说明
        使用 xadd 存入消息和 xread 循环阻塞读取消息的方式
        使用 Jedis 的 xreadGroup() 方法实现了消息的阻塞读取,并且使用此方法自带 noAck 参数,实现了消息的自动确认
        一个分组内的多个 consumer 会轮询收到消息队列的消息,并且不会出现一个消息被多个 consumer 读取的情况

03.示例:SpringBoot实现Redis Stream
    a.依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    b.RedisTemplate
        package com.mjg.config;

        import com.fasterxml.jackson.annotation.JsonAutoDetect;
        import com.fasterxml.jackson.annotation.PropertyAccessor;
        import com.fasterxml.jackson.databind.ObjectMapper;
        import com.fasterxml.jackson.databind.SerializationFeature;
        import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.data.redis.connection.RedisConnectionFactory;
        import org.springframework.data.redis.core.RedisTemplate;
        import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
        import org.springframework.data.redis.serializer.StringRedisSerializer;

        @Configuration
        public class RedisConfig {

            @Bean
            public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
                RedisTemplate<String, Object> template = new RedisTemplate<>();
                template.setConnectionFactory(connectionFactory);
                Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
                ObjectMapper om = new ObjectMapper();
                om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
                // 注册 Java 8 日期时间模块
                om.registerModule(new JavaTimeModule());
                om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
                om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
                jackson2JsonRedisSerializer.serialize(om);
                StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
                // key 采用 String 的序列化方式
                template.setKeySerializer(stringRedisSerializer);
                // hash 的 key 也采用 String 的序列化方式
                template.setHashKeySerializer(stringRedisSerializer);
                // value 序列化方式采用 jackson
                template.setValueSerializer(jackson2JsonRedisSerializer);
                // hash 的 value 序列化方式采用 jackson
                template.setHashValueSerializer(jackson2JsonRedisSerializer);
                template.afterPropertiesSet();
                return template;
            }
        }
    c.RedisStreamConfig
        package com.mjg.config;

        import cn.hutool.core.convert.Convert;
        import cn.hutool.core.util.StrUtil;
        import lombok.RequiredArgsConstructor;
        import lombok.SneakyThrows;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.beans.factory.DisposableBean;
        import org.springframework.beans.factory.InitializingBean;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.data.redis.connection.RedisConnectionFactory;
        import org.springframework.data.redis.connection.RedisServerCommands;
        import org.springframework.data.redis.connection.stream.Consumer;
        import org.springframework.data.redis.connection.stream.ObjectRecord;
        import org.springframework.data.redis.connection.stream.ReadOffset;
        import org.springframework.data.redis.connection.stream.StreamOffset;
        import org.springframework.data.redis.core.RedisCallback;
        import org.springframework.data.redis.core.RedisTemplate;
        import org.springframework.data.redis.core.StreamOperations;
        import org.springframework.data.redis.stream.StreamListener;
        import org.springframework.data.redis.stream.StreamMessageListenerContainer;
        import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
        import org.springframework.util.Assert;

        import java.net.InetAddress;
        import java.time.Duration;
        import java.util.Properties;

        @Slf4j
        @RequiredArgsConstructor
        @Configuration
        public class RedisStreamConfig implements InitializingBean, DisposableBean {
            private final RedisTemplate<String, Object> redisTemplate;

            public static String streamName = "user-event-stream";
            public static String userEventGroup = "user-event-group";
            private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

            /**
             * 消息侦听器容器,用于监听 Redis Stream 中的消息
             *
             * @param connectionFactory Redis 连接工厂,用于创建 Redis 连接
             * @param messageConsumer   消息消费者,用于处理接收到的消息
             * @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 类型的消息侦听器容器
             */
            @Bean
            public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
                StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer);
                listenerContainer.start();
                return listenerContainer;
            }

            /**
             * 创建一个流容器,用于监听 Redis Stream 中的数据
             *
             * @param streamName        Redis Stream 的名称
             * @param connectionFactory Redis 连接工厂
             * @param streamListener    绑定的监听类
             * @return 返回 StreamMessageListenerContainer 对象
             */
            @SneakyThrows
            private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                        StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                              .builder()
                              .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间
                              .batchSize(10) // 批量抓取消息
                              .targetType(String.class) // 传递的数据类型
                              .executor(threadPoolTaskExecutor)
                              .build();
                StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
                      .create(connectionFactory, options);
                // 指定消费最新的消息
                StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed());
                // 创建消费者
                StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener);
                // 指定消费者对象
                container.register(streamReadRequest, streamListener);
                return container;
            }

            /**
             * 生成流读取请求
             *
             * @param offset         偏移量,用于指定从 Redis Stream 中的哪个位置开始读取消息
             * @param streamListener 流侦听器,用于处理接收到的消息
             * @return 返回一个 StreamReadRequest 对象,表示一个流读取请求
             * @throws Exception 当 streamListener 无法识别为 MessageConsumer 类型时,抛出异常
             */
            private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {
                Consumer consumer;
                if (streamListener instanceof MessageConsumer) {
                    consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName());
                } else {
                    throw new Exception("无法识别的 stream key");
                }
                // 关闭自动 ack 确认
                return StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                      .errorHandler((error) -> {
                            log.error(error.getMessage());
                        })
                      .cancelOnError(e -> false)
                      .consumer(consumer)
                        // 关闭自动 ack 确认
                      .autoAcknowledge(false)
                      .build();
            }

            /**
             * 检查 Redis 版本是否符合要求
             *
             * @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,抛出该异常
             */
            private void checkRedisVersion() {
                // 获得 Redis 版本
                Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
                Assert.notNull(info, "Redis info is null");
                Object redisVersion = info.get("redis_version");
                Integer anInt = Convert.toInt(redisVersion);
                if (anInt < 5) {
                    throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", redisVersion));
                }
            }

            @Override
            public void destroy() throws Exception {
            }
            @Override
            public void afterPropertiesSet() throws Exception {
                checkRedisVersion();
                StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
                if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) {
                    streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup);
                }
            }
        }
    d.生产者
        package com.mjg.config;

        import lombok.RequiredArgsConstructor;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.data.redis.connection.stream.RecordId;
        import org.springframework.data.redis.connection.stream.StreamRecords;
        import org.springframework.data.redis.core.RedisTemplate;
        import org.springframework.stereotype.Component;

        import java.util.Collections;

        @Component
        @RequiredArgsConstructor
        @Slf4j
        public class MessageProducer {

            private final RedisTemplate<String, Object> redisTemplate;

            public void sendMessage(String streamKey, Object message) {
                RecordId recordId = redisTemplate
                      .opsForStream().add(StreamRecords.newRecord()
                              .ofMap(Collections.singletonMap("data", message))
                              .withStreamKey(streamKey));
                if (recordId!= null) {
                    log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, recordId);
                }
            }
        }
    e.消费者
        package com.mjg.config;

        import lombok.RequiredArgsConstructor;
        import org.springframework.data.redis.connection.stream.ObjectRecord;
        import org.springframework.data.redis.core.RedisTemplate;
        import org.springframework.data.redis.stream.StreamListener;
        import org.springframework.stereotype.Component;

        @RequiredArgsConstructor
        @Component
        public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {

            private final RedisTemplate<String, Object> redisTemplate;

            @Override
            public void onMessage(ObjectRecord<String, String> message) {
                String stream = message.getStream();
                String messageId = message.getId().toString();
                String messageBody = message.getValue();

                System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
                System.out.println("Message body: " + messageBody);

        //        消息应答
                redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId());
            }
        }
    f.测试@RequiredArgsConstructor
        @Slf4j
        @RestController
        public class MessageController {
            public static String streamName = "user-event-stream";
            private final MessageProducer messageProducer;

            @GetMapping("/send")
            public void send() {
                messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now());
            }
        }

7.5 [4]kafka:生产者(发布主题)+消费者(订阅主题)+偏移量

00.汇总
    a.点对点消费
        通过消费组实现,每条消息只被消费组中的一个消费者处理
        一个组 消费 多个topic
        一个组 消费 一个topic
        多个组 消费 一个topic
        不同组 消费 多个topic
    b.发布订阅模式
        通过多个消费组实现,每个消费组独立消费同一个主题的消息

01.点对点消费
    a.定义
        在Kafka中,点对点消费模式是通过消费组(Consumer Group)实现的
        每个消费组中的消费者共享一个订阅,确保每条消息只被消费组中的一个消费者处理
    b.原理
        消费组:Kafka通过消费组来管理消息的消费,每个消费组中的消费者会自动负载均衡分区
        分区分配:Kafka会将主题的分区分配给消费组中的消费者,确保每个分区只被一个消费者处理
    c.常用API
        KafkaConsumer:用于消费消息
        ConsumerConfig:用于配置消费者属性
    d.使用步骤
        创建一个消费组
        使用KafkaConsumer订阅主题
        消费者自动负载均衡分区,确保每条消息只被消费组中的一个消费者处理
    e.代码示例
        a.一个组消费一个topic
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("topic1"));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        b.多个组消费一个topic
            // Group 1
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
            consumer1.subscribe(Collections.singletonList("topic1"));

            // Group 2
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
            KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
            consumer2.subscribe(Collections.singletonList("topic1"));
        c.不同组消费多个topic
            // Group 1 consuming topic1
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
            consumer1.subscribe(Collections.singletonList("topic1"));

            // Group 2 consuming topic2
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
            KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
            consumer2.subscribe(Collections.singletonList("topic2"));
        d.一个组消费多个topic
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic1", "topic2"));

02.发布订阅模式
    a.定义
        在Kafka中,发布订阅模式是通过多个消费组实现的。每个消费组可以独立消费同一个主题的消息
    b.原理
        独立消费:每个消费组独立消费同一个主题的消息,确保每个消费组都能接收到所有的消息
        消息保留:Kafka会根据配置的保留策略(如保留时间或日志段大小)保留消息
    c.常用API
        与点对点消费相同,使用KafkaConsumer和ConsumerConfig
    d.使用步骤
        创建多个消费组
        每个消费组使用KafkaConsumer订阅同一个主题
        每个消费组独立消费消息
    e.代码示例
        a.多个组独立消费一个topic
            // Group 1
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
            consumer1.subscribe(Collections.singletonList("topic1"));

            // Group 2
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
            KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
            consumer2.subscribe(Collections.singletonList("topic1"));

7.6 [5]rabbitmq:交换机(路由)+队列(存储消息)+消息确认机制

00.汇总
    a.一对一(点对点)
        描述: 一个生产者发送消息到一个队列,一个消费者从该队列中消费消息
        应用场景: 任务队列,如处理订单、日志记录
    b.一对多(发布/订阅)
        描述: 一个生产者发送消息到交换机,交换机将消息路由到多个绑定的队列,多个消费者各自从队列中消费消息
        应用场景: 广播消息,如实时通知、多租户消息分发
    c.主题(Topic)
        描述: 基于路由键的模式匹配,将消息路由到符合主题模式的队列
        应用场景: 日志系统中按模块或级别分发日志
    d.路由(Direct)
        direct 直连交换机:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式
        fanout 扇形交换机:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的
        topic 主题交换机:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
        headers 头交换机:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
    e.扇出(Fanout)
        描述: 交换机将收到的消息广播到所有绑定的队列,无视路由键
        应用场景: 广播通知、实时更新

01.一对一(点对点)
    a.定义
        一个生产者发送消息到一个队列,一个消费者从该队列中消费消息
        场景:任务队列,如处理订单、日志记录
    b.原理
        队列:消息被发送到一个特定的队列,只有一个消费者可以消费该队列中的消息
    c.常用API
        Channel.basicPublish:用于发送消息
        Channel.basicConsume:用于消费消息
    d.使用步骤
        创建一个队列
        生产者发送消息到该队列
        消费者从该队列消费消息
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("queue_name", false, false, false, null);

            // 生产者发送消息
            String message = "Hello World!";
            channel.basicPublish("", "queue_name", null, message.getBytes());

            // 消费者消费消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received: " + receivedMessage);
            };
            channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> {});
        }

02.一对多(发布/订阅)
    a.定义
        一个生产者发送消息到交换机,交换机将消息路由到多个绑定的队列,多个消费者各自从队列中消费消息
        场景:广播消息,如实时通知、多租户消息分发
    b.原理
        交换机:消息被发送到交换机,交换机根据绑定规则将消息路由到多个队列
    c.常用API
        Channel.exchangeDeclare:用于声明交换机
        Channel.queueBind:用于绑定队列到交换机
    d.使用步骤
        创建一个交换机
        绑定多个队列到该交换机
        生产者发送消息到交换机
        消费者从各自的队列消费消息
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("exchange_name", "fanout");

            // 创建并绑定队列
            String queueName1 = channel.queueDeclare().getQueue();
            String queueName2 = channel.queueDeclare().getQueue();
            channel.queueBind(queueName1, "exchange_name", "");
            channel.queueBind(queueName2, "exchange_name", "");

            // 生产者发送消息
            String message = "Broadcast Message!";
            channel.basicPublish("exchange_name", "", null, message.getBytes());

            // 消费者1消费消息
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer 1 Received: " + receivedMessage);
            };
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {});

            // 消费者2消费消息
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer 2 Received: " + receivedMessage);
            };
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {});
        }

03.主题(Topic)
    a.定义
        基于路由键的模式匹配,将消息路由到符合主题模式的队列
        场景:日志系统中按模块或级别分发日志
    b.原理
        主题交换机:消息被发送到主题交换机,交换机根据路由键模式匹配将消息路由到队列
    c.常用API
        Channel.exchangeDeclare:用于声明主题交换机
        Channel.queueBind:用于绑定队列到交换机,使用路由键模式
    d.使用步骤
        创建一个主题交换机
        绑定队列到该交换机,使用路由键模式
        生产者发送消息到交换机,指定路由键
        消费者从符合路由键模式的队列消费消息
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("topic_exchange", "topic");

            // 创建并绑定队列
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, "topic_exchange", "logs.*");

            // 生产者发送消息
            String message = "Log Message!";
            channel.basicPublish("topic_exchange", "logs.info", null, message.getBytes());

            // 消费者消费消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received: " + receivedMessage);
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }

04.路由(Direct)
    a.定义
        交换机根据精确匹配的路由键将消息路由到队列
        场景:按类型或关键字分发任务
    b.原理
        直连交换机:消息被发送到直连交换机,交换机根据精确匹配的路由键将消息路由到队列
    c.常用API
        Channel.exchangeDeclare:用于声明直连交换机
        Channel.queueBind:用于绑定队列到交换机,使用精确路由键
    d.使用步骤
        创建一个直连交换机
        绑定队列到该交换机,使用精确路由键
        生产者发送消息到交换机,指定路由键
        消费者从符合路由键的队列消费消息
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("direct_exchange", "direct");

            // 创建并绑定队列
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, "direct_exchange", "info");

            // 生产者发送消息
            String message = "Info Message!";
            channel.basicPublish("direct_exchange", "info", null, message.getBytes());

            // 消费者消费消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received: " + receivedMessage);
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }

05.扇出(Fanout)
    a.定义
        交换机将收到的消息广播到所有绑定的队列,无视路由键
        场景:广播通知、实时更新
    b.原理
        扇出交换机:消息被发送到扇出交换机,交换机将消息广播到所有绑定的队列
    c.常用API
        Channel.exchangeDeclare:用于声明扇出交换机
        Channel.queueBind:用于绑定队列到交换机
    d.使用步骤
        创建一个扇出交换机
        绑定多个队列到该交换机
        生产者发送消息到交换机
        消费者从各自的队列消费消息
    e.代码示例
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("fanout_exchange", "fanout");

            // 创建并绑定队列
            String queueName1 = channel.queueDeclare().getQueue();
            String queueName2 = channel.queueDeclare().getQueue();
            channel.queueBind(queueName1, "fanout_exchange", "");
            channel.queueBind(queueName2, "fanout_exchange", "");

            // 生产者发送消息
            String message = "Broadcast Message!";
            channel.basicPublish("fanout_exchange", "", null, message.getBytes());

            // 消费者1消费消息
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer 1 Received: " + receivedMessage);
            };
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {});

            // 消费者2消费消息
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer 2 Received: " + receivedMessage);
            };
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {});
        }

7.7 [5]es与db同步:6种

00.介绍
    a.图示
        方案       实时性   侵入性 复杂度  适用阶段
        同步双写    秒级    高     低     小型单体项目
        MQ异步     秒级    中     中     中型分布式系统
        Logstash  分钟级   无     低     离线分析
        Canal     毫秒级   无     高     高并发生产环境
        DataX     小时级   无     中     历史数据迁移
        Flink     毫秒级   低     极高   实时数仓
    b.推荐
        若团队无运维中间件能力 → 选择Logstash或同步双写
        需秒级延迟且允许改造 → MQ异步 + 本地事务表
        追求极致实时且资源充足 → Canal + Flink双保险

01.方案一:同步双写
    a.场景
        适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。在业务代码中同时写入MySQL与ES。
    b.代码示例
        ---
        @Transactional
        public void createOrder(Order order) {
            // 写入MySQL
            orderMapper.insert(order);
            // 同步写入ES
            IndexRequest request = new IndexRequest("orders")
                .id(order.getId())
                .source(JSON.toJSONString(order), XContentType.JSON);
            client.index(request, RequestOptions.DEFAULT);
        }
        ---
    c.痛点
        a.硬编码侵入
            所有涉及写操作的地方均需添加ES写入逻辑。
        b.性能瓶颈
            双写操作导致事务时间延长,TPS下降30%以上。
        c.数据一致性风险
            若ES写入失败,需引入补偿机制(如本地事务表+定时重试)。

02.方案二:异步双写
    a.场景
        电商订单状态更新后需同步至ES供客服系统检索。我们可以使用MQ进行解耦。
    b.代码示例
        a.生产者端
            ---
            public void updateProduct(Product product) {
                productMapper.update(product);
                kafkaTemplate.send("product-update", product.getId());
            }
            ---
        b.消费者端
            ---
            @KafkaListener(topics = "product-update")
            public void syncToEs(String productId) {
                Product product = productMapper.selectById(productId);
                esClient.index(product);
            }
            ---
    c.优势
        a.吞吐量提升
            通过MQ削峰填谷,可承载万级QPS。
        b.故障隔离
            ES宕机不影响主业务链路。
    d.缺陷
        a.消息堆积
            突发流量可能导致消费延迟(需监控Lag值)。
        b.顺序性问题
            需通过分区键保证同一数据的顺序消费。

03.方案三:Logstash定时拉取
    a.场景
        用户行为日志的T+1分析场景。该方案低侵入但高延迟。
    b.配置示例
        ---
        input {
            jdbc{
                jdbc_driver=>"com.mysql.jdbc.Driver"
                jdbc_url=>"jdbc:mysql://localhost:3306/log_db"
                schedule=>"*/5 * * * *"# 每5分钟执行
                statement=>"SELECT * FROM user_log WHERE update_time > :sql_last_value"
            }
        }
        output{
            elasticsearch{
                hosts=>["es-host:9200"]
                index=>"user_logs"
            }
        }
        ---
    c.适用性分析
        a.优点
            零代码改造,适合历史数据迁移。
        b.致命伤
            a.分钟级延迟
                无法满足实时搜索。
            b.全表扫描压力大
                需优化增量字段索引。

04.方案四:Canal监听Binlog
    a.场景
        社交平台动态实时搜索(如微博热搜更新)。
    b.技术栈
        Canal + RocketMQ + ES
    c.关键配置
        # canal.properties
        canal.instance.master.address=127.0.0.1:3306
        canal.mq.topic=canal.es.sync
    d.避坑指南
        a.数据漂移
            需处理DDL变更(通过Schema Registry管理映射)。
        b.幂等消费
            通过_id唯一键避免重复写入。

05.方案五:DataX批量同步
    a.场景
        将历史订单数据从分库分表MySQL迁移至ES。该方案是大数据迁移的首选。
    b.配置文件
        {
            "job":{
                "content":[{
                    "reader":{
                        "name":"mysqlreader",
                        "parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"}
                    },
                    "writer":{
                        "name":"elasticsearchwriter",
                        "parameter":{"endpoint":"http://es-host:9200","index":"orders"}
                    }
                }]
            }
        }
    c.性能调优
        a.调整channel数提升并发
            建议与分片数对齐。
        b.启用limit分批查询避免OOM

06.方案六:Flink流处理
    a.场景
        商品价格变更时,需关联用户画像计算实时推荐评分。该方案适合于复杂的ETL场景。
    b.代码片段
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CanalSource())
            .map(record -> parseToPriceEvent(record))
            .keyBy(event -> event.getProductId())
            .connect(userProfileBroadcastStream)
            .process(new PriceRecommendationProcess())
            .addSink(new ElasticsearchSink());
    c.优势
        a.状态管理
            精准处理乱序事件(Watermark机制)。
        b.维表关联
            通过Broadcast State实现实时画像关联。

7.8 [5]es与db同步:异步、canal

00.介绍
    a.同步调用
        概念:在将数据写到MySQL时,同时将数据写到ES
        优点:实现简单粗暴
        缺点:业务耦合度高
    b.RabbitMQ异步队列
        概念:针对多数据源写入的场景,可以借助MQ实现异步的多源写入
        优点:低耦合,实现难度一般
        缺点:依赖mq的可靠性
    c.Canal+RabbitMQ可以实现接近实时的数据同步
        优点:完全解除服务间耦合
        缺点:开启binlog增加数据库负担、实现复杂度高

01.RabbitMQ异步队列
    a.介绍
        当数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作
        步骤:
        单机部署并启动MQ(单机部署在MQ部分有讲)
        接收者中声明exchange、queue、RoutingKey
        在hotel-admin发送者中的增、删、改业务中完成消息发送
        在hotel-demo接收者中完成消息监听,并更新elasticsearch中数据
        启动并测试数据同步功能
    b.对发送者和消费者都添加依赖和yaml信息
        a.引入依赖
            <!--amqp-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        b.yaml
            spring:
              rabbitmq: #MQ配置
                host: 192.168.194.131 # 主机名
                port: 5672 # 端口
                virtual-host: / # 虚拟主机
                username: itcast # 用户名
                password: 123321 # 密码
    c.声明交换机、队列
        a.声明队列交换机名称
            在hotel-admin发送者和hotel-demo消费者中的cn.itcast.hotel.constatnts包下新建一个类MqConstants
            package cn.itcast.hotel.constatnts;

            public class MqConstants {
                /**
                 * 交换机
                 */
                public final static String HOTEL_EXCHANGE = "hotel.topic";
                /**
                 * 监听新增和修改的队列
                 */
                public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
                /**
                 * 监听删除的队列
                 */
                public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
                /**
                 * 新增或修改的RoutingKey
                 */
                public final static String HOTEL_INSERT_KEY = "hotel.insert";
                /**
                 * 删除的RoutingKey
                 */
                public final static String HOTEL_DELETE_KEY = "hotel.delete";
            }
        b.声明队列交换机
            在hotel-demo消费者中,定义配置类,声明队列、交换机:
            package cn.itcast.hotel.config;

            import cn.itcast.hotel.constants.MqConstants;
            import org.springframework.amqp.core.Binding;
            import org.springframework.amqp.core.BindingBuilder;
            import org.springframework.amqp.core.Queue;
            import org.springframework.amqp.core.TopicExchange;
            import org.springframework.context.annotation.Bean;
            import org.springframework.context.annotation.Configuration;

            @Configuration
            public class MqConfig {
                @Bean
                public TopicExchange topicExchange(){
                    return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
                }

                @Bean
                public Queue insertQueue(){
                    return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
                }

                @Bean
                public Queue deleteQueue(){
                    return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
                }

                @Bean
                public Binding insertQueueBinding(){
                    return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
                }

                @Bean
                public Binding deleteQueueBinding(){
                    return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
                }
            }
    c.发送MQ消息
        在hotel-admin发送者中的增、删、改业务中分别发送MQ消息
        -----------------------------------------------------------------------------------------------------
        @PostMapping
        public void saveHotel(@RequestBody Hotel hotel){
            hotelService.save(hotel);
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HoTEL_INSERT_KEY, hotel.getId());
        }
        -----------------------------------------------------------------------------------------------------
        @PutMapping()
        public void updateById(@RequestBody Hotel hotel){
            if (hotel.getId() == null) {
                throw new InvalidParameterException("id不能为空");
            }
            hotelService.updateById(hotel);
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
        }
        -----------------------------------------------------------------------------------------------------
        @DeleteMapping("/{id}")
        public void deleteById(@PathVariable("id")Long id){
            hotelService.removeById(id);
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
        }
    d.接收MQ消息
        a.介绍
            hotel-demo接收到MQ消息要做的事情包括:
            新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
            删除消息:根据传递的hotel的id删除索引库中的一条数据
        b.写SDL业务
            首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、删除业务
            void deleteById(Long id);
            void insertById(Long id);
            -------------------------------------------------------------------------------------------------
            给hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中实现业务:
            @Override
            public void deleteById(Long id) {
                try {
                    // 1.准备Request
                    DeleteRequest request = new DeleteRequest("hotel", id.toString());
                    // 2.发送请求
                    client.delete(request, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void insertById(Long id) {
                try {
                    // 0.根据id查询酒店数据
                    Hotel hotel = getById(id);
                    // 转换为文档类型
                    HotelDoc hotelDoc = new HotelDoc(hotel);

                    // 1.准备Request对象
                    IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
                    // 2.准备Json文档
                    request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
                    // 3.发送请求
                    client.index(request, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        c.编写监听器
            在hotel-demo中的cn.itcast.hotel.mq包新增一个类:
            package cn.itcast.hotel.mq;

            import cn.itcast.hotel.constants.MqConstants;
            import cn.itcast.hotel.service.IHotelService;
            import org.springframework.amqp.rabbit.annotation.RabbitListener;
            import org.springframework.beans.factory.annotation.Autowired;
            import org.springframework.stereotype.Component;

            @Component
            public class HotelListener {

                @Autowired
                private IHotelService hotelService;

                /**
                 * 监听酒店新增或修改的业务
                 * @param id 酒店id
                 */
                @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
                public void listenHotelInsertOrUpdate(Long id){
                    hotelService.insertById(id);
                }

                /**
                 * 监听酒店删除的业务
                 * @param id 酒店id
                 */
                @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
                public void listenHotelDelete(Long id){
                    hotelService.deleteById(id);
                }
            }
        d.测试
            用postman调用增加/删除/修改mysql数据库的接口,然后去页面搜索看看删除的数据还是否能查到,
            或者修改/增加的数据能不能查出来

02.RabbitMQ+Canal可以实现接近实时的数据同步
    a.Canal
        a.主从复制
            主库上的每一个事务操作(比如插入、删除、更新等)都会被记录到二进制日志(binary log)中
            binary log是一种二进制格式的日志文件,它包含了所有会修改数据的事件
            可以通过 SHOW BINLOG EVENTS进行查看
            从库就会去读取主库的binary log的日志,将其拷贝到自己的中继日志(relay log)
            然后从库执行 relay log中的事件,将数据变更反映为自己的数据。
        b.Canal原理
            1.Canal就是模拟MySQL主从的交互协议,伪装自己为Mysql SLAVE(从库),向MySQL主库发送dump协议
            2.MySQL主库收到dump请求,开始推送自己binary log给SLAVE(即Canal)
            3.Canal解析binary log对象(原始为byte流)
        c.Canal下载
            Canal是阿里巴巴开源的一个基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的组件
            下载地址为github.com/alibaba/can… 建议大家下载1.1.7的版本
    b.Canal配置
        a.第1步
            在开始进行Canal配置之前,我们需要确保MySQL的binary log日志是否开启,使用如下命令
            SHOW VARIABLES LIKE 'log_bin';
            SHOW VARIABLES LIKE 'binlog_format';
            SHOW VARIABLES LIKE 'log_bin';用于查看是否开启了 Binlog。如果返回值的Value列显示为ON,则表示 Binlog 已开启;如果是OFF,则表示未开启。
            SHOW VARIABLES LIKE 'binlog_format';用于查看当前 Binlog 的格式,返回值的Value列可能显示为STATEMENT、ROW或者MIXED。我们需要将格式设置为ROW
        b.第2步
            如果binary log没有开启的话,我们需要找到MySQL的配置文件,在Windows系统下为my.ini,Linux系统下为my.cnf,其路径通常在 C:\Users\Administrator\AppData\Roaming\Microsoft\Windows\Recent下
            在配置文件末尾加上以下内容:
            log-bin=mysql-bin # 开启 binlog
            binlog-format=ROW # 选择 ROW 模式
            server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
        c.第3步
            配置完成后,我们打开MySQL客户端,创建一个授权账号,供Canal连接使用,如果有账号则直接grant(不要使用root账号),完成后一定要重启数据库
            CREATE USER canal IDENTIFIED BY 'canal';
            GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
            -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
            -- 刷新权限
            FLUSH PRIVILEGES;
            我们来到\canal-deployer-1.1.7\conf\example\instance.properties
        d.第4步
            需要修改两个地方
            # position info  需要改成自己的数据库信息
            # 修改为自己的数据库地址和端口
            canal.instance.master.address=127.0.0.1:3306
            canal.instance.master.journal.name=DESKTOP-FL9JG86-bin.000091
            canal.instance.master.position=157
            canal.instance.master.timestamp=
            canal.instance.master.gtid=
            # username/password   需要改成自己的数据库信息  不能使用root账户
            canal.instance.dbUsername=canal   # 修改为刚刚授权的账户
            canal.instance.dbPassword=canal   # 密码
            canal.instance.connectionCharset = UTF-8
            # enable druid Decrypt database password
            canal.instance.enableDruid=false
            #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
        e.第5步
            对于canal.instance.master.journal.name和canal.instance.master.position两个参数的值,大家在MySQL客户端使用SHOW MASTER STATUS命令可以查看,比如:
            则canal.instance.master.journal.name值为DESKTOP-FL9JG86-bin.000091,canal.instance.master.position值为2671,修改完毕后保存。然后运行
            canal-deployer-1.1.7\bin\startup.bat程序,
            查看logs/canal/canal.log日志,结果如下,则启动完成
            2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
            2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
            2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
        f.第6步
            如果大家报错的话,是需要修改startup.bat文件的,因为里面的参数是使用Java的较低版本的参数,修改后内容如下,直接复制粘贴
            @echo off
            @if not "%ECHO%" == ""  echo %ECHO%
            @if "%OS%" == "Windows_NT"  setlocal
            set ENV_PATH=.\
            if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
            set conf_dir=%ENV_PATH%\..\conf
            set canal_conf=%conf_dir%\canal.properties
            @rem set canal_conf=%conf_dir%\canal_local.properties
            if "%1" == "local" set canal_conf=%conf_dir%\canal_local.properties
            set logback_configurationFile=%conf_dir%\logback.xml
            set CLASSPATH=%conf_dir%
            set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
            set JAVA_MEM_OPTS= -Xms128m -Xmx128m -XX:MaxMetaspaceSize=128m
            set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
            set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
            set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
            set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
            set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
            echo start cmd : %CMD_STR%
            java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
            然后重新启动,就没有问题了
    c.RabbitMQ配置
        a.配置
            接下来,我们来到/canal-deployer-1.1.7/conf/canal.properties,也是需要修改两个地方
            canal.zkServers =
            # flush data to zk
            canal.zookeeper.flush.period = 1000
            canal.withoutNetty = false
            # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ   可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmq
            # 将此处修改为 rabbitMQ
            canal.serverMode = rabbitMQ
            # flush meta cursor/parse position to file
            ##################################################
            #########           RabbitMQ         #############
            ##################################################
            # 配置RabbitMQ
            rabbitmq.host = 127.0.0.1:5672
            rabbitmq.virtual.host = /
            rabbitmq.exchange = canalFanoutExchange
            rabbitmq.username = guest
            rabbitmq
        b.说明
            修改完成后保存,重新运行startup.bat,查看logs/canal/canal.log日志和logs/example/example.log,如果都没有报错就成功了
            如果大家出现java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = binlog truncated和Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table错误
            直接删除canal/conf/下对应实例的meta.dat文件即可
    d.代码实战
        a.说明
            主数据的库操作都会被Canal监听,并把消息发送到RabbitMQ中
            那我们直接获取这个消息,就可以知道MySQL做了哪些事情(新增、修改、删除),那么es也需要做出相应的操作
        b.创建交换机
            @Bean
            public FanoutExchange canalFanoutExchange() {
                return new FanoutExchange("canalFanoutExchange", true, false);
            }

            @Bean
            public Queue canalQueue() {
                return new Queue("canalQueue", true);
            }


            @Bean
            public Binding canalBinding() {
                LOGGER.info("绑定canal交换机");
                return BindingBuilder.bind(canalQueue()).to(canalFanoutExchange());
            }
        c.监听canalQueue队列,即可获取到Canal推送到RabbitMQ的信息
             @RabbitListener(queuesToDeclare = @Queue("canalQueue"))
             public void receive3(@Payload String body, @Headers Map<String,Object> headers) {
                LOGGER.info("canalQueueBody:{}", body);
                LOGGER.info("canalQueueHeaders:{}", headers);
             }
        d.测试
            在数据库新增一条数据
            因为Canal发送的数据是byte类型的数组,所以这里接收到的数据也是byte类型的数组,使用代码对其进行解析
            可以看到data中的数据就是我们MySQL中新增的数据
    e.针对这个数据结构,我建立了一个实体类,用来映射这个JSON对象
        a.实体类
            import lombok.AllArgsConstructor;
            import lombok.Data;
            import lombok.NoArgsConstructor;

            import java.io.Serializable;
            import java.util.List;

            /**
             * ClassName: CanalEntity
             * Package: com.example.springbootrabbitmq.entity
             * Description:
             * 接受canal信息的实体类
             *
             * @Author ms
             * @Create 2024/11/6 12:07
             * @Version 1.0
             */
            @Data
            @AllArgsConstructor
            @NoArgsConstructor
            public class CanalEntity implements Serializable {

                private List<FileTableData> data;
                private String database;
                private long es;
                private int id;
                private boolean isDdl;
                private MysqlType mysqlType;
                private Object old;
                private List<String> pkNames;
                private String sql;
                private SqlType sqlType;
                private String table;
                private long ts;
                private String type;

                // 生成必要的 getter 和 setter 方法

                @Data
                public static class FileTableData {
                    private String id;
                    private String fileName;
                    private String fileType;
                    private String fileSize;
                    private String filePath;
                    private String isDeleted;

                }

                @Data
                public static class MysqlType {
                    private String id;
                    private String fileName;
                    private String fileType;
                    private String fileSize;
                    private String filePath;
                    private String isDeleted;

                }

                @Data
                public static class SqlType {
                    private int id;
                    private int fileName;
                    private int fileType;
                    private int fileSize;
                    private int filePath;
                    private int isDeleted;
                }
            }
        b.工具类
            import com.example.springbootrabbitmq.entity.CanalEntity;
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.springframework.util.Assert;

            import java.lang.reflect.Field;
            import java.lang.reflect.Type;
            import java.nio.charset.StandardCharsets;
            import java.util.*;

            /**
             * ClassName: ParseEntityUtils
             * Package: com.example.springbootrabbitmq.utils
             * Description:
             * 用于将Map结构的数据转换为实体类
             * 主要用于接受RabbitMq的消息
             *
             * @Author ms
             * @Create 2024/11/6 12:57
             * @Version 1.0
             */
            public class ParseEntityUtils {

                private static final List<String> OBJECT_FIELDS = Arrays.asList("data", "mysqlType", "pkNames", "sqlType");


                /**
                 * 将byte结构的数据转换为实体类
                 *
                 * @param messageBody 字节数据
                 * @param clazz       实体的Class对象
                 * @param <T>
                 * @return
                 */
                public static <T> T parseEntity(byte[] messageBody, Class<T> clazz) {
                    // 1.将字节数组转换为字符串
                    String jsonString = new String(messageBody, StandardCharsets.UTF_8);
                    // 2.使用JackSon库将String转换为Map结构
                    ObjectMapper mapper = new ObjectMapper();
                    // 3.将String转换为Map结构
                    // todo: ObjectMapper其实提供了将String转换为实体类的方式,这里无法直接使用,是应为RabbitMQ返回的数据读取完成后是多层嵌套的对象,
                    //  所以我封装了内部类,而ObjectMapper是无法直接映射内部类的
                    T t = null;

                    Map<String, Object> map;
                    try {
                        map = mapper.readValue(jsonString, Map.class);

                        t = clazz.getDeclaredConstructor().newInstance();

                        Class<?>[] innerClasss = clazz.getDeclaredClasses();

                        // 获得当前类的所有字段  去除 data、MySqlType、pkNames字段,其他的字段类型都是基本数据类型或String类型,直接赋值
                        Field[] fields = clazz.getDeclaredFields();
                        for (Field field : fields) {
                            // 获得字段的名称
                            String fieldName = field.getName();
                            if (!OBJECT_FIELDS.contains(fieldName)) {
                                // 直接赋值
                                field.setAccessible(true);
                                field.set(t, map.get(fieldName));
                            }

                            // 对于其他类型,比如List、Map类型,额外处理
                            if (List.class.isAssignableFrom(field.getType())) {
                                // 获取List结构中的数据类型
                                Class<?> elementType = findListElementType(field);
                                Assert.notNull(elementType, "List结构中的数据类型不能为空");
                                // 获取clazz的内部类

                                for (Class<?> innerClass : innerClasss) {
                                    if (elementType.getSimpleName().equals(innerClass.getSimpleName())) {
                                        // 对innerClass对象赋值
                                        Object innerObject = innerClass.getDeclaredConstructor().newInstance();

                                        ArrayList<Map<String, Object>> innerValue = (ArrayList<Map<String, Object>>) map.get(replaceFieldToLower(fieldName));
                                        ArrayList<Object> list = new ArrayList<>();
                                        for (Map<String, Object> objectMap : innerValue) {
                                            HashMap<String, Object> hashMap = new HashMap<>();
                                            for (Map.Entry<String, Object> entry : objectMap.entrySet()) {
                                                hashMap.put(replaceFieldToUpper(entry.getKey()), entry.getValue());
                                            }
                                            String str = mapper.writeValueAsString(hashMap);
                                            Object object = mapper.readValue(str, innerClass);
                                            list.add(object);
                                        }
                                        field.setAccessible(true);
                                        field.set(t, list);
                                    }
                                }
                            } else if (Map.class.isAssignableFrom(field.getType())) {
            //                    // 如果是字段类型为Map
            //                    field.setAccessible(true);
            //                    field.set(t, map.get(fieldName));
                            } else if (CanalEntity.MysqlType.class.isAssignableFrom(field.getType()) || CanalEntity.SqlType.class.isAssignableFrom(field.getType())) {
                                // 最后表示字段类型为Object类型
                                Class<?> aClass = field.getType();
                                // 获得所以字段
                                Field[] innerFields = aClass.getDeclaredFields();
                                // 获得字段的值
                                HashMap<String, Object> innerFieldValue = (HashMap<String, Object>) map.get(field.getName());
                                HashMap<String, Object> hashMap = new HashMap<>();

                                for (Map.Entry<String, Object> entry : innerFieldValue.entrySet()) {
                                    if (entry.getKey().contains("_")) {
                                        hashMap.put(replaceFieldToUpper(entry.getKey()), entry.getValue());
                                    } else {
                                        hashMap.put(entry.getKey(), entry.getValue());
                                    }
                                }
                                String string = mapper.writeValueAsString(hashMap);
                                Object value = mapper.readValue(string, aClass);
                                field.setAccessible(true);
                                field.set(t, value);
                            }
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                    return t;
                }


                /**
                 * 将驼峰命名法替换为 _   例如:userName -> user_name
                 *
                 * @param fieldName
                 * @return
                 */
                public static String replaceFieldToLower(String fieldName) {
                    StringBuilder result = new StringBuilder();
                    for (int i = 0; i < fieldName.length(); i++) {
                        char c = fieldName.charAt(i);
                        if (Character.isUpperCase(c) && i > 0) {
                            result.append("_");
                        }
                        result.append(Character.toLowerCase(c));
                    }
                    return result.toString();
                }


                /**
                 * 将 _ 替换为驼峰命名法 例如:user_name -> userName
                 *
                 * @param fieldName
                 * @return
                 */
                public static String replaceFieldToUpper(String fieldName) {
                    StringBuilder result = new StringBuilder();
                    boolean nextToUpperCase = false;
                    for (char c : fieldName.toCharArray()) {
                        if (c == '_') {
                            nextToUpperCase = true;
                        } else {
                            if (nextToUpperCase) {
                                result.append(Character.toUpperCase(c));
                                nextToUpperCase = false;
                            } else {
                                result.append(c);
                            }
                        }
                    }
                    return result.toString();
                }


                /**
                 * 获取List结构中的数据类型
                 *
                 * @param field
                 * @return
                 */
                public static Class<?> findListElementType(Field field) {
                    Type genericType = field.getGenericType();
                    if (genericType instanceof java.lang.reflect.ParameterizedType) {
                        java.lang.reflect.ParameterizedType parameterizedType = (java.lang.reflect.ParameterizedType) genericType;
                        Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                        if (actualTypeArguments.length > 0) {
                            if (actualTypeArguments[0] instanceof Class<?>) {
                                return (Class<?>) actualTypeArguments[0];
                            }
                        }
                    }
                    return null;
                }
            }
        c.调用工具类解析JSON即可,完整方法
            @Test
            public void contextLoads() {
                byte[] byteData = new byte[]{123,34,100,97,116,97,34,58,91,123,34,105,100,34,58,34,49,51,34,44,34,102,105,108,101,95,110,97,109,101,34,58,34,100,97,106,119,100,106,97,119,34,44,34,102,105,108,101,95,116,121,112,101,34,58,34,100,111,99,120,34,44,34,102,105,108,101,95,115,105,122,101,34,58,34,49,53,57,48,34,44,34,102,105,108,101,95,112,97,116,104,34,58,34,49,34,44,34,105,115,95,100,101,108,101,116,101,100,34,58,34,48,34,125,93,44,34,100,97,116,97,98,97,115,101,34,58,34,100,98,55,34,44,34,101,115,34,58,49,55,51,48,56,56,54,57,48,49,48,48,48,44,34,105,100,34,58,57,44,34,105,115,68,100,108,34,58,102,97,108,115,101,44,34,109,121,115,113,108,84,121,112,101,34,58,123,34,105,100,34,58,34,105,110,116,34,44,34,102,105,108,101,95,110,97,109,101,34,58,34,118,97,114,99,104,97,114,40,50,53,53,41,34,44,34,102,105,108,101,95,116,121,112,101,34,58,34,118,97,114,99,104,97,114,40,53,48,41,34,44,34,102,105,108,101,95,115,105,122,101,34,58,34,98,105,103,105,110,116,34,44,34,102,105,108,101,95,112,97,116,104,34,58,34,116,101,120,116,34,44,34,105,115,95,100,101,108,101,116,101,100,34,58,34,116,105,110,121,105,110,116,40,49,41,34,125,44,34,111,108,100,34,58,110,117,108,108,44,34,112,107,78,97,109,101,115,34,58,91,34,105,100,34,93,44,34,115,113,108,34,58,34,34,44,34,115,113,108,84,121,112,101,34,58,123,34,105,100,34,58,52,44,34,102,105,108,101,95,110,97,109,101,34,58,49,50,44,34,102,105,108,101,95,116,121,112,101,34,58,49,50,44,34,102,105,108,101,95,115,105,122,101,34,58,45,53,44,34,102,105,108,101,95,112,97,116,104,34,58,50,48,48,53,44,34,105,115,95,100,101,108,101,116,101,100,34,58,45,54,125,44,34,116,97,98,108,101,34,58,34,102,105,108,101,95,116,97,98,108,101,34,44,34,116,115,34,58,49,55,51,48,56,56,54,57,48,49,57,52,51,44,34,116,121,112,101,34,58,34,73,78,83,69,82,84,34,125};
                String jsonString = new String(byteData, StandardCharsets.UTF_8);
                ObjectMapper mapper = new ObjectMapper();
                JsonNode map = null;
                try {
                    map = mapper.readTree(jsonString);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

                // 打印解析后的 JSON 对象
                System.out.println("解析后的 JSON 对象:" + map.toString());


                CanalEntity canalEntity = ParseEntityUtils.parseEntity(byteData, CanalEntity.class);
                System.out.println(canalEntity.toString());

            }
        d.结果
            从CanalEntity中取出FileTableData对象的信息
            然后调用ElasticSearch的API执行插入操作即可,至于修改、删除其实也是同样的思路

8 延迟消息队列

8.1 汇总:6个

01.DelayQueue
    JDK自带

02.MQ框架
    a.方式1
        DLX(死信交换机)+TTL(消息超时时间)
    b.方式2
        rabbitmq_delayed_message_exchange 插件 实现延迟队列

03.Redis中间件
    a.Redis过期消息
        Redis 允许我们为每一个 key 设置过期时间,在 key 过期时,Redis 可以配置为发送一个过期事件
        在应用程序通过监听这个过期事件,就可以实现延迟队列了
    b.zset的score属性
        利用zset的score属性,redis会将zset集合中的元素按照score进行从小到大排序,通过zadd命令向zset中添加元素
        其中value值为延时任务消息,可根据业务定义消息格式,score值为任务执行的时间点,比如13位毫秒时间戳
        任务添加后,获取任务的逻辑只需从zset中筛选score值小于当前时间戳的元素,
        所得结果便是当前时间节点下需要执行的任务,通过zrangebyscore命令来获取,zset拥有集合去重的特性
    c.Redisson延迟队列:RDelayedQueue
        Redisson阻塞队列和延迟队列

8.2 [1]DelayQueue:JDK自带

01.原理
    a.延迟机制
        DelayQueue中的元素必须实现Delayed接口,该接口要求实现getDelay(TimeUnit unit)方法,用于指定元素的延迟时间
    b.队列特性
        DelayQueue是一个无界阻塞队列,只有当元素的延迟时间到期后,才能从队列中取出进行消费
    c.时间控制
        通过System.nanoTime()来计算元素的剩余延迟时间

02.常用API
    offer(E e):将元素插入队列
    poll():获取并移除队列头部元素,如果没有元素到期则返回null
    take():获取并移除队列头部元素,等待元素到期
    peek():获取队列头部元素但不移除,如果没有元素到期则返回null

03.使用步骤
    实现Delayed接口:创建一个类实现Delayed接口,定义元素的延迟时间
    创建DelayQueue:实例化一个DelayQueue对象
    插入元素:使用offer()方法将实现了Delayed接口的元素插入队列
    消费元素:使用take()方法阻塞等待元素到期并消费

04.代码示例
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class DelayedElement implements Delayed {
        private final long delayTime; // 延迟时间
        private final long expireTime; // 到期时间
        private final String message;

        public DelayedElement(long delay, String message) {
            this.delayTime = delay;
            this.expireTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS);
            this.message = message;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.expireTime < ((DelayedElement) o).expireTime) {
                return -1;
            }
            if (this.expireTime > ((DelayedElement) o).expireTime) {
                return 1;
            }
            return 0;
        }

        public String getMessage() {
            return message;
        }
    }

    public class DelayQueueExample {
        public static void main(String[] args) throws InterruptedException {
            DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();

            // 插入元素
            delayQueue.offer(new DelayedElement(1000, "Message 1"));
            delayQueue.offer(new DelayedElement(2000, "Message 2"));
            delayQueue.offer(new DelayedElement(3000, "Message 3"));

            // 消费元素
            while (!delayQueue.isEmpty()) {
                DelayedElement element = delayQueue.take(); // 阻塞等待元素到期
                System.out.println("Consumed: " + element.getMessage());
            }
        }
    }

8.3 [2]DLX+TTL:定义

01.定义
    a.DLX(死信交换机)
        a.定义
            一种特殊的交换机,用于处理无法被正常消费的消息(死信)
        b.消息成为死信的原因
            消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
            消息过期
            队列达到最大长度
    b.TTL(消息超时时间)
        a.定义
            消息在队列中存活的时间,超过该时间后消息会被转发到死信交换机

02.原理
    消息处理:当消息在队列中达到TTL(超时时间)或因其他原因成为死信时,会被转发到绑定的死信交换机
    死信交换机:死信交换机将死信路由到指定的死信队列,供消费者处理

03.常用API
    exchangeDeclare:声明交换机
    queueDeclare:声明队列
    queueBind:绑定队列和交换机
    basicPublish:发送消息

04.使用步骤
    1.创建连接工厂,设置连接属性
    2.从连接工厂中获取
    3.从连接中打开通道channel
    4.1.声明死信交换机、死信队列、绑定死信队列和死信交换机
    4.2.声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
    5.通过channel发送消息

05.示例
    a.代码
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;

        import java.util.HashMap;
        import java.util.Map;

        public class DLXExample {

            public static void main(String[] args) throws Exception {
                // 1. 创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setUsername("guest");
                factory.setPassword("guest");

                // 2. 获取连接
                try (Connection connection = factory.newConnection()) {
                    // 3. 打开通道
                    Channel channel = connection.createChannel();

                    // 4.1 声明死信交换机、死信队列、绑定死信队列和死信交换机
                    channel.exchangeDeclare("dlx-exchange", "direct");
                    channel.queueDeclare("dlx-queue", true, false, false, null);
                    channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

                    // 4.2 声明普通队列,设置TTL和DLX
                    Map<String, Object> arguments = new HashMap<>();
                    arguments.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
                    arguments.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
                    arguments.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信路由键
                    channel.queueDeclare("simple-queue3", true, false, false, arguments);

                    // 5. 发送消息
                    String message = "Hello, World!";
                    channel.basicPublish("", "simple-queue3", null, message.getBytes());
                    System.out.println("Sent: " + message);
                }
            }
        }
    b.体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机                      体现
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key        体现
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定

8.4 [2]DLX+TTL:队头阻塞

00.RabbitMQ延迟消息的队头阻塞
    在使用死信队列(DLX)和 TTL(消息过期时间)实现延迟消息时
    由于队列的先进先出(FIFO)特性,在队列头部消息未过期的情况下,即使后续消息已经过期也不能及时处理的情况

01.原因
    a.先进先出的队列特性
        队列中的消息必须按顺序处理,即使后面的消息 TTL 较短且已过期,也必须等待队头的消息先被处理(或过期)
    b.TTL 检查机制
        RabbitMQ 默认仅在处理队头消息时检查其 TTL,如果队头消息的 TTL 较长(例如 10 分钟)
        即使后续消息的 TTL 更短(例如 1 分钟),这些消息也会被阻塞,直到队头消息过期或被移除

02.解决
    a.为不同延迟时间创建独立队列
        将相同 TTL 的消息放入同一队列,避免消息的过期时间不一致
    b.使用延迟插件
        使用 RabbitMQ 的延迟插件 rabbitmq_delayed_message_exchange
        直接通过延迟交换机实现延迟消息,绕过死信队列的 FIFO 限制
        延迟插件是通过将消息存储到内置数据库 Mnesia,再通过不断判断过期消息
        实现延迟消息的投递和执行的,因此它不存在队列的先进先出和队头阻塞的问题

03.死信队列+TTL
    a.实现流程
        生产者先将设置了TTL(过期时间)的消息发送到普通队列
        普通队列没有消息者,所以一定会过期,消息过期之后就会发送到死信队列
        消费者订阅死信队列获取消息,并执行延迟任务。
    b.定义死信交换器(DLX)和死信队列
        // Spring Boot 配置示例
        @Configuration
        public class RabbitConfig {
            // 定义死信交换器
            @Bean
            public DirectExchange dlxExchange() {
                return new DirectExchange("dlx.exchange");
            }

            // 定义死信队列
            @Bean
            public Queue dlxQueue() {
                return new Queue("dlx.queue");
            }

            // 绑定死信队列到 DLX
            @Bean
            public Binding dlxBinding() {
                return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");
            }

            // 定义普通队列,设置死信交换器和路由键
            @Bean
            public Queue mainQueue() {
                Map<String, Object> args = new HashMap<>();
                args.put("x-dead-letter-exchange", "dlx.exchange");
                args.put("x-dead-letter-routing-key", "dlx.routing.key");
                // 可选:设置队列级别的 TTL(所有消息统一过期时间)
                args.put("x-message-ttl", 10000); // 10秒
                return new Queue("main.queue", true, false, false, args);
            }

            // 主队列绑定到默认交换器(根据需要调整)
            @Bean
            public Binding mainBinding() {
                return BindingBuilder.bind(mainQueue()).to(new DirectExchange("default.exchange")).with("main.routing.key");
            }
        }
    c.发送消息时设置 TTL(消息级别)
        // 发送延迟消息(消息级别 TTL)
        public void sendDelayedMessage(String message, int delayMs) {
            rabbitTemplate.convertAndSend("default.exchange", "main.routing.key", message, msg -> {
                // 设置消息过期时间(覆盖队列级别的 TTL)
                msg.getMessageProperties().setExpiration(String.valueOf(delayMs));
                return msg;
            });
        }
    d.消费者监听死信队列
        @RabbitListener(queues = "dlx.queue")
            public void handleDelayedMessage(String message) {
            System.out.println("处理延迟消息: " + message);
        }
    e.消息的过期时间TTL的设置方式有以下两种
        a.队列级别:通过设置队列的 x-message-ttl 参数,设置队列统一的过期时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 消息持久化
                    .expiration("60000") // 设置消息过期时间为 60 秒
                    .build();
            channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
        b.消息级别:通过给每个消息设置 expiration 属性,为每个消息设置过期时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 消息持久化
                    .expiration("60000") // 设置消息过期时间为 60 秒
                    .build();
            channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
            如果同时设置了消息级 TTL 和队列级 TTL,消息的实际过期时间会取两者中的最小值

8.5 [2]DLX+TTL:SpringBoot实现

01.配置
    a.导入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    b.yml配置
        server:
          port: 8081
        spring:
          application:
            name: RabbitMQ-Demo
          rabbitmq:
            username: guest
            password: guest
            host: 127.0.0.1
            port: 5672
            virtual-host: /
            listener:
              simple:
                acknowledge-mode: manual

02.创建交换机、队列以及他们之间的绑定关系
    a.说明
        创建了两个队列和一个死信队列,其中两个队列一个指定了队列的过期时间
        另一个未指定队列的过期时间,但是会在后续发送消息到该队列时指定消息的过期时间
        注:如果队列的过期时间和消息的过期时间都指定了,则取小的那个值
    b.代码
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.core.*;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import java.util.HashMap;
        import java.util.Map;
        @Slf4j
        @Configuration
        public class RabbitMQConfig {
            public static final String X_CHANGE = "x-change";
            public static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
            public static final String QUEUE_A = "queue-a";
            public static final String ROUTING_KEY_A = "routing-key-a";
            public static final String QUEUE_B = "queue-b";
            public static final String ROUTING_KEY_B = "routing-key-b";
            public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";
            public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key";
            @Bean
            public DirectExchange xChange() {
                return new DirectExchange(X_CHANGE);
            }
            @Bean
            public DirectExchange xDeadLetterExchange() {
                return new DirectExchange(X_DEAD_LETTER_EXCHANGE);
            }
            @Bean
            public Queue queueA() {
                Map<String, Object> args = new HashMap<>();
                args.put("x-dead-letter-exchange", X_DEAD_LETTER_EXCHANGE);
                args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
                args.put("x-message-ttl", 2000);
                return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
            }
            @Bean
            public Queue queueB() {
                HashMap<String, Object> args = new HashMap<>();
                args.put("x-dead-letter-exchange", X_DEAD_LETTER_EXCHANGE);
                args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
                return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
            }
            @Bean
            public Queue deadLetterQueue() {
                return new Queue(DEAD_LETTER_QUEUE);
            }
            @Bean
            public Binding bindingA() {
                return BindingBuilder.bind(queueA()).to(xChange()).with(ROUTING_KEY_A);
            }
            @Bean
            public Binding bindingB() {
                return BindingBuilder.bind(queueB()).to(xChange()).with(ROUTING_KEY_B);
            }
            @Bean
            public Binding bindingDeadLetterQueue() {
                return BindingBuilder.bind(deadLetterQueue()).to(xDeadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
            }
        }

03.生产者、消费者
    a.生产者:模拟消息发送
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.GetResponse;
        import com.rabbitmqdemo.config.RabbitMQConfig;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.rabbit.connection.ConnectionFactory;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RestController;
        import java.io.IOException;
        import java.nio.charset.StandardCharsets;
        import java.time.LocalDateTime;
        import java.util.concurrent.TimeoutException;
        @Slf4j
        @RestController
        @RequestMapping("/mq")
        public class MQTestController {
            @Autowired
            private RabbitTemplate rabbitTemplate;
            @Autowired
            private ConnectionFactory connectionFactory;
            @GetMapping
            public void send() {
                log.info("发送消息时间:{}", LocalDateTime.now());
                rabbitTemplate.convertAndSend(RabbitMQConfig.X_CHANGE, RabbitMQConfig.ROUTING_KEY_A, "队列A:消息。。。。");
                rabbitTemplate.convertAndSend(RabbitMQConfig.X_CHANGE, RabbitMQConfig.ROUTING_KEY_B, "队列B:消息。。。。", message -> {
                    message.getMessageProperties().setExpiration("10000");
                    return message;
                });
                rabbitTemplate.convertAndSend(RabbitMQConfig.X_CHANGE, RabbitMQConfig.ROUTING_KEY_B, "队列C:消息。。。。", message -> {
                    message.getMessageProperties().setExpiration("10000");
                    return message;
                });
            }
            @GetMapping("/cancel")
            public void cancel() throws IOException, TimeoutException {
                log.info("取消消息");
                Channel channel = connectionFactory.createConnection().createChannel(false);
                GetResponse response = channel.basicGet(RabbitMQConfig.QUEUE_B, false);
                boolean isDelete = false;
                while (!isDelete) {
                    if (response == null) {
                        break;
                    }
                    String messageValue = new String(response.getBody(), StandardCharsets.UTF_8);
                    if (messageValue.contains("B")) {
                        isDelete = true;
                        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                        continue;
                    }
                    channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
                }
                channel.close();
            }
        }
    b.消费者:创建队列的监听器
        import com.rabbitmq.client.Channel;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.annotation.RabbitListener;
        import org.springframework.stereotype.Component;
        import java.time.LocalDateTime;
        @Slf4j
        @Component
        public class RabbitMQListener {
            @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
            public void deadLetterQueue(Message message, Channel channel) {
                log.info("死信队列接收数据时间:{}", LocalDateTime.now());
                String s = new String(message.getBody());
                System.out.println("死信队列收到消息:" + s);
                try {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    c.执行结果
        可以看到中间那条被取消的消息并没有被监听到
        从而实现某个任务或者订单在一段时间内未执行指定操作则进入死信队列执行逻辑
        若执行了指定操作则不让他执行那段逻辑

8.6 [2]rabbitmq_delayed_message_exchange插件

01.定义
    一种通过插件实现的延迟消息机制,允许消息在指定的延迟时间后被路由到目标队列

02.原理
    延迟交换机:使用x-delayed-message类型的交换机,通过消息头中的x-delay属性指定消息的延迟时间
    消息路由:消息在延迟时间到期后被路由到绑定的队列进行消费

03.常用API
    exchangeDeclare:声明延迟交换机
    queueDeclare:声明队列
    queueBind:绑定队列和延迟交换机
    basicPublish:发送消息,并设置延迟时间

04.使用步骤
    1.安装插件:确保RabbitMQ安装了rabbitmq_delayed_message_exchange插件并启用
    2.创建连接工厂:设置连接属性
    3.获取连接:从连接工厂中获取连接
    4.打开通道:从连接中打开通道
    5.1.声明x-delayed-message类型的延迟交换机
    5.2.声明队列并绑定到延迟交换机
    6.发送消息:通过通道发送消息,并设置延迟时间

05.示例
    a.代码
        import com.rabbitmq.client.AMQP;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;

        import java.util.HashMap;

        public class DelayedMessageExample {

            public static void main(String[] args) throws Exception {
                // 1. 创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setUsername("guest");
                factory.setPassword("guest");

                // 2. 获取连接
                try (Connection connection = factory.newConnection()) {
                    // 3. 打开通道
                    Channel channel = connection.createChannel();

                    // 4. 声明延迟交换机
                    HashMap<String, Object> argsMap = new HashMap<>();
                    argsMap.put("x-delayed-type", "direct");
                    channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, argsMap);

                    // 声明队列并绑定到延迟交换机
                    channel.queueDeclare("delayed_queue", true, false, false, null);
                    channel.queueBind("delayed_queue", "delayed_exchange", "");

                    // 5. 准备发送消息的内容
                    String message = "你好,消息队列!!! 5000毫秒";
                    int delay = 5000; // 设置延迟时间(例如,5000毫秒)

                    // 6. 发送消息给延迟交换机
                    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                            .headers(new HashMap<String, Object>() {{
                                put("x-delay", delay);
                            }})
                            .build();
                    channel.basicPublish("delayed_exchange", "", props, message.getBytes());
                    System.out.println("消息发送成功!");
                }
            }
        }
    b.体现(生产者)
        // 4: 声明延迟交换机
        channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);      体现
        channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
        channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

        // 5: 准备发送消息的内容
        String message = "你好,消息队列!!! 5000毫秒";
        // 设置延迟时间(例如,5000毫秒)
        int delay = 5000;
        // 6: 发送消息给延迟交换机
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                .build();
        channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
        System.out.println("消息发送成功!");

8.7 [3]redis过期消息:极其不稳定

01.利用Redis过期消息实现延时队列(极其不稳定)
    a.思路
        设置过期时间: 将消息存入 Redis 时,设置一个过期时间
        监听过期事件: 通过 Redis 的发布/订阅(Pub/Sub)机制或通过 keyspace notifications 监听过期事件
        处理消息: 当监听到某个键的过期事件时,取出该消息进行处理
    b.步骤
        开启 Keyspace Notifications: 首先需要在 Redis 配置中开启 keyspace notifications。可以在 Redis 配置文件中设置:notify-keyspace-events Ex
        设置过期消息: 使用 Redis 存储消息并设置过期时间
        监听过期事件: 在应用程序中通过订阅 Redis 频道来监听过期事件
    c.总结
        开启过期键通知,当 Redis 中键值过期时触发时间
        在事件中实现延迟代码,但因为 Redis 的 Key 过期时不会被及时删除
        所以这个过期事件也不保证可以立即触发,所以此方式很少用来实现延迟任务(因为极其不稳定)

02.示例
    a.代码
        package com.ruoyi.redis4.demo06;

        import redis.clients.jedis.Jedis;
        import redis.clients.jedis.JedisPubSub;

        // 利用 Redis 过期消息实现延时队列
        public class Demo04 {

            private static final String QUEUE_PREFIX = "delayedQueue:";
            private static final String EXPIRED_CHANNEL = "__keyevent@0__:expired";

            public static void main(String[] args) {
                // Start a thread to listen for expired messages
                new Thread(Demo04::listenForExpiredMessages).start();

                // Add a delayed message
                addDelayedMessage("task1", 5); // 5 seconds delay
                addDelayedMessage("task2", 10); // 10 seconds delay
            }

            public static void addDelayedMessage(String message, int delayInSeconds) {
                try (Jedis jedis = new Jedis("localhost", 6379)) {
                    jedis.setex(QUEUE_PREFIX + message, delayInSeconds, message);
                    System.out.println("Added message: " + message + " with delay: " + delayInSeconds + " seconds");
                }
            }

            public static void listenForExpiredMessages() {
                try (Jedis jedis = new Jedis("localhost", 6379)) {
                    jedis.subscribe(new JedisPubSub() {
                        @Override
                        public void onMessage(String channel, String message) {
                            // This will be triggered when a key expires
                            System.out.println("Received expired message: " + message);
                            // Here you can add the logic to process the message
                        }
                    }, EXPIRED_CHANNEL);
                }
            }
        }
    b.说明
        设置过期时间: 将消息存入 Redis 时,设置一个过期时间
        监听过期事件: 通过 Redis 的发布/订阅(Pub/Sub)机制或通过 keyspace notifications 监听过期事件
        处理消息: 当监听到某个键的过期事件时,取出该消息进行处理

8.8 [3]zset的score属性:使用一个后台任务定期检查zset

01.使用zSet实现延时队列(使用一个后台任务(如定时任务或守护进程)定期检查 ZSET)
    a.原理
        利用zset的score属性,redis会将zset集合中的元素按照score进行从小到大排序,通过zadd命令向zset中添加元素
    b.如下述命令所示
        其中value值为延时任务消息,可根据业务定义消息格式,score值为任务执行的时间点,比如13位毫秒时间戳
        zadd delayqueue 1614608094000 taskinfo
    c.任务添加后,获取任务的逻辑只需从zset中筛选score值小于当前时间戳的元素,所得结果便是当前时间节点下需要执行的任务,通过zrangebyscore命令来获取
        如下述命令所示,其中timestamp为当前时间戳,可用limit限制每次拉取的记录数,防止单次获取记录数过大
        #获取score介于0和timestamp之间的
        zrangebyscore delayqueue 0 timestamp之间的 limit 0 1000
    d.在实际实现过程中
        从zset中获取到当前需要执行的任务后,需要先确保将任务对应的元素从zset中删除
        删除成功后才允许执行任务逻辑,这样是为了在分布式环境下,当存在多个线程获取到同一任务后
        利用redis删除操作的原子性,确保只有一个线程能够删除成功并执行任务,防止重复执行
    e.采用zset做延时队列的另一个好处
        可以实现任务的取消和任务执行时间点的更改,只需要将任务信息从zset中删除,便可取消任务
        同时由于zset拥有集合去重的特性,只需再次写入同一个任务信息,但是value值设置为不同的执行时间点
        便可更改任务执行时间,实现单个任务执行时间的动态调整
    f.总结
        redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score
        将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min)
        任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小
        (即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查
        直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可
    g.总结2
        可以将任务及其执行时间作为成员和分数存储在 ZSET 中
        然后,使用一个后台任务(如定时任务或守护进程)定期检查 ZSET
        查找分数(即执行时间)小于或等于当前时间的成员,并执行相应的任务。执行后,从 ZSET 中删除该成员

02.示例
    a.代码
        package com.ruoyi.redis4.demo06;

        import java.io.Serializable;
        import java.net.InetAddress;
        import java.net.UnknownHostException;
        import java.text.SimpleDateFormat;
        import java.util.ArrayList;
        import java.util.Date;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Timer;
        import java.util.TimerTask;

        public class Demo05 {

            public static void main(String[] args) {
                DelayQueue delayQueue = new DelayQueue();

                // 模拟添加延时消息
                long currentTime = System.currentTimeMillis();
                System.out.println("当前时间戳:" + currentTime);
                delayQueue.put(new DelayMessage("1", "westbrook", currentTime + 10000));
                delayQueue.put(new DelayMessage("0", "love", currentTime + 10000));
                delayQueue.put(new DelayMessage("6", "brook", currentTime + 10000));
                delayQueue.put(new DelayMessage("7", "wade", currentTime + 10000));
                delayQueue.put(new DelayMessage("8", "lebron", currentTime + 10000));
                delayQueue.put(new DelayMessage("2", "james", currentTime + 20000));
                delayQueue.put(new DelayMessage("3", "kobe", currentTime + 30000));
                delayQueue.put(new DelayMessage("4", "curry", currentTime + 40000));
                delayQueue.put(new DelayMessage("9", "durant", currentTime + 50000));
                delayQueue.put(new DelayMessage("10", "paul", currentTime + 50000));
                delayQueue.put(new DelayMessage("5", "durant", currentTime + 50000));

                // 启动消息处理
                DelayMessageHandler handler = new DelayMessageHandler(delayQueue);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        handler.handleExpiredMessages();
                    }
                }, 0, 1000);
            }

            public static class DelayMessage implements Serializable {
                private static final long serialVersionUID = -7671756385477179547L;
                private String id;
                private String content;
                private long expireTime;

                // 无参构造函数
                public DelayMessage() {
                }

                // 带参数构造函数
                public DelayMessage(String id, String content, long expireTime) {
                    this.id = id;
                    this.content = content;
                    this.expireTime = expireTime;
                }

                // Getter 和 Setter 方法
                public String getId() {
                    return id;
                }

                public void setId(String id) {
                    this.id = id;
                }

                public String getContent() {
                    return content;
                }

                public void setContent(String content) {
                    this.content = content;
                }

                public long getExpireTime() {
                    return expireTime;
                }

                public void setExpireTime(long expireTime) {
                    this.expireTime = expireTime;
                }
            }

            public static class DelayQueue {
                private static final String KEY = "delay_queue:" + getHostAddress();
                private final Map<String, DelayMessage> queue = new HashMap<>();

                public void put(DelayMessage message) {
                    queue.put(message.getId(), message);
                }

                public DelayMessage remove(DelayMessage message) {
                    return queue.remove(message.getId());
                }

                public List<DelayMessage> getExpiredMessages() {
                    long currentTime = System.currentTimeMillis();
                    List<DelayMessage> expiredMessages = new ArrayList<>();
                    for (DelayMessage message : queue.values()) {
                        if (message.getExpireTime() <= currentTime) {
                            expiredMessages.add(message);
                        }
                    }
                    return expiredMessages;
                }

                public static String getHostAddress() {
                    InetAddress localHost;
                    try {
                        localHost = InetAddress.getLocalHost();
                    } catch (UnknownHostException e) {
                        e.printStackTrace();
                        return "127.0.0.1";
                    }
                    return localHost.getHostAddress();
                }
            }

            public static class DelayMessageHandler {
                private static final SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                private final DelayQueue delayQueue;

                public DelayMessageHandler(DelayQueue delayQueue) {
                    this.delayQueue = delayQueue;
                }

                public void handleExpiredMessages() {
                    String currTime = getCurrTime();
                    List<DelayMessage> messages = delayQueue.getExpiredMessages();
                    System.out.println(currTime + " 待处理消息数量: " + messages.size());
                    if (!messages.isEmpty()) {
                        for (DelayMessage message : messages) {
                            System.out.println(message.getId() + "------------> 消息开始处理");
                            try {
                                // 模拟处理时间
                                Thread.sleep(3000);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                System.err.println("处理消息时被中断");
                                e.printStackTrace();
                            }
                            System.out.println(message.getId() + "------------> 消息处理结束");
                            delayQueue.remove(message);
                        }
                    }
                }

                public static String getCurrTime() {
                    return dateTimeFormat.format(new Date());
                }
            }
        }
    b.说明
        DelayMessage 类: 代表延迟消息,包含消息 ID、内容和过期时间
        DelayQueue 类: 模拟 Redis 延迟队列。提供添加消息、删除消息和获取已过期消息的方法
        DelayMessageHandler 类: 处理已到期的消息。使用定时器定期检查并处理过期的消息
        main 方法: 启动应用程序,添加一些延迟消息,并开始处理已到期的消息

8.9 [3]redisson延迟队列:RDelayedQueue接口

01.Redisson实现延迟队列(RDelayedQueue接口)
    a.概念
        Redisson 是一个基于 Redis 的 Java 客户端,Redisson 提供了 RDelayedQueue 接口和 RQueue, RBlockingDeque 接口来实现延迟队列
    b.原理如下
        首先,你需要创建一个基本的队列,然后将它包装在一个延迟队列中。将延迟任务添加到延迟队列中,指定任务的内容和延迟时间(以毫秒为单位)
        延迟队列会自动处理过期的任务并将它们移动到基本队列中。你可以从基本队列中获取任务并进行处理
        过程有点像RabbitMq, 延迟队列 + TTL + 死信队列,用两个队列结合使用
    c.总结
        在 Redisson 框架中,提供了一个 RDelayedQueue 用于实现延迟队列,使用简单方便,推荐使用

02.示例1
    a.代码
        package com.ruoyi.redis4.demo06;

        import java.util.concurrent.TimeUnit;
        import org.redisson.Redisson;
        import org.redisson.api.RBlockingQueue;
        import org.redisson.api.RDelayedQueue;
        import org.redisson.api.RedissonClient;
        import org.redisson.config.Config;

        // Redisson 实现延迟队列
        class TaskProducer implements Runnable {

            private final RDelayedQueue<String> delayedQueue;

            public TaskProducer(RDelayedQueue<String> delayedQueue) {
                this.delayedQueue = delayedQueue;
            }

            @Override
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        String task = "task-" + i;
                        delayedQueue.offer(task, 5 * i, TimeUnit.SECONDS); // 任务将在 i*5 秒后执行
                        System.out.println("任务" + task + "已放入队列,将在" + (i * 5) + "秒后执行");
                    }
                } catch (Exception e) {
                    System.err.println("任务生产者异常: " + e.getMessage());
                }
            }
        }

        class TaskConsumer implements Runnable {

            private final RBlockingQueue<String> blockingQueue;

            public TaskConsumer(RBlockingQueue<String> blockingQueue) {
                this.blockingQueue = blockingQueue;
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        String task = blockingQueue.take(); // 阻塞等待并获取任务
                        System.out.println("消费任务: " + task);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("任务消费者被中断: " + e.getMessage());
                }
            }
        }

        public class Demo06 {

            public static void main(String[] args) {
                // 配置 Redisson 客户端
                Config config = new Config();
                config.useSingleServer().setAddress("redis://127.0.0.1:6379");
                RedissonClient redisson = Redisson.create(config);

                // 创建阻塞队列和延迟队列
                RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayQueue");
                RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);

                // 启动任务生产者和消费者线程
                Thread producerThread = new Thread(new TaskProducer(delayedQueue));
                Thread consumerThread = new Thread(new TaskConsumer(blockingQueue));

                producerThread.start();
                consumerThread.start();

                // 等待一段时间以观察输出
                try {
                    Thread.sleep(30000); // 主线程休眠 30 秒
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 关闭 Redisson 客户端
                    redisson.shutdown();
                    System.out.println("Redisson 客户端已关闭");
                }
            }
        }
    b.说明
        Redisson 客户端配置: 使用 Config 类配置 Redis 连接
        生产者使用 RDelayedQueue 将任务添加到延迟队列,每个任务有一个延迟时间
        消费者使用 RBlockingQueue 从队列中阻塞式地获取任务进行处理
        创建 Redisson 客户端并启动生产者和消费者线程

03.示例2
    a.定义延时消费队列
        import jakarta.annotation.PostConstruct;
        import lombok.RequiredArgsConstructor;
        import org.redisson.api.RBlockingDeque;
        import org.redisson.api.RDelayedQueue;
        import org.redisson.api.RedissonClient;
        import org.springframework.stereotype.Component;

        import java.util.concurrent.TimeUnit;

        @Component
        @RequiredArgsConstructor
        public class AutoCancelOrderQueue {

            /**
             * 消息 Key
             */
            private static final String KEY = "auto-cancel-order";

            private final RedissonClient redissonClient;

            @PostConstruct
            public void init() {
                delayedQueue();
            }

            private RDelayedQueue<IdMessage> delayedQueue() {
                return redissonClient.getDelayedQueue(destinationQueue());
            }

            private RBlockingDeque<IdMessage> destinationQueue() {
                return redissonClient.getBlockingDeque(KEY);
            }

            public void add(IdMessage message) {
                add(message, 30, TimeUnit.MINUTES);
            }

            public void add(IdMessage message, int delay, TimeUnit timeUnit) {
                delayedQueue().offer(message, delay, timeUnit);
            }

            public IdMessage poll() {
                return destinationQueue().poll();
            }
        }
        -----------------------------------------------------------------------------------------------------
        主要代码是redissonClient.getDelayedQueue(redissonClient.getBlockingDeque(KEY)),参数中的 BlockingDeque 就是目标队列
        需要注意的是 init() 方法的 @PostConstruct 注解。这个是为了防止项目启动后,如果一直没有新增过这个延时消息(即,没有调用过 redissonClient.getDelayedQueue() 方法),会导致当前延时队列的 RedissonDelayedQueue 轮询功能没有被启动,所以需要在服务启动后初始化一下这个延时队列
        这里消费时使用的是目标队列的 poll() 方法,这种方式是非阻塞的(空队列时不会阻塞,而是直接返回 null),也可以选择使用阻塞的 take() 方法
        Redis 的 List 数据类型还支持 peek() 操作,在单个消费者时,可以使用该方式获取消息,在消费结束后再移除消息。这种方式消息未消费的几率会低一些,但毕竟是基于缓存的方案,仍然不能保证 100%。如果可能有多个消费者,则不太推荐使用这种方式
    b.消费消息
        import lombok.RequiredArgsConstructor;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.scheduling.annotation.Scheduled;
        import org.springframework.stereotype.Component;

        import java.util.concurrent.TimeUnit;

        @Component
        @Slf4j
        @RequiredArgsConstructor
        public class AutoCancelOrderConsumer {

            public static final int MAX_RETRY_TIMES = 5;

            private final AutoCancelOrderQueue autoCancelOrderQueue;
            private final OrderBizService orderBizService;

            @Scheduled(fixedRate = 5, timeUnit = TimeUnit.SECONDS)
            public void consumeMessages() {
                while (true) {
                    var message = autoCancelOrderQueue.poll();
                    if (message == null) return;
                    processMessage(message);
                }
            }

            private void processMessage(IdMessage message) {
                log.info("AutoCancelOrderConsumer message: {}", message);
                try {
                    orderBizService.cancelOrder(message.getId());
                } catch (Exception e) {
                    if (message.getTimes() <= MAX_RETRY_TIMES) {
                        message.setTimes(message.getTimes() + 1);
                        autoCancelOrderQueue.add(message, message.getTimes(), TimeUnit.MINUTES);
                    }
                }
            }
        }
        -----------------------------------------------------------------------------------------------------
        这里使用 @Scheduled 配合 while (true) 的循环方式来消费消息,队列为空时退出循环并 5 秒后重试
        这里在 catch 中加了发生异常时的重试处理(使用新的延时时间重新加入到延迟队列),并且为了防止死循环的发生,设置了最大重试次数(使用 times 字段标记当前重试次数)