• 1.kafka常用脚本管理之topic管理
1.查看现有的topics列表
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list

[root@elk91 ~]#
2.创建指定分区副本的topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --create --partitions 3 --replication-factor 2 --topic xixi
Created topic xixi.
[root@elk91 ~]# 
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
xixi
[root@elk91 ~]# 
3.查看分区的详细信息
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --describe
Topic: xixi	TopicId: BjioKLaiQ1-DqAcOwgCcZw	PartitionCount: 3	ReplicationFactor: 2	Configs: 
	Topic: xixi	Partition: 0	Leader: 93	Replicas: 93,91	Isr: 93,91	Elr: N/A	LastKnownElr: N/A
	Topic: xixi	Partition: 1	Leader: 91	Replicas: 91,92	Isr: 91,92	Elr: N/A	LastKnownElr: N/A
	Topic: xixi	Partition: 2	Leader: 92	Replicas: 92,93	Isr: 92,93	Elr: N/A	LastKnownElr: N/A
[root@elk91 ~]# 
4.修改分区数【只能调大,不能调小】
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --alter --partitions 5
[root@elk91 ~]# 
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --describe
Topic: xixi	TopicId: BjioKLaiQ1-DqAcOwgCcZw	PartitionCount: 5	ReplicationFactor: 2	Configs: 
	Topic: xixi	Partition: 0	Leader: 93	Replicas: 93,91	Isr: 93,91	Elr: N/A	LastKnownElr: N/A
	Topic: xixi	Partition: 1	Leader: 91	Replicas: 91,92	Isr: 91,92	Elr: N/A	LastKnownElr: N/A
	Topic: xixi	Partition: 2	Leader: 92	Replicas: 92,93	Isr: 92,93	Elr: N/A	LastKnownElr: N/A
	Topic: xixi	Partition: 3	Leader: 93	Replicas: 93,92	Isr: 93,92	Elr: N/A	LastKnownElr: N/A
	Topic: xixi	Partition: 4	Leader: 91	Replicas: 91,93	Isr: 91,93	Elr: N/A	LastKnownElr: N/A

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --alter --partitions 3
Error while executing topic command : Topic currently has 5 partitions, which is higher than the requested 3.     #不能调小会报错
[2025-03-17 11:05:14,673] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 3.
 (org.apache.kafka.tools.TopicCommand)
[root@elk91 ~]# 
5.删除topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic xixi --delete
[root@elk91 ~]# 
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list

[root@elk91 ~]# 
  • kafka常用脚本管理之producer和consumer
