关于消息队列Kafka知识的学习

宣胤

Kafka是一项非常重要的消息队列技术,在大数据场景中被主要采用。

第1章 Kafka概述

1.1 Kafka的定义

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

image-20230420120502679
image-20230420120502679

Kafka最新定义:Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

1.2 传统消息队列的主要应用场景

主要应用场景包括:缓冲/消峰,解耦,异步通信

image-20230420113210146
image-20230420113210146
image-20230420113250219
image-20230420113250219
image-20230420113310640
image-20230420113310640

1.3 Kafka的特点

  • 高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分为多个partition,由多个consumer group对partition进行consume操作
  • 可扩展性:Kafka集群支持热扩展。
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 容错性:允许集群中有节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

综合传统消息队列的主要应用场景和Kafka的特点,Kafka起到的作用可以归纳为:消峰填谷,解耦!在大数据流式计算领域中,Kafka主要作为计算机系统的前置缓存和输出结果缓存。其扮演的角色通常为:存储系统(持久性);消息系统;流处理平台。

1.4 Kafka的基础架构

image-20230420115303092
image-20230420115303092
  1. Producer:消息生产者,就是向Kafka broker发消息的客户端
  2. Consumer:消息消费者,向Kafka broker取消息的客户端
  3. Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是一个逻辑上的一个订阅者
  4. Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
  5. Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
  6. Partition:为了实现扩展性,一个非常大的tipic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列
  7. Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
  8. Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费数据的对象都是Leader
  9. Follower:每个分区多个副本的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader

第2章 Kafka命令行操作

2.1 主题命令行操作

  • 查看当前服务器中的所有topic bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –list

  • 创建主题

    bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –create –partitions 分区数 –replication-factor 副本数 –topic 主题名称

  • 查看指定主题详情

    bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –describe –topic 主题名称

  • 修改主题分区数

    bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –alter –topic 主题名称 –partitions 修改后的分区数

  • 删除主题

    bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –delete –topic 主题名称

2.2 生产者命令行操作

  • 发送消息

    bin/kafka-console-producer.sh –bootstrap-server 节点名称:9092 –topic 主题名称

2.3 消费者命令行操作

  • 消费指定主题中的消息

    bin/kafka-console-consumer.sh –bootstrap-server 节点名称:9092 –topic 主题名称

  • 把该主题中的所有数据读取出来(包括历史数据)

    bin/kafka-console-consumer.sh –bootstrap-server 节点名称:9092 –from-beginning –topic 主题名称

第3章 Kafka生产者

3.1 生产者消息发送流程

在消息发送的过程中,涉及到了两个线程–main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送到RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker

image-20230420164903715
image-20230420164903715

ISR队列:Leader以及与Leader保持同步的Follower的正常存活副本队列

注意:即使acks=-1,也不能完全保证数据发送的100%完整性?因为,如果服务端目标partition的同步副本只有Leader自己了,此时,它收到数据就会给生产者反馈成功。但是一旦反馈之后就宕机,数据还未被持久化,则完整性不能得到保证

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试

3.2 异步发送API

创建MAVEN项目

导入Kafka依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

创建并配置 Kafka 生产者的配置对象

// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String,  String>  kafkaProducer  =  new
KafkaProducer<String, String>(properties);
3.2.1 不带回调函数的API代码
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) &#123;
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
&#125;
3.2.2 带回调函数的API代码
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) &#123;
    // 添加回调
    kafkaProducer.send(new  ProducerRecord<>("first","atguigu " + i), 
    new Callback() &#123;
        // 该方法在 Producer 收到 ack 时调用,为异步调用
        @Override
        public void onCompletion(RecordMetadata metadata,Exception exception) &#123;
            if (exception == null) &#123;
                // 没有异常,输出信息到控制台
                System.out.println(" 主 题 : "  +
                metadata.topic() + "->" + "分区:" + metadata.partition());
            &#125; else &#123;
                // 出现异常打印
                exception.printStackTrace();
            &#125;
        &#125;
    &#125;);
    // 延迟一会会看到数据发往不同分区
    Thread.sleep(2);
&#125;

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败

3.3 同步发送API

只需在异步发送的基础上,调用一下get()方法即可

// 4. 调用 send 方法,发送消息
for (int i = 0; i < 10; i++) &#123;
    // 同步发送
    kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
&#125;

注意:不论是异步发送还是同步发送,都还有非常重要的一步:关闭资源

// 5. 关闭资源
kafkaProducer.close();

3.4 生产者分区

3.4.1 分区好处
  1. 便于合理使用存储资源,每个partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  2. 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位消费数据
3.4.2 生产者发送消息的分区策略
  1. 指明partition的情况下,直接将指明的值作为partition的值;例如:partition=0,所有数据写入分区0

    // 4. 调用 send 方法,发送消息
    for (int i = 0; i < 5; i++) &#123;
        // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
        kafkaProducer.send(new  ProducerRecord<>("first",1,"","atguigu " + i), 
        new Callback() &#123;
            @Override
            public void onCompletion(RecordMetadata metadata,Exception e) &#123;
                if (e == null)&#123;
                System.out.println(" 主 题 : "  +
                metadata.topic() + "->" + "分区:" + metadata.partition());
                &#125;else &#123;
                    e.printStackTrace();
                &#125;
            &#125;
        &#125;);
    &#125;
    
  2. 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition的值;例如:key1的hash值=5,key2的hash值=6,topic的分区数=2,那么key1对应的value1写入1号分区,key2对应的value2写入0号分区

    // 4. 调用 send 方法,发送消息
    for (int i = 0; i < 5; i++) &#123;
        // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
        kafkaProducer.send(new  ProducerRecord<>("first","a","atguigu " + i), 
        new Callback() &#123;
            @Override
            public void onCompletion(RecordMetadata metadata,Exception e) &#123;
                if (e == null)&#123;
                System.out.println(" 主 题 : "  +
                metadata.topic() + "->" + "分区:" + metadata.partition());
                &#125;else &#123;
                    e.printStackTrace();
                &#125;
            &#125;
        &#125;);
    &#125;
    
  3. 既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机选一个分区进行使用(和上一次的分区不同);例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到,Kafka再随机选择一个分区进行使用(如果还是0会继续随机)

    代码示例如–3.2.2 带回调函数的API代码

  4. 自定义分区器

    1. 定义类实现Partitioner接口
    2. 重写partition()方法
    3. 在生产者的配置对象中添加自定义分区器

例:实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1号分区。

//setup1、setup2
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner &#123;
    /**
    * 返回信息对应的分区
    * @param topic 主题
    * @param key 消息的 key
    * @param keyBytes 消息的 key 序列化后的字节数组
    * @param value 消息的 value
    * @param valueBytes 消息的 value 序列化后的字节数组
    * @param cluster 集群元数据可以查看分区信息
    * @return
    */		
    @Override
    public  int  partition(String  topic,  Object  key,  byte[] keyBytes, Object value, byte[] 			valueBytes, Cluster cluster) &#123;
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 atguigu
        if (msgValue.contains("atguigu"))&#123;
        partition = 0;
        &#125;else &#123;
        partition = 1;
        &#125;
        // 返回分区号
        return partition;
    &#125;
    // 关闭资源
    @Override
    public void close() &#123;&#125;
    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) &#123;&#125;
&#125;

------------------------------------------------------------------
//setup3
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");

3.5 生产经验–生产者如何提高吞吐量

  1. 修改batch.size大小,修改为32k
  2. 修改linger.ms等待时间,修改为5~100ms
  3. 修改compress.type压缩模式,修改为“snappy”
  4. 修改RecordAccumulator缓冲区大小,修改为64M
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

3.6 生产经验–数据可靠性

ack应答原理:

ack=0:生产者发送过来的数据,不需要等数据落盘应答
ack=1:生产者发送过来的数据,Leader收到数据后应答
ack=-1:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答

思考:Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set(ISR) ),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

代码配置

// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

3.7 生产经验–数据去重

3.7.1 数据传递语义
  • 至少传递一次 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
    可以保证数据不丢失,但是不能保证数据不重复(在Leader准备应答时,Leader发生故障,但是Follower已经同步数据)
  • 最多一次 = ACK级别设置为0
    可以保证数据不重复,但是不能保证数据不丢失
  • 精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务

3.7.2 幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复

精确一次( (Exactly Once) ) = 幂等性 + 至少一次( ( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2) )

重复数据的判断标准:具有**<PID, Partition, SeqNumber>**相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的会话ID(所以幂等性只能保证的是在单分区单会话内不重复);Partition 表示分区号;Sequence Number是单调自增的。

那么如何使用幂等性呢?开启参数 enable.idempotence 默认为 true,false关闭

3.7.3 生产者事务

Kafka事务原理

image-20230428154752478
image-20230428154752478

代码配置

// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");
---
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String  consumerGroupId)  throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try &#123;
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) &#123;
// 发送消息
kafkaProducer.send(new  ProducerRecord<>("first",
"atguigu " + i));
&#125;
// int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
&#125; catch (Exception e) &#123;
// 终止事务
kafkaProducer.abortTransaction();
&#125; finally &#123;
// 5. 关闭资源
kafkaProducer.close();
&#125;

3.8 生产经验–数据有序

单分区内,有序
多分区,分区与分区之间无序

3.9 生产经验–数据乱序

  1. Kafka在1.x版本之前保证数据单分区有序,条件如下:
    max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)

  2. Kafka在1.x及以后版本保证数据单分区有序,条件如下:
    (1)未开启幂等性
    max.in.flight.requests.per.connection需要设置未1
    (2)开启幂等性
    max.in.flight.requests.per.connection需要设置小于等于5
    原因说明:因为在Kafka1.x以后,启用幂等后,Kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的

    如果开启了幂等性且缓存的请求个数小于5个,会在服务端重新排序

第4章 Kafka Broker

4.1 Kafka Broker工作流程

4.1.1 Zookeeper存储的Kafka信息

image-20230503225140019
image-20230503225140019

注:在0.9版本之前Kafka的offset信息存储在Zookeeper中,在0.9版本之后,存储在Kafka的topic_offsets主题中

4.1.2 Kafka Broker总体工作流程

image-20230503225259694
image-20230503225259694

4.2 生产经验–节点服役和退役

4.3 Kafka副本

4.3.1 副本基本信息
  1. Kafka副本作用:提高数据可靠性
  2. Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率
  3. Kafka中副本分为:Leader 和 Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据
  4. Kafka分区中的所有副本统称为AR
    AR = ISR + OSR
    OSR,表示Follower与Leader副本同步时,延迟过多的副本
4.3.2 Leader选举流程

Leader选举流程为4.1.2Broker总体工作流程图中的(1)、(2)、(3)、(4)步

Kafka集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作

4.3.3 Leader 和 Follower故障处理细节

image-20230503231507905
image-20230503231507905

image-20230503231437092
image-20230503231437092

4.4 文件存储

4.4.1 文件存储机制

1)topic数据的存储机制

image-20230508203739197
image-20230508203739197

2)index 和 log 文件详解

image-20230508212208719
image-20230508212208719

4.4.2 文件清理策略

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间:

  • log.retention.hours,最低优先级–小时,默认7天
  • log.retention.minutes,分钟
  • log.retention.ms,最高优先级–毫秒
  • log.retention.check.interval.ms,负责设置检查周期,默认5分钟

那么日志一旦超过了设置的时间,怎么处理呢?Kafka中提供的日志清理策略有delete 和 compact两种

1)delete日志删除:将过期数据删除

  • log.cleanup.policy = delete 所有数据启用删除策略

(1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment

​ log.retention.bytes,默认等于-1,表示无穷大

2)compact日志压缩:对于相同key的不同value值,只保留最后一个版本

  • log.cleanup.policy = compact 所有数据启用压缩策略

image-20230508215050506
image-20230508215050506

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料

4.5 高效读写数据

  • Kafka本身的分布式集群,可以采用分区技术,并行度高

  • 读数据采用稀疏索引,可以快速定位要消费的数据

  • 顺序写磁盘

    Kafka的producer生产数据,是一直追加到log文件末端,为顺序写。顺序写能到600M/s,随机写只有100K/s,因为顺序写省去大量磁头寻址的时间

  • 页缓存+零拷贝技术

