kafka常见面试题以及答案整理
Kafka的用途有哪些?使用场景如何?
Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么
ISR:所有与leader副本保持一定程度同步的副本(包括leader副本在内),(In-Sync Replicas)
OSR:与leader副本同步滞后过多或断开连接的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),
时间阈值由replica.lag.time.max.ms配置,默认30S
AR:所有的副本列表统称AR(Assigned Replicas)
ISR伸缩:leader副本负责维护和跟踪 ISR 集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从 ISR 集合中剔除。
如果 OSR 集合中所有follower副本“追上”了leader副本,那么leader副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当leader副本发生故障时,只有在 ISR 集合中的follower副本才有资格被选举为新的leader,而在 OSR 集合中的副本则没有任何机会(不过这个可以通过配置来改变
Kafka中的HW、LEO、LSO、LW等分别代表什么?
HW:High Watermark 高水位线,所有副本中最小的offset,即ISR中副本最小的LEO
LEO:Log End Offset,每个副本当前日志文件中下一条待写入消息的offset,即最新的Offset+1,
LSO:Last Stable Offset,与kafka 事务有关。对于未完成的事务而言,LSO的值等于事务中的第一条消息所在的位置(firstUnstableOffset);对于已经完成的事务而言,它的值等同于HW相同
LW:Low Watermark,AR集合中最小的LogStartOffset值。
Log Start Offset:每个副本当前日志文件中写入消息的起始offset
消费者配置参数:isolation.level,这个参数用来配置消费者事务的隔离级别。可选值“read_uncommitted”和“read_committed”,表示消费者所消费到
的位置,如果设置为“read_committed”,那么消费这就会忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,
默认配置为”read_uncommitted”,即可以消费到HW(High Watermak)的位置。
注:follower副本的事务隔离级别也为“read_uncommitted”,并且不可修改。
Kafka中是怎么体现消息顺序性的?
一定条件下,消息单分区内有序
- 在kafka 1.x版本之前需要配置max.in.flight.requests.per.connect=1
- 在kafka 1.x版本后,未开启幂等性的情况下必须配置max.in.flight.requests.per.connect=1,开启幂等性配置(默认开启)可配置max.in.flight.requests.per.connect=5,
最大为5,因为kafka服务器端会缓存producer5个request的元数据
Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
拦截器->序列化器->分区器
- 拦截器:用于Client的定制化逻辑处理,比如说过滤不合规则的数据,补充修改消息内容等等,自定义拦截器可以通过实现ProducerInterceptor(生产者的拦截器)接口
- 序列化器: 序列化数据,防止数据丢失
- 分区器:按照一定规则,将数据划分到不同的分区,若未手动指定分区,则使用默认的分区策略,也可通过实现Partitioner实现自定义分区
Kafka生产者客户端的整体结构是什么样子的?

Kafka生产者客户端中使用了几个线程来处理?分别是什么?
俩个,main线程和sender线程,具体作用详见上图
Kafka的旧版Scala的消费者客户端的设计有什么缺陷?
老版本的 Consumer Group 把位移保存在 ZooKeeper 中,这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能
“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?如果不正确,那么有没有什么hack的手段?
一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区,但是可以通过继承AbstractPartitionAssignor
实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区,其实就是组内广播,
消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。
当前消费者需要提交的消费位移是offset+1
有哪些情形会造成重复消费?
Rebalance:一个consumer正在消费一个分区的一条消息,还没有消费完,发生了rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍。
消费者端手动提交:如果先消费消息,再更新offset位置,导致消息重复消费。
消费者端自动提交:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
生产者端:生产者因为业务问题导致的宕机,在重启之后可能数据会重发
那些情景下会造成消息漏消费?
- 自动提交:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
- 生产者发送消息:
发送消息设置的是fire-and-forget(发后即忘),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。- 消费者端:
先提交位移,但是消息还没消费完就宕机了,造成了消息没有被消费。自动位移提交同理- acks没有设置为all:
如果在broker还没把消息同步到其他broker的时候宕机了,那么消息将会丢失
KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?

简述消费者与消费组之间的关系
Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费
当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
Kafka 会在 log.dir 或 log.dirs 参数所配置的目录下创建相应的主题分区,默认情况下这个目录为/tmp/kafka-logs/。
在 ZooKeeper 的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案
topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加,使用 kafka-topics 脚本,结合 –alter 参数来增加某个主题的分区数
1 | kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数> |
当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。
其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
最后,Rebalance 实在是太慢了
topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不支持,因为删除的分区中的消息不好处理。如果直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于 Spark、Flink 这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的
创建topic时如何选择合适的分区数?
可以使用Kafka 本身提供的用于生产者性能测试的 kafka-producer- perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh来进行测试。
增加合适的分区数可以在一定程度上提升整体吞吐量,但超过对应的阈值之后吞吐量不升反降。如果应用对吞吐量有一定程度上的要求,则建议在投入生产环境之前对同款硬件资源做一个完备的吞吐量相关的测试,以找到合适的分区数阈值区间。
分区数的多少还会影响系统的可用性。如果分区数非常多,如果集群中的某个 broker 节点宕机,那么就会有大量的分区需要同时进行 leader 角色切换,这个切换的过程会耗费一笔可观的时间,并且在这个时间窗口内这些分区也会变得不可用。
分区数越多也会让 Kafka 的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。
Kafka目前有那些内部topic,它们都有什么特征?各自的作用又是什么?
__consumer_offsets:作用是保存 Kafka 消费者的位移信息
__transaction_state:用来存储事务日志消息
优先副本是什么?它有什么特殊的作用?
所谓的优先副本是指在AR集合列表中的第一个副本。理想情况下,优先副本就是该分区的leader 副本,所以也可以称之为 preferred leader。
Kafka 要确保所有主题的优先副本在 Kafka 集群中均匀分布,这样就保证了所有分区的 leader 均衡分布。以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”
Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理
- 生产者的分区分配:是指为每条消息指定其所要发往的分区。可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
- 消费者中的分区分配:是指为消费者指定其可以消费消息的分区。Kafka 提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。
- 分区副本的分配:是指为集群制定创建主题时的分区副本分配方案,即在哪个 broker 中创建哪些分区的副本。kafka-topics.sh 脚本中提供了一个 replica-assignment 参数来手动指定分区副本的分配方案
简述Kafka的日志目录结构

Kafka中有那些索引文件?
- 偏移量索引文件:用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置
- 时间戳索引文件:则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
如果我指定了一个offset,Kafka怎么查找到对应的消息?
Kafka是通过seek() 方法来指定消费的,在执行seek() 方法之前要去执行一次poll()方法,等到分配到分区之后会去对应的分区的指定位置开始消费,如果指定的位置发生了越界,那么会根据auto.offset.reset 参数设置的情况进行消费。
如果我指定了一个timestamp,Kafka怎么查找到对应的消息?
Kafka提供了一个 offsetsForTimes() 方法,通过 timestamp 来查询与此对应的分区位置。offsetsForTimes() 方法的参数 timestampsToSearch 是一个 Map 类型,key 为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp 字段
聊一聊你对Kafka的Log Retention的理解
日志删除: 配置服务端参数log.cleanup.policy:delete(默认就是delete)
- 基于时间: 日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments)retentionMs, 可以通过 broker 端参数 log.retention.hours、log.retention.minutes 和 log.retention.ms 来配置,三个配置优先级依次提升。默认情况下只配置了 log.retention.hours 参数,其值为168,即为7天。
删除日志分段时,首先会从 Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上“.deleted”的后缀(当然也包括对应的索引文件)。最后交由一个以“delete-file”命名的延迟任务来删除这些以“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数来调配,此参数的默认值为60000,即1分钟。- 基于大小: 日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。
retentionSize 可以通过 broker 端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。注意 log.retention.bytes 配置的是 Log 中所有日志文件的总大小,而不是单个日志分段(确切地说应该为 .log 日志文件)的大小。单个日志分段的大小由 broker 端参数 log.segment.bytes 来限制,默认值为1073741824,即 1GB- 基于日志偏移量: 这个无法配置,一般不关注,一般情况下日志文件的起始偏移量logStartOffset(logStartOffset值是整个 Log 对象对外可见消息的最小位移值)等于第一个日志分段的baseOffset,但是这并不是绝对的,logStartOffset的值可以通过DeleteRecordsRequest请求、日志的清理和截断等操作修改。
1 | log.retention.hours=168 //7d |
聊一聊你对Kafka的Log Compaction的理解
日志压缩: 配置服务端参数log.cleanup.policy:compact
Log Compaction 对于有相同 key 的不同 value 值,只保留最后一个版本。如果应用只关心 key 对应的最新 value 值,则可以开启 Kafka 的日志清理功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值,一般可用于用户信息存储等
聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)
页缓存: 页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问,基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。
此外,即使 Kafka 服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。零拷贝: 除了消息顺序追加、页缓存等技术,Kafka 还使用零拷贝(Zero-Copy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux 操作系统而言,零拷贝技术依赖于底层的 sendfile() 方法实现。对应于 Java 语言,FileChannal.transferTo() 方法的底层实现就是 sendfile() 方法
下图中左侧为传统方式,右侧为零拷贝,详细可见:https://developer.ibm.com/articles/j-zerocopy/

聊一聊Kafka的延时操作的原理
Kafka 中有多种延时操作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。
延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。
聊一聊Kafka控制器的作用
controller选举: kafka集群在创建时,会在Zookeeper中创建临时节点
/controller,创建成功的那个broker为此次的controller,
其他的broker会对该控制器节点创建watch对象,监听该节点的变更,当控制器失效后,其他broker会进行抢注,当选新的controllercontroller作用:
- _主题管理_: 主题的创建、删除、修改分区;kafka-topics 脚本相关后台操作基本上都是由controller帮我们完成的
- _分区重新分配_:kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能
- _Preferred 领导者选举_:当某个broker节点下线重新上线后,该broker节点上的所有分区副本均为Follower,会使分区Leader分布不均匀,这个可以协调Leader
- _broker管理_: 自动检测broker的新增、宕机,依赖于利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更,因为每个节点在启动后都会在
此节点下创建临时节点;- _存储集群数据_:向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据
消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)
- 名词解释
消费者协调器(ConsumerCoordinator): 位于消费者客户端的组件,负责与GroupCoordinator通信
消费组协调器(GroupCoordinator:) 位于kafka服务端,用于管理消费组的组件 - 什么时候会发生消费者再均衡?
- 有新的消费者加入消费组或者宕机下线(不一定真的宕机,有可能只是长时间未与GroupCoordinator发送心跳,默认45秒就被判定掉线)
- 消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了 unsubscrible() 方法取消对某些主题的订阅
- 消费组所对应的 GroupCoorinator 节点发生了变更
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
- 消费者消费过程
- FIND_COORDINATOR:确定消费者组对应的GroupCoordinator所在broker,并创建相互通信的网络连接,若连接不正常,就需要向集群中的负载最小的节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator。
- JOIN_GROUP:消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。且会选择出一个消费者的Leader,并选出大多消费者支持的分区副本分配策略
- SYNC_GROUP:消费者Leader将根据阶段二选出分配策略,实施具体的分配方案,并将方案通过GroupCoordinator同步给各个消费者
- HEARTBEAT:消费者通过向GroupCoordinator发送心跳来维护自己的活跃性,默认每3秒一次,45秒超时,且心跳和消费是俩个独立的线程
Kafka中的幂等是怎么实现的
Kafka为此引入了producerId(简称 PID)和序列号(sequence number)这两个概念,每个生产者实例在初始化的时候会被分配一个PID(用户无感知,可以日志文件里查看),
对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将 <PID,分区> 对应的序列号的值加1,为每一对 <PID,分区> 维护一个序列号,
当出现乱序时,生产者会抛出 OutOfOrderSequenceException
Kafka中的事务是怎么实现的(这题我去面试6家被问4次,照着答案念也要念十几分钟,面试官简直凑不要脸。实在记不住的话…只要简历上不写精通Kafka一般不会问到,我简历上写的是“熟悉Kafka,了解RabbitMQ….”)
kafka事务是以幂等性为前提,生产者配置一个transactionalId而生效的;每个transactionalId和PID相对应
每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中
一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。无论调用 commitTransaction() 方法还是 abortTransaction() 方法,生产者都会向 TransactionCoordinator 发送 EndTxnRequest 请求。
TransactionCoordinator 在收到 EndTxnRequest 请求后会执行如下操作:将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state
通过 WriteTxnMarkersRequest 请求将 COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets
将 COMPLETE_COMMIT 或 COMPLETE_ABORT 信息写入内部主题 __transaction_state标明该事务结束
在消费端有一个参数isolation.level,设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。如果生产者开启事务并向某个分区值发送3条消息 msg1、msg2 和 msg3,在执行 commitTransaction() 或 abortTransaction() 方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产者执行 commitTransaction() 方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了 abortTransaction() 方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。
Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
失效副本是指什么?有那些应对措施?
正常情况下,分区的所有副本都处于 ISR 集合中,但是难免会有异常情况发生,从而某些副本被剥离出 ISR 集合中。在 ISR 集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即 under-replicated 分区
一般有这几种情况会导致副本失效:
- follower 副本进程卡住,在一段时间内根本没有向 leader 副本发起同步请求,比如频繁的 Full GC。
- follower 副本进程同步过慢,在一段时间内都无法追赶上 leader 副本,比如 I/O 开销过大。
- 如果通过工具增加了副本因子,那么新增加的副本在赶上 leader 副本之前也都是处于失效状态的。
- 如果一个 follower 副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上 leader 副本之前也处于失效状态
多副本下,各个副本中的HW和LEO的演变过程
为什么Kafka不支持读写分离?
数据一致性问题: 数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。
延时问题: 数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
对于Kafka来说,必要性不是很高,因为在Kafka集群中,如果存在多个副本,经过合理的配置,可以让leader副本均匀的分布在各个broker上面,使每个 broker 上的读写负载都是一样的。
Kafka在可靠性方面做了哪些改进?(HW, LeaderEpoch)
HW: HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。
分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息LeaderEpoch: 代表 leader 的纪元信息(epoch),初始值为0。每当 leader 变更一次,leader epoch 的值就会加1
leader epoch 的值就会加1,相当于为 leader 增设了一个版本号。每个副本中还会增设一个矢量 <LeaderEpoch => StartOffset>,
其中 StartOffset 表示当前 LeaderEpoch 下写入的第一条消息的偏移量。
假设有两个节点A和B,B是leader节点,里面的数据如图:

A发生重启,之后A不是先忙着截断日志而是先发送OffsetsForLeaderEpochRequest请求给B,B作为目前的leader在收到请求之后会返回当前的LEO(LogEndOffset,注意图中LE0和LEO的不同),与请求对应的响应为OffsetsForLeaderEpochResponse。如果 A 中的 LeaderEpoch(假设为 LE_A)和 B 中的不相同,那么 B 此时会查找 LeaderEpoch 为 LE_A+1 对应的 StartOffset 并返回给 A

如上图所示,A 在收到2之后发现和目前的 LEO 相同,也就不需要截断日志了,以此来保护数据的完整性。
再如,之后 B 发生了宕机,A 成为新的 leader,那么对应的 LE=0 也变成了 LE=1,对应的消息 m2 此时就得到了保留。后续的消息都可以以 LE1 为 LeaderEpoch 陆续追加到 A 中。这个时候A就会有两个LE,第二LE所记录的Offset从2开始。如果B恢复了,那么就会从A中获取到LE+1的Offset为2的值返回给B。

再来看看LE如何解决数据不一致的问题:
当前 A 为 leader,B 为 follower,A 中有2条消息 m1 和 m2,而 B 中有1条消息 m1。假设 A 和 B 同时“挂掉”,然后 B 第一个恢复过来并成为新的 leader。

