关于消息队列Kafka知识的学习
Kafka是一项非常重要的消息队列技术,在大数据场景中被主要采用。
第1章 Kafka概述
1.1 Kafka的定义
Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义:Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
1.2 传统消息队列的主要应用场景
主要应用场景包括:缓冲/消峰,解耦,异步通信
1.3 Kafka的特点
- 高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分为多个partition,由多个consumer group对partition进行consume操作
- 可扩展性:Kafka集群支持热扩展。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中有节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
综合传统消息队列的主要应用场景和Kafka的特点,Kafka起到的作用可以归纳为:消峰填谷,解耦!在大数据流式计算领域中,Kafka主要作为计算机系统的前置缓存和输出结果缓存。其扮演的角色通常为:存储系统(持久性);消息系统;流处理平台。
1.4 Kafka的基础架构
- Producer:消息生产者,就是向Kafka broker发消息的客户端
- Consumer:消息消费者,向Kafka broker取消息的客户端
- Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是一个逻辑上的一个订阅者
- Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
- Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
- Partition:为了实现扩展性,一个非常大的tipic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列
- Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
- Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费数据的对象都是Leader
- Follower:每个分区多个副本的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader
第2章 Kafka命令行操作
2.1 主题命令行操作
查看当前服务器中的所有topic bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –list
创建主题
bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –create –partitions 分区数 –replication-factor 副本数 –topic 主题名称
查看指定主题详情
bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –describe –topic 主题名称
修改主题分区数
bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –alter –topic 主题名称 –partitions 修改后的分区数
删除主题
bin/kafka-topics.sh –bootstrap-server 节点名称:9092 –delete –topic 主题名称
2.2 生产者命令行操作
发送消息
bin/kafka-console-producer.sh –bootstrap-server 节点名称:9092 –topic 主题名称
2.3 消费者命令行操作
消费指定主题中的消息
bin/kafka-console-consumer.sh –bootstrap-server 节点名称:9092 –topic 主题名称
把该主题中的所有数据读取出来(包括历史数据)
bin/kafka-console-consumer.sh –bootstrap-server 节点名称:9092 –from-beginning –topic 主题名称
第3章 Kafka生产者
3.1 生产者消息发送流程
在消息发送的过程中,涉及到了两个线程–main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送到RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker
ISR队列:Leader以及与Leader保持同步的Follower的正常存活副本队列
注意:即使acks=-1,也不能完全保证数据发送的100%完整性?因为,如果服务端目标partition的同步副本只有Leader自己了,此时,它收到数据就会给生产者反馈成功。但是一旦反馈之后就宕机,数据还未被持久化,则完整性不能得到保证
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
3.2 异步发送API
创建MAVEN项目
导入Kafka依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
创建并配置 Kafka 生产者的配置对象
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
3.2.1 不带回调函数的API代码
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
}
3.2.2 带回调函数的API代码
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 添加回调
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i),
new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主 题 : " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败
3.3 同步发送API
只需在异步发送的基础上,调用一下get()方法即可
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 10; i++) {
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
}
注意:不论是异步发送还是同步发送,都还有非常重要的一步:关闭资源
// 5. 关闭资源
kafkaProducer.close();
3.4 生产者分区
3.4.1 分区好处
- 便于合理使用存储资源,每个partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位消费数据
3.4.2 生产者发送消息的分区策略
指明partition的情况下,直接将指明的值作为partition的值;例如:partition=0,所有数据写入分区0
// 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数) kafkaProducer.send(new ProducerRecord<>("first",1,"","atguigu " + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata,Exception e) { if (e == null){ System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区:" + metadata.partition()); }else { e.printStackTrace(); } } }); }
没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition的值;例如:key1的hash值=5,key2的hash值=6,topic的分区数=2,那么key1对应的value1写入1号分区,key2对应的value2写入0号分区
// 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0 kafkaProducer.send(new ProducerRecord<>("first","a","atguigu " + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata,Exception e) { if (e == null){ System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区:" + metadata.partition()); }else { e.printStackTrace(); } } }); }
既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机选一个分区进行使用(和上一次的分区不同);例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到,Kafka再随机选择一个分区进行使用(如果还是0会继续随机)
代码示例如–3.2.2 带回调函数的API代码
自定义分区器
- 定义类实现Partitioner接口
- 重写partition()方法
- 在生产者的配置对象中添加自定义分区器
例:实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1号分区。
//setup1、setup2
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
/**
* 返回信息对应的分区
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 创建 partition
int partition;
// 判断消息是否包含 atguigu
if (msgValue.contains("atguigu")){
partition = 0;
}else {
partition = 1;
}
// 返回分区号
return partition;
}
// 关闭资源
@Override
public void close() {}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {}
}
------------------------------------------------------------------
//setup3
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
3.5 生产经验–生产者如何提高吞吐量
- 修改batch.size大小,修改为32k
- 修改linger.ms等待时间,修改为5~100ms
- 修改compress.type压缩模式,修改为“snappy”
- 修改RecordAccumulator缓冲区大小,修改为64M
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
3.6 生产经验–数据可靠性
ack应答原理:
ack=0:生产者发送过来的数据,不需要等数据落盘应答
ack=1:生产者发送过来的数据,Leader收到数据后应答
ack=-1:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
思考:Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set(ISR) ),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
代码配置
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
3.7 生产经验–数据去重
3.7.1 数据传递语义
- 至少传递一次 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
可以保证数据不丢失,但是不能保证数据不重复(在Leader准备应答时,Leader发生故障,但是Follower已经同步数据) - 最多一次 = ACK级别设置为0
可以保证数据不重复,但是不能保证数据不丢失 - 精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务
3.7.2 幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复
精确一次( (Exactly Once) ) = 幂等性 + 至少一次( ( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2) )
重复数据的判断标准:具有**<PID, Partition, SeqNumber>**相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的会话ID(所以幂等性只能保证的是在单分区单会话内不重复);Partition 表示分区号;Sequence Number是单调自增的。
那么如何使用幂等性呢?开启参数 enable.idempotence 默认为 true,false关闭
3.7.3 生产者事务
Kafka事务原理
代码配置
// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");
---
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("first",
"atguigu " + i));
}
// int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
3.8 生产经验–数据有序
单分区内,有序
多分区,分区与分区之间无序
3.9 生产经验–数据乱序
Kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)Kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置未1
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5
原因说明:因为在Kafka1.x以后,启用幂等后,Kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的如果开启了幂等性且缓存的请求个数小于5个,会在服务端重新排序
第4章 Kafka Broker
4.1 Kafka Broker工作流程
4.1.1 Zookeeper存储的Kafka信息
注:在0.9版本之前Kafka的offset信息存储在Zookeeper中,在0.9版本之后,存储在Kafka的topic_offsets主题中
4.1.2 Kafka Broker总体工作流程
4.2 生产经验–节点服役和退役
4.3 Kafka副本
4.3.1 副本基本信息
- Kafka副本作用:提高数据可靠性
- Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率
- Kafka中副本分为:Leader 和 Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据
- Kafka分区中的所有副本统称为AR
AR = ISR + OSR
OSR,表示Follower与Leader副本同步时,延迟过多的副本
4.3.2 Leader选举流程
Leader选举流程为4.1.2Broker总体工作流程图中的(1)、(2)、(3)、(4)步
Kafka集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作
4.3.3 Leader 和 Follower故障处理细节
4.4 文件存储
4.4.1 文件存储机制
1)topic数据的存储机制
2)index 和 log 文件详解
4.4.2 文件清理策略
Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间:
- log.retention.hours,最低优先级–小时,默认7天
- log.retention.minutes,分钟
- log.retention.ms,最高优先级–毫秒
- log.retention.check.interval.ms,负责设置检查周期,默认5分钟
那么日志一旦超过了设置的时间,怎么处理呢?Kafka中提供的日志清理策略有delete 和 compact两种
1)delete日志删除:将过期数据删除
- log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment
log.retention.bytes,默认等于-1,表示无穷大
2)compact日志压缩:对于相同key的不同value值,只保留最后一个版本
- log.cleanup.policy = compact 所有数据启用压缩策略
压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料
4.5 高效读写数据
Kafka本身的分布式集群,可以采用分区技术,并行度高
读数据采用稀疏索引,可以快速定位要消费的数据
顺序写磁盘
Kafka的producer生产数据,是一直追加到log文件末端,为顺序写。顺序写能到600M/s,随机写只有100K/s,因为顺序写省去大量磁头寻址的时间
页缓存+零拷贝技术
第5章 Kafka消费者
5.1 Kafka消费方式
consumer采用pull(拉)模式从broker中主动拉取数据
pull模式不足之处是,如果Kafka一直没有数据,消费者可能会陷入循环之中,一直返回空数据!
5.2 Kafka消费者工作流程
5.2.1 消费者总体工作流程
可以看出都是从副本leader拉数据
5.2.2 消费者组原理
5.3 消费者API
注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id,会被自动填写随机的消费者组 id。
创建并配置 Kafka 生产者的配置对象
// 1.创建消费者的配置对象
Properties properties = new Properties();
// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
需求:创建一个独立消费者,消费 first主题中数据
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {
System.out.println(consumerRecord);
}
}
需求:创建一个独立消费者,消费 first主题 0 号分区的数据
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);
// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {
System.out.println(consumerRecord);
}
}
需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费
复制一份代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 注册主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {
System.out.println(consumerRecord);
}
}
5.4 生产经验–分区的分配以及再平衡
5.4.1 Range分区
再平衡:停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
1 号消费者:消费到 3、4号分区数据
2 号消费者:消费到 5、6号分区数据
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者
说明:0号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行
再次重新发送消息观看结果(45s 以后)
1 号消费者:消费到 0、1、2、3 号分区数据
2 号消费者:消费到 4、5、6号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配
5.4.2 RoundRobin分区
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
再平衡:停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
1 号消费者:消费到 1、4号分区数据
2 号消费者:消费到 2、5号分区数据
0 号消费者的任务会按照RoundRobin的方式,把数据轮询分成0、3和6号分区数据,分别由1号消费者或2号消费者消费。
再次重新发送消息观看结果(45s 以后)
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配
5.4.3 Sticky分区
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
再平衡:停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
1 号消费者:消费到 2、5、3号分区数据
2 号消费者:消费到 4、6号分区数据
0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1号消费者或者 2号消费者消费。
再次重新发送消息观看结果(45s 以后)
1 号消费者:消费到 2、3、5号分区数据
2 号消费者:消费到 0、1、4、6 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配
5.5 offset位移
5.5.1 offset的默认维护位置
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
5.5.2 自动提交offset
自动提交offset代码设置
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
5.5.3 手动提交offset
首先设置手动提交offset
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
同步提交
// 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord :consumerRecords) { System.out.println(consumerRecord.value()); } // 同步提交 offset consumer.commitSync(); }
异步提交
// 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord :consumerRecords) { System.out.println(consumerRecord.value()); } // 异步提交 offset consumer.commitAsync(); }
5.5.4 指定offset消费
auto.offset.reset = earliest | latest | none 默认是 latest
- earliest:自动将偏移量重置为最早的偏移量,–from-beginning
- latest(默认值):自动将偏移量重置为最新偏移量
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
任意指定offset位移开始消费代码
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 1700);
}
// 3 消费该主题数据
5.5.5 指定时间消费
废话不多说,直接上代码
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets =kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}
}
// 3 消费该主题数据
5.5.6 重复消费和漏消费
重复消费:已经消费了数据,但是 offset没提交
漏消费:先提交 offset后消费,有可能会造成数据的漏消费
5.6 生产经验–消费者事务
5.7 生产经验–数据积压(消费者如何提高吞吐量)
扩展章节–Kafka-Kraft模式
Kafka-Kraft架构
左图为 Kafka 现有架构,元数据在 zookeeper(小人) 中,运行时动态选举 controller,由controller进行 Kafka 集群管理。右图为 kraft模式架构(实验性),不再依赖 zookeeper集群,而是用三台 controller节点代替 zookeeper,元数据保存在 controller中,由 controller 直接进行 Kafka集群管理。
这样做的好处有以下几个:
- Kafka 不再依赖外部框架,而是能够独立运行
- controller管理集群时,不再需要从 zookeeper中先读取数据,集群性能上升
- 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper读写能力限制
- controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策
- 标题: 关于消息队列Kafka知识的学习
- 作者: 宣胤
- 创建于: 2023-04-20 11:17:51
- 更新于: 2023-05-22 14:12:53
- 链接: http://xuanyin02.github.io/2023/042039691.html
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。