- 1.kafka常用脚本管理之topic管理
1.查看现有的topics列表
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
[root@elk91 ~]#
2.创建指定分区副本的topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --create --partitions 3 --replication-factor 2 --topic xixi
Created topic xixi.
[root@elk91 ~]#
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
xixi
[root@elk91 ~]#
3.查看分区的详细信息
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --describe
Topic: xixi TopicId: BjioKLaiQ1-DqAcOwgCcZw PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: xixi Partition: 0 Leader: 93 Replicas: 93,91 Isr: 93,91 Elr: N/A LastKnownElr: N/A
Topic: xixi Partition: 1 Leader: 91 Replicas: 91,92 Isr: 91,92 Elr: N/A LastKnownElr: N/A
Topic: xixi Partition: 2 Leader: 92 Replicas: 92,93 Isr: 92,93 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]#
4.修改分区数【只能调大,不能调小】
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --alter --partitions 5
[root@elk91 ~]#
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --describe
Topic: xixi TopicId: BjioKLaiQ1-DqAcOwgCcZw PartitionCount: 5 ReplicationFactor: 2 Configs:
Topic: xixi Partition: 0 Leader: 93 Replicas: 93,91 Isr: 93,91 Elr: N/A LastKnownElr: N/A
Topic: xixi Partition: 1 Leader: 91 Replicas: 91,92 Isr: 91,92 Elr: N/A LastKnownElr: N/A
Topic: xixi Partition: 2 Leader: 92 Replicas: 92,93 Isr: 92,93 Elr: N/A LastKnownElr: N/A
Topic: xixi Partition: 3 Leader: 93 Replicas: 93,92 Isr: 93,92 Elr: N/A LastKnownElr: N/A
Topic: xixi Partition: 4 Leader: 91 Replicas: 91,93 Isr: 91,93 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --alter --partitions 3
Error while executing topic command : Topic currently has 5 partitions, which is higher than the requested 3. #不能调小会报错
[2025-03-17 11:05:14,673] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 3.
(org.apache.kafka.tools.TopicCommand)
[root@elk91 ~]#
5.删除topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --delete
[root@elk91 ~]#
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
[root@elk91 ~]#
- kafka常用脚本管理之producer和consumer
1.创建生产者程序【首次写入数据时,若topic不存在,则默认会自动创建】
[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic haha
>www.baidu.com
[2025-03-17 11:09:14,163] WARN [Producer clientId=console-producer] The metadata response from the cluster reported a recoverable issue with correlation id 7 : {lhaha=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>nihao
>
2.查看现有的topics列表
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
haha
[root@elk91 ~]#
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --describe --topic haha
Topic: haha TopicId: ucvPuxy8Sy6hhDU5M3mffQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: haha Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]#
3.创建消费者程序【创建消费者后,再次在生产者终端写入数据'你好啊~'】
root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic haha # 默认从最新的offset获取数据。
你好~
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.92:9092 --topic haha --from-beginning # 从最老的offset获取数据
www.baidu.com
你好~
你好啊~
4.创建消费者后,会自动生成一个新的topic,名为'__consumer_offsets',里面存储的是offset信息
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
__consumer_offsets
haha
[root@elk91 ~]#
- kafka常用脚本管理之consumer group
- 1.常用概念
consumer group
消费者组,每个消费者都隶属于某个消费者组。 换句话说,一个消费者组可以有一个或多个消费者。
offset:
用于标识每个parition数据的位置点,每个paritition的数据都是顺序的,我们将每个paritition最后一个偏移量,称为LEO(Log End Offset)。
rebalance:
重平衡,指的是同一个消费者组的消费者重新分配partition的过程。
当一个消费者组的消费者数量发生变化或者partition数量发生变化时,都会触发rebalance。
- 2.消费者组案例
2.1 创建topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --create --partitions 3 --replication-factor 2 --topic haha
[root@elk91 ~]#
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --describe --topic haha
Topic: haha TopicId: n7X_VtnBR32YLgE2AKMCaA PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: haha Partition: 0 Leader: 91 Replicas: 91,92 Isr: 91,92 Elr: N/A LastKnownElr: N/A
Topic: haha Partition: 1 Leader: 92 Replicas: 92,93 Isr: 92,93 Elr: N/A LastKnownElr: N/A
Topic: haha Partition: 2 Leader: 93 Replicas: 93,91 Isr: 93,91 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]#
2.2 创建生产者写入测试数据
[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic haha
>abc
>
2.3 创建消费者并指定消费者组
[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic oldboyedu-linux96 --consumer-property group.id=eat --from-beginning
abc
2.4 消费者组的基本管理
2.4.1 查看消费者组列表
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --list
console-consumer-44473
linux96
console-consumer-68203
eat
[root@elk91 ~]#
2.4.2 查看指定消费者组的详细信息
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --group eat --describe ;echo
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
eat haha 1 0 0 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
eat haha 2 0 0 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
eat haha 0 1 1 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
[root@elk91 ~]#
2.4.3 再次启动新的消费者并指的同一个消费者组
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.92:9092 --topic haha --consumer-property group.id=eat --from-beginning
2.4.4 再次查看指定消费者组的详细信息
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --group eat --describe ;echo
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
eat haha 1 0 0 0 console-consumer-0e7aa638-7bed-4e56-967d-6286bcbdc8c3 /10.0.0.93 console-consumer
eat haha 0 5 5 0 console-consumer-0e7aa638-7bed-4e56-967d-6286bcbdc8c3 /10.0.0.93 console-consumer
eat haha 2 0 0 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
[root@elk91 ~]#
3.总结
一个消费者组可以有多个消费者,消费者的数量可以多余topic的分区数,但是多余的消费者并不会消费分区的数据。说白了有空闲的消费者是不干活的。
当该消费者组的消费者有退出时,此时空闲的消费者才能"上位"消费分区的数据。
kafka的JVM调优
- 1.调优思路
生产环境中建议kafka的JVM的堆内存设置为6GB即可。测试环境的话256m足以
参考配置: 【JDK8,JDK11,JDK17,JDK21】
KAFKA_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50
-XX:MaxMetaspaceFreeRatio=85"
- 2.2 修改堆内存大小
[root@elk91 ~]# vim /usr/local/kafka_2.13-3.9.0/bin/kafka-server-start.sh
...
export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m"
...
[root@elk91 ~]#
[root@elk91 ~]# scp /usr/local/kafka_2.13-3.9.0/bin/kafka-server-start.sh 10.0.0.92:/usr/local/kafka_2.13-3.9.0/bin/
[root@elk91 ~]# scp /usr/local/kafka_2.13-3.9.0/bin/kafka-server-start.sh 10.0.0.93:/usr/local/kafka_2.13-3.9.0/bin/
- 2.3 停止kafka
[root@elk91 ~]# kafka-server-stop.sh
[root@elk92 ~]# kafka-server-stop.sh
[root@elk93 ~]# kafka-server-stop.sh
- 2.4 启动kafka
[root@elk91 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@elk92 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@elk93 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
- 2.5 再次验证JVM是否调优成功
[root@elk91 ~]# free -h
total used free shared buff/cache available
Mem: 7.7Gi 2.4Gi 1.7Gi 1.0Mi 3.6Gi 5.0Gi
Swap: 3.8Gi 0B 3.8Gi
[root@elk91 ~]#
[root@elk91 ~]# ps -ef | grep kafka | grep Xms
root 51593 1 4 14:43 pts/0 00:00:05 /usr/share/elasticsearch/jdk/bin/java -Xmx256m -Xms256m ...
kafka集群的压测及官方文档阅读技巧
参考连接:
https://www.cnblogs.com/yinzhengjie/p/9953212.html
https://kafka.apache.org/documentation/#tiered_storage_config_ex
新版本测试需要参考官网:
比如:
kafka-producer-perf-test.sh --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
Categories: