目录

kafka 配置参数详解

Broker 配置参数

属性 默认值 释义
broker.id -1 该参数用来指定 Kafka 集群中 broker 的唯一标识,默认值为-1。如果没有设置,那么 Kafka 会自动生成一个。
listeners null 该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的入口地址列表,配置格式为 protocol1://hostname1:port1,protocol2://hostname2:port2,其中 protocol 代表协议类型,Kafka 当前支持的协议类型有 PLAINTEXT、SSL、SASL_SSL 等,如果未开启安全认证,则使用简单的 PLAINTEXT 即可。hostname 代表主机名,port 代表服务端口,此参数的默认值为 null。比如此参数配置为 PLAINTEXT://198.162.0.2:9092,如果有多个地址,则中间以逗号隔开。如果不指定主机名,则表示绑定默认网卡,注意有可能会绑定到 127.0.0.1,这样无法对外提供服务,所以主机名最好不要为空;如果主机名是 0.0.0.0,则表示绑定所有的网卡。
advertised.listeners null 作用和 listeners 类似,绑定公网 IP 供外部客户端使用
listener.security.protocol.map null listener 支持的协议集
num.network.threads 3 server 用来处理网络请求的网络线程数目;一般你不需要更改这个属性。
num.io.threads 8 server 用来处理请求的 I/O 线程的数目;这个线程数目至少要等于硬盘的个数。
socket.send.buffer.bytes 100 * 1024 SO_SNDBUFF 缓存大小,server 进行 socket 连接所用
socket.receive.buffer.bytes 100 * 1024 SO_RCVBUFF 缓存大小,server 进行 socket 连接时所用
socket.request.max.bytes 100 * 1024 * 1024 server 允许的最大请求尺寸; 这将避免 server 溢出,它应该小于 Java heap size
log.dirs /tmp/kafka-logs kafka 存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行。
num.partitions 1 如果创建 topic 时没有给出划分 partitions 个数,这个数字将是 topic 下 partitions 数目的默认数值。
num.recovery.threads.per.data.dir 1 每个数据目录用来日志恢复的线程数目
offsets.topic.replication.factor 1 topic 的 offset 的备份份数。建议设置更高的数字保证更高的可用性
transaction.state.log.replication.factor 1 事务主题的复制因子(设置更高以确保可用性)
transaction.state.log.min.isr 1 覆盖事务主题的 min.insync.replicas 配置
log.flush.interval.messages 1000 log 文件“sync”到磁盘之前累积的消息条数。
log.flush.interval.ms 1000 用来控制”fsync“的时间间隔,如果消息量始终没有达到固化到磁盘的消息数,但是离上次磁盘同步的时间间隔达到阈值,也将触发磁盘同步。
log.retention.hours 168 日志文件保留小时数
log.retention.bytes -1 每个 topic 下每个 partition 保存数据的总量;注意,这是每个 partitions 的上限,因此这个数值乘以 partitions 的个数就是每个 topic 保存的数据总量。同时注意:如果 log.retention.hours 和 log.retention.bytes 都设置了,则超过了任何一个限制都会造成删除一个段文件。
log.segment.bytes 1024 * 1024 * 1024 topic partition 的日志存放在某个目录下诸多文件中,这些文件将 partition 的日志切分成一段一段的;这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件。此设置可以由每个 topic 基础设置时进行覆盖。
log.retention.check.interval.ms 300000:5 分钟 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求。
zookeeper.connect null 该参数指明 broker 要连接的 ZooKeeper 集群的服务地址(包含端口号),没有默认值,且此参数为必填项。可以配置为 localhost:2181,如果 ZooKeeper 集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181,localhost2:2181,localhost3:2181 这种格式。最佳的实践方式是再加一个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所用的,也可以实现多个 Kafka 集群复用一套 ZooKeeper 集群,这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于 localhost1:2181,localhost2:2181,localhost3:2181/kafka 这种,如果不指定 chroot,那么默认使用 ZooKeeper 的根路径。
zookeeper.connection.timeout.ms 6000 客户端等待和 zookeeper 建立连接的最大时间
group.initial.rebalance.delay.ms 3 秒 这个参数的主要效果就是让 coordinator 推迟空消费组接收到成员加入请求后本应立即开启的 rebalance。在实际使用时,假设你预估你的所有 consumer 组成员加入需要在 10s 内完成,那么你就可以设置该参数=10000。

生产者配置参数

