1. 分区器

broker配置项:partitioner.class

  • org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • org.apache.kafka.clients.producer.RoundRobinPartitioner
  • org.apache.kafka.clients.producer.Partitioner

1.1 默认分区器(DefaultPartitioner)

  • 若发送时指定分区,则发送到指定的分区中
  • 未指定分区,指定了Key,则Key的hashcode%分区数
  • 均未指定,采取粘性规则,第一次随机选择分区,直到缓存满或者时间到发送完消息,下一次继续随机但不会选择上次使用的分区

1.2 RoundRobinPartitioner

这种分区策略是一系列连续记录中的每条记录将被发送到不同的分区(无论是否提供’key’),直到我们用完分区并重新开始。注意:有一个已知问题会在创建新批次时导致分布不均

1.3 UniformStickyPartitioner

此分区策略将尝试坚持一个分区(无论是否提供了“key”),直到batch.size已满或已满linger.ms

1.4 自定义分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

/**
* 自定义hash分区,实现Partitioner接口,重写partition方法
*/
public class MyPartition implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes1, Cluster cluster) {
// 获取topic中partition数量
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
// 根据key的hash值计取模,计算出在哪个分区中
int numPartitions = Math.abs(String.valueOf(key).hashCode()) % partitionCount;
return numPartitions;
}

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> map) {

}
}
kafkaProperties.put("partitioner.class","com.pg.kafka.MyPartition");

2. 分区节点分布策略

创建topic,分配时,规则是尽量均匀将所有分区副本分布在各个broker上

1
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 3  --partitions 16 --topic test2

查看topic信息

1
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe  --topic test2

分配规则大致如下,借鉴网上教程的图

3. 分区消费者分配策略

消费者配置项:partition.assignment.strategy

3.1 org.apache.kafka.clients.consumer.RangeAssignor

以Topic为基础,将分区以分区号排序均匀分布给每个消费组中的消费者(按字母顺序排序),比如topic有7个分区,消费组中有3个消费者,
他会将分区数与消费者整除,余出的会加在前面的消费者身上,分区如下

  • 1号消费者:0,1,2
  • 2号消费者:3,4
  • 3号消费者:5,6

这样问题就来了,当同一消费组订阅Topic过多,且不能整除的时候,前面的消费者会承担更多的分区消费,容易产生数据倾斜
且这种策略,当一个消费者挂了之后,原属于该消费者的分区的消费任务会全部加到另一个分区上去,消费完成,后续的才会触发在再平衡

3.2 org.apache.kafka.clients.consumer.RoundRobinAssignor

将一个消费组中的所有订阅的topic的分区汇在一起,按照消费者进行轮询分配,当消费组内所有消费者订阅Topic相同时,则这种分配时均匀的,如下:
消费者1,消费者2均订阅Topic1,Topic2,俩个Topic均有3个分区,则分配如下:

  • 消费者1:T1P0 T1P2 T2P1
  • 消费者2:T1P1 T2P0 T2P2

但当组内消费者订阅Topic不相同的时候,也会造成分配不均匀,例如:
消费者1,消费者2均订阅Topic1,Topic2,俩个Topic均有1个分区,且消费者2还订阅Topic3,有俩个分区则分配如下:

  • 消费者1:T1P0
  • 消费者2:T2P0 T3P0 T3P1

3.3 org.apache.kafka.clients.consumer.StickyAssignor

本策略有两个目标, 首先是要实现分区分配要尽可能地均匀,其次当发生分区再平衡发生时,分区的分配会尽可能的与上次的分配结果保持一致,目的是为了防止
分区的消费者发生变化,这有助于节约开销,也有助于避免消息重复消费的问题发生。需要注意的是,当以上两点发生冲突的时候,第一个目标是优先于第二个目标的,例如:
三个消费者C1,C2,C3,订阅了三个主题,且每个主题2个分区,

  • C1:T1P0 T2P1
  • C2:T1P1 T3P0
  • C3:T2P0 T3P1 不一定按照这个排序哈

当订阅不同时,例如
三个消费者,三个topic,分别有1,2,3个分区,消费者C1订阅了主题T0,消费者C2订阅了主题T0、T1,消费者C3订阅了主题T0、T1、T2分配如下

  • C1:T0P0
  • C2:T1P0 T1P1
  • C3:T2P0 T2P1 T2P2

当C1挂掉的时候会再平衡为

  • C2:T1P0 T1P1 T0P0
  • C3:T2P0 T2P1 T2P2

RoundRobinAssignor和StickyAssignor非常重的要区别

  • StickyAssignor在消费组中每个消费者订阅不同topic时,能够使分配更加均匀
  • StickyAssignor某个消费者宕机后,再平衡时能够保留上次的,分配结果,只对宕机上的分区进行再分配,而RoundRobinAssignor不能保证,
    比如说,C1,C2,C3,在C1下线后,会将所有分区轮询C2,C3进行重新分配

3.4 org.apache.kafka.clients.consumer.CooperativeStickyAssignor

3.5 自定义Assignor

实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;

public class MyConsumerPartitionAssignor implements ConsumerPartitionAssignor {

@Override
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
return null;
}

@Override
public String name() {
return null;
}
}

3.6 消费者分区分配规则流程