kafka 配置参数详解
目录
Broker 配置参数
属性 | 默认值 | 释义 |
---|---|---|
broker.id | -1 | 该参数用来指定 Kafka 集群中 broker 的唯一标识,默认值为-1。如果没有设置,那么 Kafka 会自动生成一个。 |
listeners | null | 该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的入口地址列表,配置格式为 protocol1://hostname1:port1,protocol2://hostname2:port2,其中 protocol 代表协议类型,Kafka 当前支持的协议类型有 PLAINTEXT、SSL、SASL_SSL 等,如果未开启安全认证,则使用简单的 PLAINTEXT 即可。hostname 代表主机名,port 代表服务端口,此参数的默认值为 null。比如此参数配置为 PLAINTEXT://198.162.0.2:9092,如果有多个地址,则中间以逗号隔开。如果不指定主机名,则表示绑定默认网卡,注意有可能会绑定到 127.0.0.1,这样无法对外提供服务,所以主机名最好不要为空;如果主机名是 0.0.0.0,则表示绑定所有的网卡。 |
advertised.listeners | null | 作用和 listeners 类似,绑定公网 IP 供外部客户端使用 |
listener.security.protocol.map | null | listener 支持的协议集 |
num.network.threads | 3 | server 用来处理网络请求的网络线程数目;一般你不需要更改这个属性。 |
num.io.threads | 8 | server 用来处理请求的 I/O 线程的数目;这个线程数目至少要等于硬盘的个数。 |
socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF 缓存大小,server 进行 socket 连接所用 |
socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF 缓存大小,server 进行 socket 连接时所用 |
socket.request.max.bytes | 100 * 1024 * 1024 | server 允许的最大请求尺寸; 这将避免 server 溢出,它应该小于 Java heap size |
log.dirs | /tmp/kafka-logs | kafka 存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行。 |
num.partitions | 1 | 如果创建 topic 时没有给出划分 partitions 个数,这个数字将是 topic 下 partitions 数目的默认数值。 |
num.recovery.threads.per.data.dir | 1 | 每个数据目录用来日志恢复的线程数目 |
offsets.topic.replication.factor | 1 | topic 的 offset 的备份份数。建议设置更高的数字保证更高的可用性 |
transaction.state.log.replication.factor | 1 | 事务主题的复制因子(设置更高以确保可用性) |
transaction.state.log.min.isr | 1 | 覆盖事务主题的 min.insync.replicas 配置 |
log.flush.interval.messages | 1000 | log 文件“sync”到磁盘之前累积的消息条数。 |
log.flush.interval.ms | 1000 | 用来控制”fsync“的时间间隔,如果消息量始终没有达到固化到磁盘的消息数,但是离上次磁盘同步的时间间隔达到阈值,也将触发磁盘同步。 |
log.retention.hours | 168 | 日志文件保留小时数 |
log.retention.bytes | -1 | 每个 topic 下每个 partition 保存数据的总量;注意,这是每个 partitions 的上限,因此这个数值乘以 partitions 的个数就是每个 topic 保存的数据总量。同时注意:如果 log.retention.hours 和 log.retention.bytes 都设置了,则超过了任何一个限制都会造成删除一个段文件。 |
log.segment.bytes | 1024 * 1024 * 1024 | topic partition 的日志存放在某个目录下诸多文件中,这些文件将 partition 的日志切分成一段一段的;这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件。此设置可以由每个 topic 基础设置时进行覆盖。 |
log.retention.check.interval.ms | 300000:5 分钟 | 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求。 |
zookeeper.connect | null | 该参数指明 broker 要连接的 ZooKeeper 集群的服务地址(包含端口号),没有默认值,且此参数为必填项。可以配置为 localhost:2181,如果 ZooKeeper 集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181,localhost2:2181,localhost3:2181 这种格式。最佳的实践方式是再加一个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所用的,也可以实现多个 Kafka 集群复用一套 ZooKeeper 集群,这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于 localhost1:2181,localhost2:2181,localhost3:2181/kafka 这种,如果不指定 chroot,那么默认使用 ZooKeeper 的根路径。 |
zookeeper.connection.timeout.ms | 6000 | 客户端等待和 zookeeper 建立连接的最大时间 |
group.initial.rebalance.delay.ms | 3 秒 | 这个参数的主要效果就是让 coordinator 推迟空消费组接收到成员加入请求后本应立即开启的 rebalance。在实际使用时,假设你预估你的所有 consumer 组成员加入需要在 10s 内完成,那么你就可以设置该参数=10000。 |
生产者配置参数
属性 | 默认值 | 释义 |
---|---|---|
boostrap.servers | null | 该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。 |
acks | 1 | producer 需要 server 接收到数据之后发出的确认接收的信号,此项配置就是指 procuder 需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项: (1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到 socket buffer 并认为已经发送。没有任何保障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1; (2)acks=1: 这意味着至少要等待 leader 已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如果 follower 没有成功备份数据,而此时 leader 又挂掉,则消息会丢失。 (3)acks=all: 这意味着 leader 需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。 (4)其他的设置,例如 acks=2 也是可以的,这将需要给定的 acks 数量,但是这种策略一般很少用。 |
max.request.size | 1028576:1M | 请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server 具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制 producer 每次批量发送请求的数目,以防发出巨量的请求。 |
compression.type | null | producer 用于压缩数据的压缩类型。默认是无压缩。正确的选项值是 none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。 |
retries | 0 | 设置大于 0 的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 |
retry.backoff.ms | 100 | 在试图重试失败的 produce 请求之前的等待时间。避免陷入发送-失败的死循环中。 |
connections.max.idle.ms | 540000ms | 这个参数用来指定在多久之后关闭限制的连接 |
linger.ms | 0 | 这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。 |
receive.buffer.bytes | 32768 | 这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 32768(B),即 32KB。如果设置为-1,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。 |
send.buffer.bytes | 131072 | 这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072(B),即 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。 |
buffer.memory | 33554432 | producer 可以用来缓存数据的内存大小。如果数据产生速度大于向 broker 发送的速度,producer 会阻塞或者抛出异常,以“block.on.buffer.full”来表明。 |
batch.size | 16384 | producer 将试图批处理消息记录,以减少请求次数。这将改善 client 与 server 之间的性能。这项配置控制默认的批量处理消息字节数。不会试图处理大于这个字节数的消息字节数。发送到 brokers 的请求将包含多个批量处理,其中会包含对每个 partition 的一个请求。较小的批量处理数值比较少用,并且可能降低吞吐量(0 则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。 |
client.id | null | 当向 server 发出请求时,这个字符串会发送给 server。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪 |
partitioner.class | kafka.producer.DefaultPartitioner | partitioner 类,用于在 subtopics 之间划分消息。默认 partitioner 基于 key 的 hash 表 |
request.timeout.ms | 30000 | 这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。 |
max.block.ms | 60000 | 用来控制 KafkaProducer 中 send()方法和 partitionsFor()方法的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞 |
timeout.ms | 30000 | 此配置选项控制 server 等待来自 followers 的确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以 server 端度量的,没有包含请求的网络延迟 |
block.on.buffer.full | true | 当我们内存缓存用尽时,必须停止接收新消息记录或者抛出错误。默认情况下,这个设置为真,然而某些阻塞可能不值得期待,因此立即抛出错误更好。设置为 false 则会这样:如果记录已经发送同时缓存已满,producer 会抛出一个异常错误:BufferExhaustedException |
metadata.fetch.timeout.ms | 60000 | 是指我们所获取的一些元素据的第一个时间数据。元素据包含:topic,host,partitions。此项配置是指当等待元素据 fetch 成功完成所需要的时间,否则会跑出异常给客户端。 |
metadata.max.age.ms | 300000 | 以微秒为单位的时间,是在我们强制更新 metadata 的时间间隔。即使我们没有看到任何 partition leadership 改变。 |
metric.reporters | [] | 类的列表,用于衡量指标。实现 MetricReporter 接口,将允许增加一些类,这些类在新的衡量指标产生时就会改变。JmxReporter 总会包含用于注册 JMX 统计 |
metrics.num.samples | 2 | 用于维护 metrics 的样本数 |
metrics.sample.window.ms | 30000 | metrics 系统维护可配置的样本数量,在一个可修正的 window size。这项配置配置了窗口大小,例如。我们可能在 30s 的期间维护两个样本。当一个窗口推出后,我们会擦除并重写最老的窗口 |
recoonect.backoff.ms | 10 | 连接失败时,当我们重新连接时的等待时间。这避免了客户端反复重连 |
消费者配置参数
属性 | 默认值 | 释义 |
---|---|---|
bootstrap.servers | null | 该参数的释义和生产者客户端 KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需 的 broker 地 址 清 单,具 体 内 容 形 式 为 host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的 broker 地址,消费者会从现有的配置中查找到全部的 Kafka 集群成员。这里设置两个以上的 broker 地址信息,当其中任意一个宕机时,消费者仍然可以连接到 Kafka 集群上。 |
group.id | null | 消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread “main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。 |
key.deserializer | null | 用来指定消息中 key 所需反序列化操作的反序列化器,无默认值。 |
value.deserializer | null | 用来指定消息中 value 所需反序列化操作的反序列化器,无默认值。 |
client.id | null | 这个参数用来设定 KafkaConsumer 对应的客户端 id,默认值也为“”。如果客户端不设置,则 KafkaConsumer 会自动生成一个非空字符串,内容形式如“consumer-1”“consumer-2”,即字符串“consumer-”与数字的拼接。 |
consumer.id | null | 不需要设置,一般自动产生 |
socket.timeout.ms | 30*100 | 网络请求的超时限制。真实的超时限制是 max.fetch.wait+socket.timeout.ms |
socket.receive.buffer.bytes | 64*1024 | socket 用于接收网络请求的缓存大小 |
fetch.message.max.bytes | 1024*1024 | 每次 fetch 请求中,针对每次 fetch 消息的最大字节数。这些字节将会督导用于每个 partition 的内存中,因此,此设置将会控制 consumer 所使用的 memory 大小。这个 fetch 请求尺寸必须至少和 server 允许的最大消息尺寸相等,否则,producer 可能发送的消息尺寸大于 consumer 所能消耗的尺寸。 |
num.consumer.fetchers | 1 | 用于 fetch 数据的 fetcher 线程数 |
auto.commit.enable | true | 如果为真,consumer 所 fetch 的消息的 offset 将会自动的同步到 zookeeper。这项提交的 offset 将在进程挂掉时,由新的 consumer 使用 |
auto.commit.interval.ms | 60*1000 | consumer 向 zookeeper 提交 offset 的频率,单位是秒 |
queued.max.message.chunks | 2 | 用于缓存消息的最大数目,以供 consumption。每个 chunk 必须和 fetch.message.max.bytes 相同 |
rebalance.max.retries | 4 | 当新的 consumer 加入到 consumer group 时,consumers 集合试图重新平衡分配到每个 consumer 的 partitions 数目。如果 consumers 集合改变了,当分配正在执行时,这个重新平衡会失败并重入 |
fetch.min.bytes | 1 | 每次 fetch 请求时,server 应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。 |
fetch.wait.max.ms | 100 | 如果没有足够的数据能够满足 fetch.min.bytes,则此项配置是指在应答 fetch 请求之前,server 会阻塞的最大时间。 |
rebalance.backoff.ms | 2000 | 在重试 reblance 之前 backoff 时间 |
refresh.leader.backoff.ms | 200 | 在试图确定某个 partition 的 leader 是否失去他的 leader 地位之前,需要等待的 backoff 时间 |
auto.offset.reset | largest | zookeeper 中没有初始化的 offset 时,如果 offset 是以下值的回应: smallest:自动复位 offset 为 smallest 的 offset largest:自动复位 offset 为 largest 的 offset anything else:向 consumer 抛出异常 |
consumer.timeout.ms | -1 | 如果没有消息可用,即使等待特定的时间之后也没有,则抛出超时异常 |
exclude.internal.topics | true | 是否将内部 topics 的消息暴露给 consumer |
paritition.assignment.strategy | range | 选择向 consumer 流分配 partitions 的策略,可选值:range,roundrobin |
client.id | group id value | 是用户特定的字符串,用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请求的应用 |
zookeeper.session.timeout.ms | 6000 | zookeeper 会话的超时限制。如果 consumer 在这段时间内没有向 zookeeper 发送心跳信息,则它会被认为挂掉了,并且 reblance 将会产生 |
zookeeper.connection.timeout.ms | 6000 | 客户端在建立通 zookeeper 连接中的最大等待时间 |
zookeeper.sync.time.ms | 2000 | ZK follower 可以落后 ZK leader 的最大时间 |
offsets.storage | zookeeper | 用于存放 offsets 的地点: zookeeper 或者 kafka |
offset.channel.backoff.ms | 1000 | 重新连接 offsets channel 或者是重试失败的 offset 的 fetch/commit 请求的 backoff 时间 |
offsets.channel.socket.timeout.ms | 10000 | 当读取 offset 的 fetch/commit 请求回应的 socket 超时限制。此超时限制是被 consumerMetadata 请求用来请求 offset 管理 |
offsets.commit.max.retries | 5 | 重试 offset commit 的次数。这个重试只应用于 offset commits 在 shut-down 之间。他 |
dual.commit.enabled | true | 如果使用“kafka”作为 offsets.storage,你可以二次提交 offset 到 zookeeper(还有一次是提交到 kafka)。在 zookeeper-based 的 offset storage 到 kafka-based 的 offset storage 迁移时,这是必须的。对任意给定的 consumer group 来说,比较安全的建议是当完成迁移之后就关闭这个选项 |
partition.assignment.strategy | range | 在“range”和“roundrobin”策略之间选择一种作为分配 partitions 给 consumer 数据流的策略; 循环的 partition 分配器分配所有可用的 partitions 以及所有可用 consumer 线程。它会将 partition 循环的分配到 consumer 线程上。如果所有 consumer 实例的订阅都是确定的,则 partitions 的划分是确定的分布。循环分配策略只有在以下条件满足时才可以:(1)每个 topic 在每个 consumer 实力上都有同样数量的数据流。(2)订阅的 topic 的集合对于 consumer group 中每个 consumer 实例来说都是确定的。 |
参考
《深入理解 Kafka:核心设计与实践原理》 朱忠华