属性 默认值 释义
boostrap.servers null 该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
acks 1 producer 需要 server 接收到数据之后发出的确认接收的信号,此项配置就是指 procuder 需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
(1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到 socket buffer 并认为已经发送。没有任何保障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1;
(2)acks=1: 这意味着至少要等待 leader 已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如果 follower 没有成功备份数据,而此时 leader 又挂掉,则消息会丢失。
(3)acks=all: 这意味着 leader 需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
(4)其他的设置,例如 acks=2 也是可以的,这将需要给定的 acks 数量,但是这种策略一般很少用。
max.request.size 1028576:1M 请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server 具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制 producer 每次批量发送请求的数目,以防发出巨量的请求。
compression.type null producer 用于压缩数据的压缩类型。默认是无压缩。正确的选项值是 none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
retries 0 设置大于 0 的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retry.backoff.ms 100 在试图重试失败的 produce 请求之前的等待时间。避免陷入发送-失败的死循环中。
connections.max.idle.ms 540000ms 这个参数用来指定在多久之后关闭限制的连接
linger.ms 0 这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。
receive.buffer.bytes 32768 这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 32768(B),即 32KB。如果设置为-1,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。
send.buffer.bytes 131072 这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072(B),即 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。
buffer.memory 33554432 producer 可以用来缓存数据的内存大小。如果数据产生速度大于向 broker 发送的速度,producer 会阻塞或者抛出异常,以“block.on.buffer.full”来表明。
batch.size 16384 producer 将试图批处理消息记录,以减少请求次数。这将改善 client 与 server 之间的性能。这项配置控制默认的批量处理消息字节数。不会试图处理大于这个字节数的消息字节数。发送到 brokers 的请求将包含多个批量处理,其中会包含对每个 partition 的一个请求。较小的批量处理数值比较少用,并且可能降低吞吐量(0 则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。
client.id null 当向 server 发出请求时,这个字符串会发送给 server。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪
partitioner.class kafka.producer.DefaultPartitioner partitioner 类,用于在 subtopics 之间划分消息。默认 partitioner 基于 key 的 hash 表
request.timeout.ms 30000 这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
max.block.ms 60000 用来控制 KafkaProducer 中 send()方法和 partitionsFor()方法的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞
timeout.ms 30000 此配置选项控制 server 等待来自 followers 的确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以 server 端度量的,没有包含请求的网络延迟
block.on.buffer.full true 当我们内存缓存用尽时,必须停止接收新消息记录或者抛出错误。默认情况下,这个设置为真,然而某些阻塞可能不值得期待,因此立即抛出错误更好。设置为 false 则会这样:如果记录已经发送同时缓存已满,producer 会抛出一个异常错误:BufferExhaustedException
metadata.fetch.timeout.ms 60000 是指我们所获取的一些元素据的第一个时间数据。元素据包含:topic,host,partitions。此项配置是指当等待元素据 fetch 成功完成所需要的时间,否则会跑出异常给客户端。
metadata.max.age.ms 300000 以微秒为单位的时间,是在我们强制更新 metadata 的时间间隔。即使我们没有看到任何 partition leadership 改变。
metric.reporters [] 类的列表,用于衡量指标。实现 MetricReporter 接口,将允许增加一些类,这些类在新的衡量指标产生时就会改变。JmxReporter 总会包含用于注册 JMX 统计
metrics.num.samples 2 用于维护 metrics 的样本数
metrics.sample.window.ms 30000 metrics 系统维护可配置的样本数量,在一个可修正的 window size。这项配置配置了窗口大小,例如。我们可能在 30s 的期间维护两个样本。当一个窗口推出后,我们会擦除并重写最老的窗口
recoonect.backoff.ms 10 连接失败时,当我们重新连接时的等待时间。这避免了客户端反复重连

消费者配置参数

属性 默认值 释义
bootstrap.servers null 该参数的释义和生产者客户端 KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需 的 broker 地 址 清 单,具 体 内 容 形 式 为 host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的 broker 地址,消费者会从现有的配置中查找到全部的 Kafka 集群成员。这里设置两个以上的 broker 地址信息,当其中任意一个宕机时,消费者仍然可以连接到 Kafka 集群上。
group.id null 消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread “main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。
key.deserializer null 用来指定消息中 key 所需反序列化操作的反序列化器,无默认值。
value.deserializer null 用来指定消息中 value 所需反序列化操作的反序列化器,无默认值。
client.id null 这个参数用来设定 KafkaConsumer 对应的客户端 id,默认值也为“”。如果客户端不设置,则 KafkaConsumer 会自动生成一个非空字符串,内容形式如“consumer-1”“consumer-2”,即字符串“consumer-”与数字的拼接。
consumer.id null 不需要设置,一般自动产生
socket.timeout.ms 30*100 网络请求的超时限制。真实的超时限制是 max.fetch.wait+socket.timeout.ms
socket.receive.buffer.bytes 64*1024 socket 用于接收网络请求的缓存大小
fetch.message.max.bytes 1024*1024 每次 fetch 请求中,针对每次 fetch 消息的最大字节数。这些字节将会督导用于每个 partition 的内存中,因此,此设置将会控制 consumer 所使用的 memory 大小。这个 fetch 请求尺寸必须至少和 server 允许的最大消息尺寸相等,否则,producer 可能发送的消息尺寸大于 consumer 所能消耗的尺寸。
num.consumer.fetchers 1 用于 fetch 数据的 fetcher 线程数
auto.commit.enable true 如果为真,consumer 所 fetch 的消息的 offset 将会自动的同步到 zookeeper。这项提交的 offset 将在进程挂掉时,由新的 consumer 使用
auto.commit.interval.ms 60*1000 consumer 向 zookeeper 提交 offset 的频率,单位是秒
queued.max.message.chunks 2 用于缓存消息的最大数目,以供 consumption。每个 chunk 必须和 fetch.message.max.bytes 相同
rebalance.max.retries 4 当新的 consumer 加入到 consumer group 时,consumers 集合试图重新平衡分配到每个 consumer 的 partitions 数目。如果 consumers 集合改变了,当分配正在执行时,这个重新平衡会失败并重入
fetch.min.bytes 1 每次 fetch 请求时,server 应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。
fetch.wait.max.ms 100 如果没有足够的数据能够满足 fetch.min.bytes,则此项配置是指在应答 fetch 请求之前,server 会阻塞的最大时间。
rebalance.backoff.ms 2000 在重试 reblance 之前 backoff 时间
refresh.leader.backoff.ms 200 在试图确定某个 partition 的 leader 是否失去他的 leader 地位之前,需要等待的 backoff 时间
auto.offset.reset largest zookeeper 中没有初始化的 offset 时,如果 offset 是以下值的回应:
smallest:自动复位 offset 为 smallest 的 offset
largest:自动复位 offset 为 largest 的 offset
anything else:向 consumer 抛出异常
consumer.timeout.ms -1 如果没有消息可用,即使等待特定的时间之后也没有,则抛出超时异常
exclude.internal.topics true 是否将内部 topics 的消息暴露给 consumer
paritition.assignment.strategy range 选择向 consumer 流分配 partitions 的策略,可选值:range,roundrobin
client.id group id value 是用户特定的字符串,用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请求的应用
zookeeper.session.timeout.ms 6000 zookeeper 会话的超时限制。如果 consumer 在这段时间内没有向 zookeeper 发送心跳信息,则它会被认为挂掉了,并且 reblance 将会产生
zookeeper.connection.timeout.ms 6000 客户端在建立通 zookeeper 连接中的最大等待时间
zookeeper.sync.time.ms 2000 ZK follower 可以落后 ZK leader 的最大时间
offsets.storage zookeeper 用于存放 offsets 的地点: zookeeper 或者 kafka
offset.channel.backoff.ms 1000 重新连接 offsets channel 或者是重试失败的 offset 的 fetch/commit 请求的 backoff 时间
offsets.channel.socket.timeout.ms 10000 当读取 offset 的 fetch/commit 请求回应的 socket 超时限制。此超时限制是被 consumerMetadata 请求用来请求 offset 管理
offsets.commit.max.retries 5 重试 offset commit 的次数。这个重试只应用于 offset commits 在 shut-down 之间。他
dual.commit.enabled true 如果使用“kafka”作为 offsets.storage,你可以二次提交 offset 到 zookeeper(还有一次是提交到 kafka)。在 zookeeper-based 的 offset storage 到 kafka-based 的 offset storage 迁移时,这是必须的。对任意给定的 consumer group 来说,比较安全的建议是当完成迁移之后就关闭这个选项
partition.assignment.strategy range 在“range”和“roundrobin”策略之间选择一种作为分配 partitions 给 consumer 数据流的策略; 循环的 partition 分配器分配所有可用的 partitions 以及所有可用 consumer 线程。它会将 partition 循环的分配到 consumer 线程上。如果所有 consumer 实例的订阅都是确定的,则 partitions 的划分是确定的分布。循环分配策略只有在以下条件满足时才可以:(1)每个 topic 在每个 consumer 实力上都有同样数量的数据流。(2)订阅的 topic 的集合对于 consumer group 中每个 consumer 实例来说都是确定的。

参考

《深入理解 Kafka:核心设计与实践原理》 朱忠华