Kafka快速入门4-Kafka基本命令行操作
Kafka快速入门4-Kafka基本命令行操作.md
Kafka 提供了很多的命令行工具。在现在这个版本中,bin 目录下大概有 39 个脚本,掌握一定的脚本使用是有必要的。
很多脚本我也没有使用过,不过从 Kafka 官网文档里大概可以归纳出这些脚本的大概用途。
连接器相关:
- connect-distributed.sh: 分布式模式启动 Kafka Connect。
- connect-standalone.sh: 独立模式启动 Kafka Connect 。
- connect-mirror-maker.sh: 用于跨集群数据复制。
Kafka 核心操作:
- kafka-server-start.sh: 启动 Kafka 服务器。
- kafka-server-stop.sh: 停止 Kafka 服务器。
- kafka-topics.sh: 管理 Kafka 主题,创建、删除、修改等。
- kafka-console-producer.sh: 启动命令行生产者,用于发送消息。
- kafka-console-consumer.sh: 启动命令行消费者,用于接收消息。
集群管理:
- kafka-cluster.sh: 管理 Kafka 集群元数据。
- kafka-acls.sh: 管理 Kafka 权限控制。
- kafka-configs.sh: 管理主题和代理配置。
- kafka-leader-election.sh: 手动触发分区领导者选举。
- kafka-reassign-partitions.sh: 重新分配分区。
监控和调试:
- kafka-consumer-groups.sh: 管理和描述消费者组。
- kafka-dump-log.sh: 转储日志段内容。
- kafka-get-offsets.sh: 获取主题分区的当前偏移量。
- kafka-log-dirs.sh: 查询日志目录使用情况。
性能测试:
- kafka-producer-perf-test.sh: 生产者性能测试工具。
- kafka-consumer-perf-test.sh: 消费者性能测试工具。
元数据管理:
- kafka-metadata-shell.sh: Kafka 元数据 shell。
- kafka-metadata-quorum.sh: 管理 Kafka 的元数据仲裁。
ZooKeeper 相关:
- zookeeper-server-start.sh: 启动 ZooKeeper 服务。
- zookeeper-server-stop.sh: 停止 ZooKeeper 服务。
- zookeeper-shell.sh: ZooKeeper 命令行客户端。
作为研发,掌握一部分必要的命令行工具用法即可。
1. Topic 管理
Topic 管理主要是使用 kafka-topics.sh 脚本。
1.1 创建 Topic
./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
上面的脚本是创建一个新的topic,具有3个分区和2个副本。分区跟副本之前说过,分区可以提高并行处理能力,副本可以提升数据可靠性。
脚本的具体意思:
- –create:指示要创建一个新的topic 。
- –topic my-topic:指定topic的名称为”my-topic” 。
- –bootstrap-server localhost:9092:指定Kafka broker的地址 。
- –partitions 3:设置topic的分区数为3 。
- –replication-factor 2:设置副本因子为2,即每个分区有2个副本 。 需要注意下,当副本因子为 2 时,数据实际上会被备份 2 份。原始数据 1 份,额外的副本 1 份,这对存储空间会造成一定的影响。同时需要注意的是副本的数量需要小于或等于 broker 的数量。
如果都使用默认配置的话可以简写:
./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
容易出现的错误:
- Replication factor: 2 larger than available brokers: 1
如果在单机环境下,你直接执行上面的创建一个副本为 2 的 topic ,但是你的 Kafka 集群中只有 1 个可用的 broker,就会出现这个错误。修改为一个副本就可以正常工作:
./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
上面提到过,副本的数量,即复制因子这个参数,需要小于或等于 broker 的数量。Kafka 不允许在同一个 broker 上存储同一个分区的多个副本,因为这样做不能提高可用性和容错性。这样是防止一个 broker 失败时,所有的副本都丢失的情况。
理解副本目的是在不同的物理机器上保存数据副本,以防止单点故障。在实际生产环境中,复制因子设置为 3 是一个比较好的平衡点,可用性较好和性能又还行。
- Topic ‘my-topic’ already exists
这个错误很好理解,就是 topic 已经存在了。
- Topic name “xxxxxx” is illegal
topic 的名称不合法,只能包括 ASCII 字母、 【.】、 【_】 、【-】这几个符号。
- The partitions must be greater than 0
这个错误是创建 topic 的时候分区数为负数或零,不合法的配置导致的,分区数至少为 1。
1.2 查看所有 Topic
下面的命令会输出 topic 的名称:
./kafka-topics.sh --list --bootstrap-server localhost:9092
比如输出结果如下:
my-topic
1.3 查看指定 Topic 详情
./kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
上面的命令是显示指定topic的详细信息,包括分区数、副本分配等。输出结果示例如下:
Topic: my-topic // topic 名称
TopicId: ctlM72H8Td6NwPocmA5BQQ // topic 唯一ID
PartitionCount: 3 // 分区数
ReplicationFactor: 1 // 副本因子
Configs:
Topic: my-topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-topic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-topic Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
上面所有分区的 Leader、Replicas 和 Isr 都是 1001 ,因为集群中只有一个 broker,其 ID 为 1001。所有分区都位于这个唯一的 broker 上。
如果要查看所有topic的详情,可以省略 –topic 参数:
./kafka-topics.sh --describe --bootstrap-server localhost:9092
1.4 删除 Topic
注意:下面的命令是删除指定的topic及其所有数据。
./kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
- –delete:指示要删除一个topic
- –topic my-topic:指定要删除的topic名称
1.5 修改 topic
1.5.1 增加分区数
./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 6
增加 topic 分区数是相对安全的,因为不会重新分配现有数据,新分区没有历史数据。
需要注意的是可能影响基于 key 的分区策略,同时可能需要消费者重新平衡。
1.5.2 减少分区数
Kafka 不允许直接减少分区数,因为这可能导致数据丢失。
只能创建新主题,指定所需的较少分区数,然后把数据进行迁移。
1.5.3 增加副本数
不建议操作!可以增加副本数,但是较复杂,通常建议创建的时候就确认好副本数。
1.5.4 减少副本数
不建议操作!可以减少副本数,但是较复杂,通常建议创建的时候就确认好副本数。
2. 生产者以及消费者
生产者跟消费者脚本通常都是用于测试以及调试,生产环境下正常是不会进行使用的。
2.1 生产消息
下面的命令是启动一个命令行生产者,允许用户从命令行输入消息并发送到指定的 topic。
./kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
- –topic my-topic:指定要发送消息的topic
- –bootstrap-server localhost:9092:指定Kafka broker的地址
2.1.1 从某个文件中读取消息发送到队列
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic < kafka-messages.txt
可以用来批量导入历史数据、重放日志文件、测试等。上面的 kafka-messages.txt 将文件内容作为输入重定向到生产者,文件中的每一行都会被当作一条单独的消息发送。
2.1.2 指定消息 key
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=:"
这样可以发送带有 key 的消息,这对于保证消息顺序或分区策略很重要。 【parse.key=true】 告诉生产者消息包含 key , 【key.separator=: 】指定键和值之间的分隔符为冒号。
2.1.3 发送带时间戳的消息
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=," --property "timestamp.field.name=timestamp"
上面的配置允许发送带有自定义时间戳的消息。在一些重放历史数据时但是保留原始时间戳的场景下会使用到。【timestamp.field.name=timestamp】 指定了包含时间戳的字段名。
2.1.4 批量发送消息
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --batch-size 1000 --timeout 1000
使用这种方式可以提高大量消息发送时的效率。在做性能测试或者大规模数据导入时可以使用到,不过更多的是在程序代码中使用这种批量思想。
【batch-size】指定每批发送的消息数量,【timeout】指定批处理的超时时间,单位是毫秒。
2.1.5 发送消息到指定分区
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --property "partition=0"
将消息发送到特定的分区,测试特定分区的消费者逻辑或者是手动管理消息分布的位置。
2.2 消费消息
./kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning
- –topic my-topic:指定要消费的topic
- –from-beginning:从topic的最早消息开始消费
如果只想消费最新的消息,可以省略–from-beginning:
./kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092
有一些比较常用的配置如下:
- –property print.key=true –property key.separator=: 显示消息的 key 和 value
- –max-messages 10 限制消费的消息数量
- –property print.timestamp=true 显示消息的时间戳
- –partition 0 从特定分区消费
- –property print.offset=true 显示消费的偏移量信息
3. 消费者组管理
3.1 列出所有消费者组
./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
3.2 查看消费组详情
下面的命令显示指定消费者组的详细信息,包括每个分区的消费偏移量、滞后等。
./kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
- –describe:指示要查看详细信息
- –group my-group:指定要查看的消费者组名称
输出示例如下:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
---------------------- --------- ---------- --------------- --------------- ---- --------------------------------------------------------- ----------- ----------------
console-consumer-12077 my-topic 0 - 2 - console-consumer-fe6945df-3731-4a11-94af-34d84216ce5f /127.0.0.1 console-consumer
console-consumer-12077 my-topic 1 - 0 - console-consumer-fe6945df-3731-4a11-94af-34d84216ce5f /127.0.0.1 console-consumer
console-consumer-12077 my-topic 2 - 0 - console-consumer-fe6945df-3731-4a11-94af-34d84216ce5f /127.0.0.1 console-consumer
从左到右依次的列意思:
- GROUP: 消费者组的名称。
- TOPIC: 被消费的 topic 名称。
- PARTITION: topic 的分区号。 这里有 1、 2、 3 分区。
- CURRENT-OFFSET: 当前消费者组在该分区的消费位置。 如果是显示 - 则表示还没有开始消费。
- LOG-END-OFFSET: 该分区的最新消息偏移量。分区0有2条消息,分区1和2都是0 代表两个分区目前没有消息。
- LAG: 消费滞后量,即还未被消费的消息数量。 显示为 - 则表示无法计算滞后量。
- CONSUMER-ID: 消费者的唯一标识符。
- HOST: 消费者所在的主机。
- CLIENT-ID: 消费者客户端的ID。
3.3 重置消费者组偏移量
下面的命令将指定消费者组在指定topic上的消费偏移量重置到最早的位置。
./kafka-consumer-groups.sh --group my-group --topic my-topic --reset-offsets --to-earliest --execute --bootstrap-server localhost:9092
- –reset-offsets:指示要重置偏移量
- –to-earliest:将偏移量重置到最早的位置
- –execute:执行重置操作,不加此参数只会显示计划而不执行。
3.4 将消费者组的偏移量向前或向后移动指定数量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --shift-by -10 --execute --topic my-topic
3.5 重置偏移量到特定时间点
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-datetime 2022-06-01T00:00:00.000 --execute --topic my-topic
3.6 删除消费者组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group
3.7 显示消费者组的状态
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
3.8 显示消费者组的成员
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
3.9 显示消费者组的偏移量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --offsets
3.10 重置所有主题的偏移量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --all-topics
3.11 导出消费者组的偏移量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe --offsets > group_offsets.txt
注意:研发在生产环境务必不要乱使用这些命令!
4. 其他
4.1 检查 kafka 集群状态
./kafka-broker-api-versions.sh --bootstrap-server localhost:9092