之后 B 写入消息 m3,并将 LEO 和 HW 更新至2,如下图所示。注意此时的 LeaderEpoch 已经从 LE0 增至 LE1 了。

紧接着 A 也恢复过来成为 follower 并向 B 发送 OffsetsForLeaderEpochRequest 请求,此时 A 的 LeaderEpoch 为 LE0。B 根据 LE0 查询到对应的 offset 为1并返回给 A,A 就截断日志并删除了消息 m2,如下图所示。之后 A 发送 FetchRequest 至 B 请求来同步数据,最终A和B中都有两条消息 m1 和 m3,HW 和 LEO都为2,并且 LeaderEpoch 都为 LE1,如此便解决了数据不一致的问题。

Kafka中怎么实现死信队列和重试队列?
Kafka中的延迟队列怎么实现(这题被问的比事务那题还要多!!!听说你会Kafka,那你说说延迟队列怎么实现?)
Kafka中怎么做消息审计?
消息审计是指在消息生产、存储和消费的整个过程之间对消息个数及延迟的审计,以此来检测是否有数据丢失、是否有数据重复、端到端的延迟又是多少等内容。
目前与消息审计有关的产品也有多个,比如 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它们主要通过在消息体(value 字段)或在消息头(headers 字段)中内嵌消息对应的时间戳 timestamp 或全局的唯一标识 ID(或者是两者兼备)来实现消息的审计功能。
内嵌 timestamp 的方式主要是设置一个审计的时间间隔 time_bucket_interval(可以自定义设置几秒或几分钟),根据这个 time_bucket_interval 和消息所属的 timestamp 来计算相应的时间桶(time_bucket)。
内嵌 ID 的方式就更加容易理解了,对于每一条消息都会被分配一个全局唯一标识 ID。如果主题和相应的分区固定,则可以为每个分区设置一个全局的 ID。当有消息发送时,首先获取对应的 ID,然后内嵌到消息中,最后才将它发送到 broker 中。消费者进行消费审计时,可以判断出哪条消息丢失、哪条消息重复。
Kafka中怎么做消息轨迹?
消息轨迹指的是一条消息从生产者发出,经由 broker 存储,再到消费者消费的整个过程中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。生产者、broker、消费者这3个角色在处理消息的过程中都会在链路中增加相应的信息,将这些信息汇聚、处理之后就可以查询任意消息的状态,进而为生产环境中的故障排除提供强有力的数据支持。
对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,我们需要将这些信息保存起来。
我们同样可以将轨迹信息保存到 Kafka 的某个主题中,比如下图中的主题 trace_topic。

