type
status
date
slug
summary
tags
category
icon
password

Kafka为什么快

  • 使用零拷贝技术
  • 顺序写消息
  • 批处理,合并小的请求

什么是零拷贝

1. 传统数据传输流程

在传统的数据传输中,数据从磁盘到网络通常需要以下步骤:
  1. 磁盘读取:数据从磁盘读取到内核缓冲区。
  1. 内核到用户空间复制:数据从内核缓冲区复制到用户空间缓冲区。
  1. 用户空间到内核复制:数据从用户空间缓冲区复制到内核的网络缓冲区。
  1. 网络发送:数据从内核的网络缓冲区发送到网络。
这种多次复制会导致较高的 CPU 和内存开销。

2. Kafka 的零拷贝流程

Kafka 通过零拷贝技术避免了用户空间和内核空间之间的数据复制,具体流程如下:
  1. 磁盘读取:数据从磁盘读取到内核缓冲区。
  1. 直接发送到网络:数据直接从内核缓冲区发送到网络,无需复制到用户空间。
Kafka 使用操作系统的 sendfile 系统调用来实现零拷贝。sendfile 允许数据在内核缓冲区之间直接传输,避免了用户空间的参与。
  • 生产者到 Broker:生产者发送的消息通过零拷贝直接写入 Broker 的日志文件。
  • Broker 到消费者:Broker 从日志文件读取消息并通过零拷贝直接发送给消费者。

sendfile

sendfile接收源端和目的端的文件描述符作为参数,通过一次sendfile系统调用就可以直接在内核态中将内核缓冲区里的数据拷贝到 socket 缓冲区。两次用户态/内核态切换 + 三次数据拷贝
notion image

支持SG-DMA的sendfile(零拷贝)

如果网卡支持 SG-DMA 技术,那么在sendfile() 系统调用的过程中网卡的 SG-DMA 控制器可以直接将数据从PageCache拷贝到网卡,不需要CPU在内存中进行数据拷贝,因此称为零拷贝两次用户态/内核态切换 + 两次数据拷贝
notion image

什么是顺序写

  • 直接写入 Page Cache:当 Broker 接收到生产者的消息后,会直接将消息写入操作系统的 Page Cache(页缓存),而不是先写入用户空间缓冲区。
  • 异步刷盘:Broker 依赖操作系统的页缓存机制,定期将数据刷入磁盘(Flush),而不是每次写入都直接操作磁盘。
notion image

Kafka中的AR、ISR、OSR代表什么

AR:ISR + OSR
ISR(In-Sync-Replicas):副本同步队列。ISR中包括Leader和Foller。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leade
OSR(Out-of-Sync Replicas):非同步副本队列。与leader副本同步滞后过多的副本(不包括leader副本)。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。
当unclean.leader.election.enable=true时,OSR中的副本才可以选取为leader
isr-expiration:isr-expiration任务会周期性的检测每个分区是否需要缩减其ISR集合,如果副本落后会被移出ISR,如果follower数据同步赶上leader,那么该follower就能进入ISR,扩充。上面关于ISR尖子班人员的所见,都会记录到isrChangeSet中,想象成是一个名单列表,谁能进,谁要出,都记录在案。
isr-change-propagation:作用就是检查isrChangeSet,按照名单上的信息移除和迁入,一般是2500ms检查一次,但是为了防止频繁收缩扩充影响性能,不是每次都能做变动,必须满足:
1、上一次ISR集合发生变化距离现在已经超过5秒,
2、上一次写入zookeeper的时候距离现在已经超过60秒。这两个条件都满足,那么就开始换座位!这两个条件可以由我们来配置

kafka follower如何与leader同步数据

kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差
Leader维护了一个动态的(ISR-同步副本列表),意为和leader保持同步的follower集合。根据follower发来的FETCH请求中的fetch offset判断ISR中的follower完成数据同步是否成功。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

HW、LEO代表什么

LEO(log end offset):表示下一条待写入消息的offset,每个ISR都会维护自身的LEO,在ISR中最小的LEO就是HW
HW:高水位值,消费者而言只能消费 HW 之前的消息,就是Leader和follow同步副本的位置
notion image

Zookeeper 在 Kafka 中的作用(早期)

zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
1. Broker注册:Broker是分布式部署并且互相独立,此时需要有一个注册系统能够将整个集群中的Broker管理起来,此时就用到的Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokes/ids
2.Topic注册:在kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息以及与Broker的对应关系也都是由Zookeeper维护,由专门的节点记录:/brokers/topics
3.消费者注册:消费者服务器在初始化启动时加入消费者分组的步骤如下:注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumer/[groupid]/ids/[consumerid],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
  • 对消费者分组中的消费者的变化注册监听:每个消费者都需要关注所属消费者分组中的其他消费者服务器的变化情况,即对/consumer/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。
  • 对Broker服务器变化注册监听:消费者需要对/broker/ids[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。
  • 进行消费者负载均衡:为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消息分区分配的过程,通常对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会进行消费者负载均衡。
  • Offset记录 在消费者对指定消息分区进行消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便对该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:/consumers/[groupid]/offsets/[topic]/[brokerid-partition_id] 节点内容就是Offset的值。

Kafka的数据模型与消息存储机制

在kafka的数据文件夹下,分为两种类型的文件:索引文件(index File)中的数据文件(Data File)。索引文件存的是消息的索引信息,帮助快速定位到某条消息。数据文件存储的是具体的消息内容
 
notion image
索引文件存储的是简单地索引数据,其:"(N、位置)"。其中N表示索引文件里的第几条消息,而位置则表示该条消息在数据文件中的物理偏移地址。例如下图中的"3,497"表示:索引文件里的第3条消息的368772,368772=368769+3),其在数据文件中的物理偏移地址为497
 
notion image

Kafka可靠性保证(不丢消息)

Kafka可靠性主要从三个方面来看,Broker、Producer、Consumer。

Broker

写数据时首先写到PageCache中,pageCache的数据通过linux的flusher程序异步批量存储至磁盘中,此过程称为刷盘。而pageCache位于内存。这部分数据会在断电后丢失。刷盘触发条件有三:
  • 主动调用sync或fsync函数
  • 可用内存低于阀值
  • dirty data时间达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘以后,dirty标志清除。
kafka没有提供同步刷盘的方式,也就是说理论上要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况,比如:
减少刷盘间隔log.flush.interval.ms(在刷新到磁盘之前,任何topic中的消息保留在内存中的最长时间) 减少刷盘数据量大小log.flush.interval.messages(在将消息刷新到磁盘之前,在日志分区上累积的消息数量)。
时间越短,数据量越小,性能越差,但是丢失的数据会变少,可靠性越好。这是一个选择题。
同时,Kafka通过producer和broker协同处理消息丢失的情况,一旦producer发现broker消息丢失,即可自动进行retry。retry次数可根据参数retries进行配置,超过指定次数会,此条消息才会被判断丢失。producer和broker之间,通过ack机制来判断消息是否丢失。
  • acks=0,producer不等待broker的响应,效率最高,但是消息很可能会丢。
  • acks=1,leader broker收到消息后,不等待其他follower的响应,即返回ack。也可以理解为ack数为1。此时,如果follower还没有收到leader同步的消息leader就挂了,那么消息会丢失。按照上图中的例子,如果leader收到消息,成功写入PageCache后,会返回ack,此时producer认为消息发送成功。但此时,按照上图,数据还没有被同步到follower。如果此时leader断电,数据会丢失。
  • acks=-1,leader broker收到消息后,挂起,等待所有ISR列表中的follower返回结果后,再返回ack。-1等效与all。这种配置下,只有leader写入数据到pagecache是不会返回ack的,还需要所有的ISR返回“成功”才会触发ack。如果此时断电,producer可以知道消息没有被发送成功,将会重新发送。如果在follower收到数据以后,成功返回ack,leader断电,数据将存在于原来的follower中。在重新选举以后,新的leader会持有该部分数据。数据从leader同步到follower,
⚠️
数据从pageCache被刷盘到disk。因为只有disk中的数据才能被同步到replica
需要2步:
  • 数据同步到replica,并且replica成功将数据写入PageCache。在producer得到ack后,哪怕是所有机器都停电,数据也至少会存在于leader的磁盘内。
  • 上面第三点提到了ISR的列表的follower,需要配合另一个参数才能更好的保证ack的有效性。ISR是Broker维护的一个“可靠的follower列表”,in-sync Replica列表,broker的配置包含一个参数:min.insync.replicas。该参数表示ISR中最少的副本数。如果不设置该值,ISR中的follower列表可能为空。此时相当于acks=1。

2. Producer

producer在发送数据时可以将多个请求进行合并后异步发送,合并后的请求首先缓存在本地buffer中,正常情况下,producer客户端的异步调用可以通过callback回调函数来处理消息发送失败或者超时的情况,但是当出现以下情况,将会出现数据丢失
  1. producer异常中断,buffer中的数据将丢失。
  1. producer客户端内存不足,如果采取的策略是丢弃消息(另一种策略是block阻塞),消息也会丢失。
  1. 消息产生(异步)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。
针对以上情况,可以有以下解决思路。
  1. producer采用同步方式发送消息,或者生产数据时采用阻塞的线程池,并且线程数不宜过多。整体思路就是控制消息产生速度。
  1. 扩大buffer的容量配置,配置项为:buffer.memory。这种方法可以缓解数据丢失的情况,但不能杜绝。

Consumer

Consumer消费消息有以下几个步骤:
  • 接收消息
  • 处理消息
  • 反馈处理结果
消费方式主要分为两种
  • 自动提交offset,Automatic Offset Committing (enable.auto.commit=true)
  • 手动提交offset,Manual Offset Control(enable.auto.commit=false)
Consumer自动提交机制是根据一定的时间间隔,将收到的消息进行commit,具体配置为:auto.commit.interval.ms。commit和消费的过程是异步的,也就是说可能存在消费过程未成功,commit消息就已经提交,此时就会出现消息丢失。我们可将提交类型改为手动提交,在消费完成后再进行提交,这样可以保证消息“至少被消费一次”(at least once),但如果消费完成后在提交过程中出现故障,则会出现重复消费的情况,本章不讨论,下章讲解。

消费者发送分区策略

轮询
range,
striky粘性
notion image

Kafka的Rebalance机制

触发 Rebalance 的时机
  1. 组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
  1. 订阅的 Topic 个数发生变化。
  1. 订阅 Topic 的分区数发生变化。
缺点:在Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。

消息积压怎么处理

  • 增大分区数
  • 增大消费者数量
  • 设计内存线程消费分区
  • 批量拉取数据

如何保证消息不被重复消费(消费者幂等性)

出现原因:
  • 原因1:Consumer在消费过程中,被强行kill掉消费者线程或异常中断(消费系统宕机、重启等),导致实际消费后的数据,offset没有提交。
  • 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
  • 原因3:消费超时导致消费者与集群断开连接,offset尚未提交,导致重平衡后重复消费。一般消费超时(session.time.out)有以下原因:并发过大,消费者突然宕机,处理超时等。
消费者:根据全局唯一id查询数据库是否存在数据
生产者:Kafka引入了Producer ID(即PID)和Sequence Number。
  • PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
  • Sequence Numbler。对于每个PID,该Producer发送数据的每个都对应一个从0开始单调递增的Sequence Number
  • Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃,这样就可以实现了消息重复提交了.但是只能保证单个Producer对于同一个的Exactly Once语义
notion image

脑裂问题

Kafka是通过使用epoch number(纪元编号,也称为隔离令牌)来完成的。epoch number只是单调递增的数字,第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增。
每个新选出的controller通过Zookeeper 的条件递增操作获得一个全新的、数值更大的epoch number 。其他Broker 在知道当前epoch number 后,如果收到由controller发出的包含较旧(较小)epoch number的消息,就会忽略它们,即Broker根据最大的epoch number来区分当前最新的controller

Kafka事务

幂等性可以保证单个Producer会话、单个TopicPartition、单个会话session的不重不漏,如果Producer重启,或者是写入跨Topic、跨Partition的消息,幂等性无法保证。此时需要用到Kafka事务。Kafka 的事务处理,主要是允许应用可以把消费和生产的 batch 处理(涉及多个 Partition)在一个原子单元内完成,操作要么全部完成、要么全部失败。为了实现这种机制,我们需要应用能提供一个唯一 id,即使故障恢复后也不会改变,这个 id 就是 TransactionnalId(也叫 txn.id),txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义。当用户使用 Kafka 的事务性时,Kafka 可以做到的保证:
  1. 跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
  1. 跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);
  1. 跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。
事务性示例
Kafka 事务性的使用方法也非常简单,用户只需要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,然后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示:生产者:
Kafka中只有两种事务隔离级别:readcommitted、readuncommitted 设置为readcommitted时候是生产者事务已提交的数据才能读取到。在执行 commitTransaction() 或 abortTransaction() 方法前,设置为“readcommitted”的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产者执行 commitTransaction() 方法之后它才能将这些消息推送给消费端应用。同时KafkaConsumer会根据分区对数据进行整合,推送时按照分区顺序进行推送。而不是按照数据发送顺序。反之,如果生产者执行了 abortTransaction() 方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。设置为read_uncommitted时候可以读取到未提交的数据(报错终止前的数据)

Kafka如何保证消息的有序性

生产者端保证有序
kafka在发送消息过程中,正常情况下是有序的,如果消息出现重试,则会造成消息乱序。导致乱序的原因是:max.in.flight.requests.per.connection默认值为5。
解决方式是将max.in.flight.requests.per.connection设置为1,消息队列中只允许有一个请求,这样消息失败后,可以第一时间发送,不会产生乱序,但是会降低网络吞吐量。
通过为消息指定相同的 Key,确保相关消息被发送到同一分区。例如,订单 ID 作为 Key,保证同一订单的操作顺序。
notion image
消费者端保证有序:单线程消费分区