image-20230517235003073
image-20230517235003073

第5章 Kafka消费者

5.1 Kafka消费方式

consumer采用pull(拉)模式从broker中主动拉取数据

pull模式不足之处是,如果Kafka一直没有数据,消费者可能会陷入循环之中,一直返回空数据!

5.2 Kafka消费者工作流程

5.2.1 消费者总体工作流程

image-20230517235750314
image-20230517235750314

可以看出都是从副本leader拉数据

5.2.2 消费者组原理

image-20230518000047461
image-20230518000047461

image-20230518000113496
image-20230518000113496

image-20230518000204148
image-20230518000204148

image-20230518000231855
image-20230518000231855

5.3 消费者API

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id,会被自动填写随机的消费者组 id。

创建并配置 Kafka 生产者的配置对象

// 1.创建消费者的配置对象
Properties properties = new Properties();
// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

需求:创建一个独立消费者,消费 first主题中数据

// 创建消费者对象
KafkaConsumer<String,  String>  kafkaConsumer  =  new KafkaConsumer<String, String>(properties);

// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) &#123;
    // 设置 1s 中消费一批数据
    ConsumerRecords<String,  String>  consumerRecords  = kafkaConsumer.poll(Duration.ofSeconds(1));
    // 打印消费到的数据
    for (ConsumerRecord<String, String> consumerRecord :consumerRecords) &#123;
        System.out.println(consumerRecord);
    &#125;
&#125;

需求:创建一个独立消费者,消费 first主题 0 号分区的数据

// 创建消费者对象
KafkaConsumer<String,  String>  kafkaConsumer  =  new KafkaConsumer<String, String>(properties);

// 消费某个主题的某个分区数据
ArrayList<TopicPartition>  topicPartitions  =  new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);
// 拉取数据打印
while (true) &#123;
    // 设置 1s 中消费一批数据
    ConsumerRecords<String,  String>  consumerRecords  = kafkaConsumer.poll(Duration.ofSeconds(1));
    // 打印消费到的数据
    for (ConsumerRecord<String, String> consumerRecord :consumerRecords) &#123;
        System.out.println(consumerRecord);
    &#125;
&#125;

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费

复制一份代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者

// 创建消费者对象
KafkaConsumer<String,  String>  kafkaConsumer  =  new KafkaConsumer<String, String>(properties);
// 注册主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) &#123;
    // 设置 1s 中消费一批数据
    ConsumerRecords<String,  String>  consumerRecords  = kafkaConsumer.poll(Duration.ofSeconds(1));
    // 打印消费到的数据
    for (ConsumerRecord<String, String> consumerRecord :consumerRecords) &#123;
        System.out.println(consumerRecord);
    &#125;
&#125;

5.4 生产经验–分区的分配以及再平衡

image-20230520231827510
image-20230520231827510

5.4.1 Range分区

image-20230520232726068
image-20230520232726068

再平衡:停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
1 号消费者:消费到 3、4号分区数据
2 号消费者:消费到 5、6号分区数据

0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者

说明:0号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行

再次重新发送消息观看结果(45s 以后)
1 号消费者:消费到 0、1、2、3 号分区数据
2 号消费者:消费到 4、5、6号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配

5.4.2 RoundRobin分区
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

image-20230520233436300
image-20230520233436300

再平衡:停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
1 号消费者:消费到 1、4号分区数据
2 号消费者:消费到 2、5号分区数据

0 号消费者的任务会按照RoundRobin的方式,把数据轮询分成0、3和6号分区数据,分别由1号消费者或2号消费者消费。

再次重新发送消息观看结果(45s 以后)
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配

5.4.3 Sticky分区
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

再平衡:停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)

1 号消费者:消费到 2、5、3号分区数据
2 号消费者:消费到 4、6号分区数据

0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1号消费者或者 2号消费者消费。

再次重新发送消息观看结果(45s 以后)
1 号消费者:消费到 2、3、5号分区数据
2 号消费者:消费到 0、1、4、6 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配

5.5 offset位移

5.5.1 offset的默认维护位置

image-20230520235452353
image-20230520235452353

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

5.5.2 自动提交offset

image-20230521000115808
image-20230521000115808

