【DevDojo】Kafka实操 ·一文快速搞懂kafka版本选型和避坑参数配置 您所在的位置:网站首页 faded各种版本 【DevDojo】Kafka实操 ·一文快速搞懂kafka版本选型和避坑参数配置

【DevDojo】Kafka实操 ·一文快速搞懂kafka版本选型和避坑参数配置

2023-11-19 09:02| 来源: 网络整理| 查看: 265

【DevDojo】 @you: “Stay focused and work hard!”

05. 版本号 05.1 如何看懂 Kafka 版本号?

如果你不了解各个版本之间的差异和功能变化,你怎么能够准确地评判某 Kafka 版本是不是满足你的业务需求呢?因此在深入学习 Kafka 之前,花些时间搞明白版本演进,实际上是非常划算的一件事。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xHv4DHcs-1678850793624)(./assets/image-20230314195614124.png)]

前面的版本号是编译 Kafka 源代码的 Scala 编译器版本。

对于 kafka-2.11-2.1.1 的解读,真正的 Kafka 版本号实际上是 2.1.1。那么这个 2.1.1 又表示什么呢?前面的 2 表示大版本号,即 Major Version;中间的 1 表示小版本号或次版本号,即 Minor Version;最后的 1 表示修订版本号,也就是 Patch 号。

Kafka 服务器端的代码完全由 Scala 语言编写,Scala 同时支持面向对象编程和函数式编程,用 Scala 写成的源代码编译之后也是普通的“.class”文件,因此我们说 Scala 是 JVM 系的语言,它的很多设计思想都是为人称道的。

可以认为Kafka版本号从来都是由 3 个部分构成,即“大版本号 - 小版本号 - Patch 号”。这种视角可以统一所有的 Kafka 版本命名

假设碰到的 Kafka 版本是 0.10.2.2,你现在就知道了它的大版本是 0.10,小版本是 2,总共打了两个大的补丁,Patch 号是 2。

05.2 kafka版本演进? 哪些版本引入了哪些重大的功能改进?

关于这个问题,我建议你最好能做到如数家珍,因为这样不仅令你在和别人交谈 Kafka 时显得很酷,而且如果你要向架构师转型或者已然是架构师,那么这些都是能够帮助你进行技术选型、架构评估的重要依据。

Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0.

版本功能备注0.7只提供了最基础的消息队列功能,甚至连副本机制都没有不推荐0.8引入副本机制, 成了真正意义上完备的分布式高可靠消息队列解决方案(老版本客户端api)需要需要指定 ZooKeeper 的地址而非 Broker 的地址; 生产者api默认使用同步方式; 推荐最低版本v0.8.2.0, 搭配old client api(此时虽有了New produder api, 但bug还很多)0.9(2015.11)增加了基础的安全认证 / 权限功能,用 Java 重写了新版本消费者 API,还引入了 Kafka Connect 组件用于实现高性能的数据抽取此时V0.9以上new producer api已经算比较稳定了, 线上可以使用了; 但new consumer api还有很多bug, 千万不要用0.10(里程碑)引入Kafka streams, 正式成为分布式流处理平台推荐至少V0.10.2.2再使用new consumer api, 而且修复了一个可能导致producer性能降低的bug0.11(重量级变更)一个是提供幂等性 Producer API 以及事务(Transaction) API;另一个是对 Kafka 消息格式做了重构。Producer 实现幂等性以及支持事务都是 Kafka 实现流处理结果正确性的基石。此时的事务 API 有一些 Bug,不算十分稳定. 如果线上不敢用V1.0, 推荐至少V0.11.0.3(3个补丁版本之后), 此时功能已经非常完善(最主流版本之一)!1.0&2.0Kafka Streams 的各种改进如果你是 Kafka Streams 的用户,至少选择 V2.0.0 版本吧

不论你用的是哪个版本,都请尽量保持服务器端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。

每个 Kafka 版本都有它恰当的使用场景和独特的优缺点,切记不要一味追求最新版本。

学员反例举例:

(1)讲一下我在生产环境中遇到的Kafka版本带来的坑。我参与到项目中一年,运行的版本是0.10.0.1,前半年还算稳定,偶尔出现进程假死问题。但是慢慢的生产环境数据量增加,假死频发,导致客户数据丢失,问题很严重。但是一直又没有证据证明这个版本确实存在问题;

(2)目前用的是hdp2.4.2内嵌版本。应该是apache版本的0.8.2.0。遇到很多问题都很难找到解决方法。比如前几天遇到了replicaFetcherThread oom的问题,网上根本找不到什么正经的解释。–70 QQQQQ 可以查一下ZooKeeper中是否存在大量session超时的情况。不过还是建议升级吧,听着很像是一个已知的bug。如果暂时不能升级,可以尝试调低replica.fetch.max.bytes的值试试。

(3)的确在工作中遇到了kafka版本不同导致消息格式不兼容问题,后来服务端和客户端统一版本号才解决;

07 | 最最最重要的集群参数配置(上)

因为有些配置的重要性并未体现在官方文档中,并且从实际表现看,很多参数对系统的影响要比从文档上看更加明显,因此很有必要集中讨论一下。

07.1 Broker段参数

目前 Kafka Broker 提供了近 200 个参数,这其中绝大部分参数都不用你亲自过问。

参数(这些都是全局参数,针对broker上所有log而言的)含义备注(1) log.dirs非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。只要设置log.dirs,不要设置log.dir。更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。(2) log.dir注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。(01) listeners与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。常见的玩法是:你的Kafka Broker机器上配置了双网卡,一块网卡用于内网访问(即我们常说的内网IP);另一个块用于外网访问。那么你可以配置listeners为内网IP,advertised.listeners为外网IP。(02) advertised.listeners同上, 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。advertised.listeners主要是为外网访问用的。如果clients在内网环境访问Kafka不需要配置这个参数。(03) host.name/port列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了(001) auto.create.topics.enable是否允许自动创建 Topic建议最好设置成 false,即不允许自动创建 Topic(避免一些稀奇古怪的的Topic被创建, 应该由运维严格把控)(002) unclean.leader.election.enable是否允许 Unclean Leader 选举只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader(异步拉取消息,必然有一个时间窗口导致它和leader中的数据是不一致的,或者说它是落后于leader的)。比较搞笑的是社区对这个参数的默认值来来回回改了好几版了,鉴于我不知道你用的是哪个版本的 Kafka,所以建议你还是显式地把它设置成 false 吧。(003) auto.leader.rebalance.enable是否允许定期进行 Leader 选举设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。(a) log.retention.{hour|minutes|ms}“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大。(b) log.retention.bytes这是指定 Broker 为消息保存的总磁盘容量大小log.retention.bytes。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间© message.max.bytes控制 Broker 能够接收的最大消息大小**不能使用默认值的参数, 默认的 900多KB 太少了,还不到 1MB。**实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。

**有条件的话你最好保证这些目录挂载到不同的物理磁盘上。**这样做有两个好处:

提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。

能够实现**故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。**要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。还记得上一期我们关于 Kafka 是否需要使用 RAID 的讨论吗?这个改进正是我们舍弃 RAID 方案的基础:没有这种 Failover 的话,我们只能依靠 RAID 来提供保障。

坏掉的数据是怎么自动转移到其他磁盘上的呢?

\1. Broker自动在好的路径上重建副本,然后从leader同步; \2. Kafka支持工具能够将某个路径上的数据拷贝到其他路径上

从构成上来说,**它是若干个逗号分隔的三元组,每个三元组的格式为。**这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092。

一旦你自己定义了协议名称,你必须还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示CONTROLLER这个自定义协议底层使用明文不加密传输数据。

有个事情你还是要注意一下,经常有人会问主机名这个设置中我到底使用 IP 地址还是主机名。这里我给出统一的建议:最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。 Broker 源代码中也使用的是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。

定期重新选举,应该有个触发的条件, 跟每台broker的leader数量有关,如果leader分布不均衡就会触发重新选举leader–> 这个比例由broker端参数leader.imbalance.per.broker.percentage控制,默认是10%。举个例子,如果一个broker上有10个分区,有2个分区的leader不是preferred leader,那么就会触发

07.02 Zookeeper参数

首先 ZooKeeper 是做什么的呢?它是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。

参数含义备注zookeeper.connectKafka 与 ZooKeeper 相关的最重要的参数, 也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。

如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。

如果你有两套 Kafka 集群,假设分别叫它们 kafkas1 和 kafkas2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafkas1和zk1:2181,zk2:2181,zk3:2181/kafkas2。

切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。

使用篇| 08 | 最最最重要的集群参数配置(下) 08.1 Topic 级别参数

Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值,这就是所谓的 Topic 级别参数。Topic 级别参数的有两种方式可以设置:

创建 Topic 时进行设置修改 Topic 时设置 参数(满足任何一个就会开始删除消息)含义备注retention.ms规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。retention.bytes规定了要为该 Topic 预留多大的磁盘空间。通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

先来看看如何在创建 Topic 时设置这些参数, 设想你的部门需要将交易数据发送到 Kafka 进行处理,需要保存最近半年的交易数据,同时这些数据很大,通常都有几 MB,但一般不会超过 5MB。

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create--topictransaction--partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

再来看看方式二:

个人的建议是,你最好始终坚持使用第二种方式来设置,并且在未来,Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-typetopics --entity-nametransaction --alter --add-config max.message.bytes=10485760

修改 Topic 级 max.message.bytes,还要考虑以下两个吧? 还要修改 Broker的 replica.fetch.max.bytes 保证复制正常 消费还要修改配置 fetch.message.max.bytes

08.2 JVM参数

Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。

Kafka 自 2.0.0 版本开始,已经正式摒弃对 Java 7 的支持了,所以有条件的话至少使用 Java 8 吧。

参数含义备注Heap Size默认的 Heap Size 来跑 Kafka,说实话默认的 1GB 有点小,毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例无脑给出一个通用的建议:将你的 JVM 堆大小设置成 6GB 吧,这是目前业界比较公认的一个合理值。比如最近环境中有一台3G堆内存的节点在某个topic handle request的时候一直OOM,调整到5G重启后恢复正常, 评判堆内存大小没有通用标准, 最好还是监控一下实时的堆大小,特别是GC之后的live data大小,通常将heapsize设置成其1.5~2倍就足以了垃圾回收器也就是GC 设置a. 如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器: 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。b. 如果你已经在使用 Java 8 了,那么就用 G1 收集器就好了(G1是jdk9中默认的,jdk8还是需要显式指定的)。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等

设置的方法也很简单,你只需要设置下面这两个环境变量即可:

KAFKA_HEAP_OPTS:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

比如你可以这样启动 Kafka Broker,即在启动 Kafka Broker 之前,先设置上这两个环境变量:

$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g $> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true $> bin/kafka-server-start.sh config/server.properties 08.3 os参数

通常情况下,Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:

文件描述符限制:

首先是ulimit -n。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。

《Kafka权威指南》上说Kafka单机可以轻松处理300万并发;《响应式架构:消息模式Actor实现与Scala、Akka应用集成》上说Scala用Actor单机可以处理5000万并发。如果不设置,单机在Centos7上几百的并发就报“Too many open files”了。网上搜索后设置成65535,用JMater压测单机也只能支撑到1000左右的并发!

还记得电影《让子弹飞》里的对话吗:“你和钱,谁对我更重要?都不重要,没有你对我很重要!”。这个参数也有点这么个意思。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

文件系统类型

根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。

Swappiness

网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。

提交时间

Flush 落盘时间。向 Kafka 发送数据只要数据被写入到操作系统的页缓存(Page Cache)上就可以了认为成功了, 并不是真要等数据被写入磁盘,**随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。**当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,

至少还有其他正常的副本可以使用。。。这个副本重启回来后会重新加载日志段,获取到当前末端位移,因此也能感知刚才未成功写入的消息并重新拉取之~~

因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

注意这个不算内核参数,是文件系统的参数。你可以查询一下文件系统手册。比如ext4就是commit=Nseconds这样设置

08.4 很多人争论 Kafka 不需要为 Broker 设置太大的堆内存,而应该尽可能地把内存留给页缓存使用。对此你是怎么看的?

在你的实际使用中有哪些好的法则来评估 Kafka 对内存的使用呢?

答: 虽然无脑推荐6GB,但绝不是无脑推荐>6GB。所以我觉得6GB可以是一个初始值,你可以实时监控堆上的live data大小,根据这个值调整heap size。

另外堆越小留给页缓存的空间也就越大,这对Kafka是好事啊。

jvm的heap推荐6G,如果我整个机器的只有8G,是否50%-50%的分配比较合适?留一半给OS?

答: 如果你只有8GB,就不要设置6GB了,酌情调小吧。具体设置方法可以监控堆上的live data,然后大约乘以1.5或2即可。比如你可以手动触发Full GC,然后查看一下堆上存活的数据大小,比如说是1500MB,那么你可以设置heap size为2.25GB。

在我们搭建的时候,尽量机器配置相同?

答: 没有说一定要配置相同,有条件的话可以保持配置一样,至少方便运维。

kafka streams或者ksql的性能参数调优有什么建议和参考资料吗?

Kafka Streams的性能调优建议:https://www.confluent.io/blog/optimizing-kafka-streams-applications

我们生产环境有个kafka的存储周期因为磁盘大小原因没设置多大,所以有存在kafka在删除数据的时候,这些数据可能正在消费,程序挂死问题,日志显示kafka协调者死掉了,或者有的直接一台broker挂了?

答: \1. 删除的时候不会顾及consumer的。可能的问题就是位移越界导致的位移重置,比如consumer位移发生跳跃的情形 \2. 是Coordinator挂掉还是所在broker挂掉?或者说后者挂掉也不一定就是Coordinator组件故障导致的吧。最好还是给出一些详细信息,否则不太好评估。

胡夕大佬课程的个人笔记摘录



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有