1. kafka配置详解

1.1 broker配置

  • broker.id:(必须配置)节点ID,必须全局唯一
  • log.dir:(必须配置)日志存放目录,默认值(/tmp/kafka-logs),tmp目录会被系统定期清理
  • zookeeper.connect:(必须配置)zookeeper连接
  • auto.create.topics.enable:是否自动创建主题,默认为true
  • auto.leader.rebalance.enable:分区leader自动负载均衡,默认true
  • compression.type:压缩类型,可选值 (‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’)

1.2 producer配置

  • client.id:生产者客户端Id
  • batch.size:批量发送数据大小,默认16K
  • linger.ms:延迟时间,默认0
  • buffer.memory:缓冲区大小,默认32M
  • compression.type:消息压缩格式,默认none,可选none, gzip, snappy, lz4, or zstd
  • acks:消息确认机制,默认all,即-1,可选[all, -1, 0, 1],0-不需要回应,1-等leader落盘后回应,-1-所有副本落盘后回应
  • partitioner.class:分区器类路径,默认DefaultPartitioner,还提供RoundRobinPartitioner,UniformStickyPartitioner
  • key.serializer:key的序列化器
  • value.serializer:value的序列化器
  • bootstrap.servers:kafka地址
  • retries:重试次数,默认int最大值
  • enable.idempotence:幂等性,默认true,根据<pid,分区号,序列号>去重
  • max.in.flight.requests.per.connection:生产者在收到kafka响应前最大发送请求数,默认5
  • transactional.id:事务ID,全局唯一,基于enable.idempotence

1.3 consume配置

  • group.id:消费者所属的群组ID
  • enable.auto.commit:自动提交,默认开启,一般关闭
  • auto.offset.reset:有效值为“earliest”“latest”“none”,默认latest
  • fetch.min.bytes:最小拉取字节数,默认1(B),
  • fetch.max.wait.mss:拉取最大等待时间数,默认500(ms),
  • max-poll-records:一次请求最大拉取的消息条数,默认500
  • key.serializer:key的序列化器
  • value.serializer:value的序列化器
  • bootstrap.servers:kafka地址

2. 常用命令

2.1 启动停止

  • ./bin/kafka-server-start.sh -daemon ./config/server.properties
  • ./bin/kafka-server-stop.sh

2.2 主题(kafka-topic.sh)

  • –bootstrap-server IP:PORT(多个用逗号分隔)
  • –topic 主题名称
  • –create
  • –delete
  • –describe
  • –partitions
  • –replication-factor
  • –list

2.3 生产者(kafka-console-producer.sh)

  • –bootstrap-server IP:PORT(多个用逗号分隔)
  • –topic 主题名称

2.4 消费者(kafka-console-consumer.sh)

  • –bootstrap-server IP:PORT(多个用逗号分隔)
  • –topic 主题名称 –from-beginning
  • –group 指定消费组