1.创建生产者程序【首次写入数据时,若topic不存在,则默认会自动创建】
[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic haha 
>www.baidu.com
[2025-03-17 11:09:14,163] WARN [Producer clientId=console-producer] The metadata response from the cluster reported a recoverable issue with correlation id 7 : {lhaha=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>nihao
>
2.查看现有的topics列表
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
haha
[root@elk91 ~]# 
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --describe --topic haha
Topic: haha	TopicId: ucvPuxy8Sy6hhDU5M3mffQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: haha	Partition: 0	Leader: 91	Replicas: 91	Isr: 91	Elr: N/A	LastKnownElr: N/A
[root@elk91 ~]# 
3.创建消费者程序【创建消费者后,再次在生产者终端写入数据'你好啊~'】
root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic haha   # 默认从最新的offset获取数据。
你好~

	
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.92:9092 --topic haha --from-beginning  # 从最老的offset获取数据
www.baidu.com
你好~
你好啊~
4.创建消费者后,会自动生成一个新的topic,名为'__consumer_offsets',里面存储的是offset信息
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
__consumer_offsets
haha
[root@elk91 ~]#

  • kafka常用脚本管理之consumer group
  • 1.常用概念
consumer group 
       消费者组,每个消费者都隶属于某个消费者组。  换句话说,一个消费者组可以有一个或多个消费者。
offset:
       用于标识每个parition数据的位置点,每个paritition的数据都是顺序的,我们将每个paritition最后一个偏移量,称为LEO(Log End Offset)。
rebalance:
       重平衡,指的是同一个消费者组的消费者重新分配partition的过程。
       当一个消费者组的消费者数量发生变化或者partition数量发生变化时,都会触发rebalance。
  • 2.消费者组案例
2.1 创建topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --create --partitions 3 --replication-factor 2 --topic haha
[root@elk91 ~]#
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --describe --topic haha
Topic: haha TopicId: n7X_VtnBR32YLgE2AKMCaA PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: haha Partition: 0 Leader: 91 Replicas: 91,92 Isr: 91,92 Elr: N/A LastKnownElr: N/A
Topic: haha Partition: 1 Leader: 92 Replicas: 92,93 Isr: 92,93 Elr: N/A LastKnownElr: N/A
Topic: haha Partition: 2 Leader: 93 Replicas: 93,91 Isr: 93,91 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]#

2.2 创建生产者写入测试数据
[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic haha
>abc
>


2.3 创建消费者并指定消费者组
[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic oldboyedu-linux96 --consumer-property group.id=eat --from-beginning
abc


2.4 消费者组的基本管理
2.4.1 查看消费者组列表
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --list
console-consumer-44473
linux96
console-consumer-68203
eat
[root@elk91 ~]#

2.4.2 查看指定消费者组的详细信息
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --group eat --describe ;echo

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
eat haha 1 0 0 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
eat haha 2 0 0 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
eat haha 0 1 1 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
[root@elk91 ~]#


2.4.3 再次启动新的消费者并指的同一个消费者组
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.92:9092 --topic haha --consumer-property group.id=eat --from-beginning



2.4.4 再次查看指定消费者组的详细信息
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --group eat --describe ;echo

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
eat haha 1 0 0 0 console-consumer-0e7aa638-7bed-4e56-967d-6286bcbdc8c3 /10.0.0.93 console-consumer
eat haha 0 5 5 0 console-consumer-0e7aa638-7bed-4e56-967d-6286bcbdc8c3 /10.0.0.93 console-consumer
eat haha 2 0 0 0 console-consumer-7706a45c-91a8-492d-9263-b822eb054286 /10.0.0.92 console-consumer
[root@elk91 ~]#



  • 3.总结
	
一个消费者组可以有多个消费者,消费者的数量可以多余topic的分区数,但是多余的消费者并不会消费分区的数据。说白了有空闲的消费者是不干活的。

当该消费者组的消费者有退出时,此时空闲的消费者才能"上位"消费分区的数据。

kafka的JVM调优

  • 1.调优思路
生产环境中建议kafka的JVM的堆内存设置为6GB即可。测试环境的话256m足以
参考配置: 【JDK8,JDK11,JDK17,JDK21】
KAFKA_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50
 -XX:MaxMetaspaceFreeRatio=85"
  • 2.2 修改堆内存大小
[root@elk91 ~]# vim /usr/local/kafka_2.13-3.9.0/bin/kafka-server-start.sh 
...
 export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m"
...
[root@elk91 ~]# 
[root@elk91 ~]# scp /usr/local/kafka_2.13-3.9.0/bin/kafka-server-start.sh  10.0.0.92:/usr/local/kafka_2.13-3.9.0/bin/
[root@elk91 ~]# scp /usr/local/kafka_2.13-3.9.0/bin/kafka-server-start.sh  10.0.0.93:/usr/local/kafka_2.13-3.9.0/bin/
  • 2.3 停止kafka
[root@elk91 ~]# kafka-server-stop.sh 
[root@elk92 ~]# kafka-server-stop.sh 
[root@elk93 ~]# kafka-server-stop.sh 
  • 2.4 启动kafka
[root@elk91 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
[root@elk92 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
[root@elk93 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
  • 2.5 再次验证JVM是否调优成功
[root@elk91 ~]# free -h
               total        used        free      shared  buff/cache   available
Mem:           7.7Gi       2.4Gi       1.7Gi       1.0Mi       3.6Gi       5.0Gi
Swap:          3.8Gi          0B       3.8Gi
[root@elk91 ~]# 
[root@elk91 ~]# ps -ef | grep kafka | grep Xms
root       51593       1  4 14:43 pts/0    00:00:05 /usr/share/elasticsearch/jdk/bin/java -Xmx256m -Xms256m ...

kafka集群的压测及官方文档阅读技巧

参考连接:
https://www.cnblogs.com/yinzhengjie/p/9953212.html
https://kafka.apache.org/documentation/#tiered_storage_config_ex

新版本测试需要参考官网:
比如:
kafka-producer-perf-test.sh --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092