Kafka中有那些配置参数比较有意思?聊一聊你的看法
Kafka中有那些命名比较有意思?聊一聊你的看法
Kafka有哪些指标需要着重关注?
比较重要的 Broker 端 JMX 指标:
- BytesIn/BytesOut:即 Broker 端每秒入站和出站字节数。你要确保这组值不要接近你的网络带宽,否则这通常都表示网卡已被“打满”,很容易出现网络丢包的情形。
- NetworkProcessorAvgIdlePercent:即网络线程池线程平均的空闲比例。通常来说,你应该确保这个 JMX 值长期大于 30%。如果小于这个值,就表明你的网络线程池非常繁忙,你需要通过增加网络线程数或将负载转移给其他服务器的方式,来给该 Broker 减负。
- RequestHandlerAvgIdlePercent:即 I/O 线程池线程平均的空闲比例。同样地,如果该值长期小于 30%,你需要调整 I/O 线程池的数量,或者减少 Broker 端的负载。
- UnderReplicatedPartitions:即未充分备份的分区数。所谓未充分备份,是指并非所有的 Follower 副本都和 Leader 副本保持同步。一旦出现了这种情况,通常都表明该分区有可能会出现数据丢失。因此,这是一个非常重要的 JMX 指标。
- ISRShrink/ISRExpand:即 ISR 收缩和扩容的频次指标。如果你的环境中出现 ISR 中副本频繁进出的情形,那么这组值一定是很高的。这时,你要诊断下副本频繁进出 ISR 的原因,并采取适当的措施。
- ActiveControllerCount:即当前处于激活状态的控制器的数量。正常情况下,Controller 所在 Broker 上的这个 JMX 指标值应该是 1,其他 Broker 上的这个值是 0。如果你发现存在多台 Broker 上该值都是 1 的情况,一定要赶快处理,处理方式主要是查看网络连通性。这种情况通常表明集群出现了脑裂。脑裂问题是非常严重的分布式故障,Kafka 目前依托 ZooKeeper 来防止脑裂。但一旦出现脑裂,Kafka 是无法保证正常工作的。
怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)
如果消费者客户端的 isolation.level 参数配置为“read_uncommitted”(默认),它对应的 Lag 等于HW – ConsumerOffset 的值,其中 ConsumerOffset 表示当前的消费位移。
如果这个参数配置为“read_committed”,那么就要引入 LSO 来进行计算了。LSO 是 LastStableOffset 的缩写,它对应的 Lag 等于 LSO – ConsumerOffset 的值。
- 首先通过 DescribeGroupsRequest 请求获取当前消费组的元数据信息,当然在这之前还会通过 FindCoordinatorRequest 请求查找消费组对应的 GroupCoordinator。
- 接着通过 OffsetFetchRequest 请求获取消费位移 ConsumerOffset。
- 然后通过 KafkaConsumer 的 endOffsets(Collection partitions)方法(对应于 ListOffsetRequest 请求)获取 HW(LSO)的值。
- 最后通过 HW 与 ConsumerOffset 相减得到分区的 Lag,要获得主题的总体 Lag 只需对旗下的各个分区累加即可。
Kafka的那些设计让它有如此高的性能?
- 分区: 主题topic会有多个分区,kafka将分区均匀地分配到整个集群中,当生产者向对应主题传递消息,消息通过负载均衡机制传递到不同的分区以减轻单个服务器实例的压力。
一个Consumer Group中可以有多个consumer,多个consumer可以同时消费不同分区的消息,大大的提高了消费者的并行消费能力- 网络传输: 采用批量发送和拉取和端到端的信息压缩,(kafaka会将这些批量的数据进行压缩,将一批消息打包后进行压缩,发送broker服务器后,最终这些数据还是提供给消费者用,所以数据在服务器上还是保持压缩状态,不会进行解压,而且频繁的压缩和解压也会降低性能,最终还是以压缩的方式传递到消费者的手上)
- 顺序读写: kafka将消息追加到日志文件中,利用了磁盘的顺序读写,来提高读写效率
- 零拷贝: 零拷贝将文件内容从磁盘通过DMA引擎复制到内核缓冲区,而且没有把数据复制到socket缓冲区,只是将数据位置和长度信息的描述符复制到了socket缓存区,然后直接将数据传输到网络接口,最后发送。这样大大减小了拷贝的次数,提高了效率。kafka正是调用linux系统给出的sendfile系统调用来使用零拷贝。Java中的系统调用给出的是FileChannel.transferTo接口。
- 存储机制: 如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为4096,即 4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes 的值,对应地可以增加或缩小索引项的密度。