自动提交offset代码设置

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
5.5.3 手动提交offset

image-20230521000347921
image-20230521000347921

首先设置手动提交offset

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  1. 同步提交

    // 消费数据
    while (true)&#123;
        // 读取消息
        ConsumerRecords<String,  String>  consumerRecords  = consumer.poll(Duration.ofSeconds(1));
        // 输出消息
        for (ConsumerRecord<String, String> consumerRecord :consumerRecords) &#123;
            System.out.println(consumerRecord.value());
        &#125;
        // 同步提交 offset
        consumer.commitSync();
    &#125;
    
  2. 异步提交

    // 消费数据
    while (true)&#123;
        // 读取消息
        ConsumerRecords<String,  String>  consumerRecords  = consumer.poll(Duration.ofSeconds(1));
        // 输出消息
        for (ConsumerRecord<String, String> consumerRecord :consumerRecords) &#123;
            System.out.println(consumerRecord.value());
        &#125;
        // 异步提交 offset
        consumer.commitAsync();
    &#125;
    
5.5.4 指定offset消费

auto.offset.reset = earliest | latest | none 默认是 latest

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning
  • latest(默认值):自动将偏移量重置为最新偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

image-20230521001240507
image-20230521001240507

任意指定offset位移开始消费代码

// 1 创建一个消费者
KafkaConsumer<String,  String>  kafkaConsumer  =  new KafkaConsumer<>(properties);

// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

Set<TopicPartition> assignment= new HashSet<>();

while (assignment.size() == 0) &#123;
    kafkaConsumer.poll(Duration.ofSeconds(1));
    // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
    assignment = kafkaConsumer.assignment();
&#125;

// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
for (TopicPartition tp: assignment) &#123;
    kafkaConsumer.seek(tp, 1700);
&#125;

// 3 消费该主题数据
5.5.5 指定时间消费

废话不多说,直接上代码

// 1 创建一个消费者
KafkaConsumer<String,  String>  kafkaConsumer  =  new KafkaConsumer<>(properties);

// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

Set<TopicPartition> assignment = new HashSet<>();

while (assignment.size() == 0) &#123;
    kafkaConsumer.poll(Duration.ofSeconds(1));
    // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
    assignment = kafkaConsumer.assignment();
&#125;

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) &#123;
    timestampToSearch.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
&#125;

// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition,  OffsetAndTimestamp>  offsets  =kafkaConsumer.offsetsForTimes(timestampToSearch);

// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) &#123;
    OffsetAndTimestamp  offsetAndTimestamp  = offsets.get(topicPartition);
    // 根据时间指定开始消费的位置
    if (offsetAndTimestamp != null)&#123;
        kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
    &#125;
&#125;

// 3 消费该主题数据

5.5.6 重复消费和漏消费

重复消费:已经消费了数据,但是 offset没提交

漏消费:先提交 offset后消费,有可能会造成数据的漏消费

image-20230521002002630
image-20230521002002630

5.6 生产经验–消费者事务

image-20230521002127636
image-20230521002127636

5.7 生产经验–数据积压(消费者如何提高吞吐量)

image-20230521002250871
image-20230521002250871

扩展章节–Kafka-Kraft模式

Kafka-Kraft架构

image-20230521002607905
image-20230521002607905

左图为 Kafka 现有架构,元数据在 zookeeper(小人) 中,运行时动态选举 controller,由controller进行 Kafka 集群管理。右图为 kraft模式架构(实验性),不再依赖 zookeeper集群,而是用三台 controller节点代替 zookeeper,元数据保存在 controller中,由 controller 直接进行 Kafka集群管理。

这样做的好处有以下几个:

  • Kafka 不再依赖外部框架,而是能够独立运行
  • controller管理集群时,不再需要从 zookeeper中先读取数据,集群性能上升
  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper读写能力限制
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策
  • 标题: 关于消息队列Kafka知识的学习
  • 作者: 宣胤
  • 创建于: 2023-04-20 11:17:51
  • 更新于: 2023-05-22 14:12:53
  • 链接: http://xuanyin02.github.io/2023/042039691.html
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
 评论
此页目录
关于消息队列Kafka知识的学习