七的博客

Kafka快速入门4-Kafka基本命令行操作

消息队列

Kafka快速入门4-Kafka基本命令行操作.md

Kafka 提供了很多的命令行工具。在现在这个版本中,bin 目录下大概有 39 个脚本,掌握一定的脚本使用是有必要的。

很多脚本我也没有使用过,不过从 Kafka 官网文档里大概可以归纳出这些脚本的大概用途。

连接器相关:

  • connect-distributed.sh: 分布式模式启动 Kafka Connect。
  • connect-standalone.sh: 独立模式启动 Kafka Connect 。
  • connect-mirror-maker.sh: 用于跨集群数据复制。

Kafka 核心操作:

  • kafka-server-start.sh: 启动 Kafka 服务器。
  • kafka-server-stop.sh: 停止 Kafka 服务器。
  • kafka-topics.sh: 管理 Kafka 主题,创建、删除、修改等。
  • kafka-console-producer.sh: 启动命令行生产者,用于发送消息。
  • kafka-console-consumer.sh: 启动命令行消费者,用于接收消息。

集群管理:

  • kafka-cluster.sh: 管理 Kafka 集群元数据。
  • kafka-acls.sh: 管理 Kafka 权限控制。
  • kafka-configs.sh: 管理主题和代理配置。
  • kafka-leader-election.sh: 手动触发分区领导者选举。
  • kafka-reassign-partitions.sh: 重新分配分区。

监控和调试:

  • kafka-consumer-groups.sh: 管理和描述消费者组。
  • kafka-dump-log.sh: 转储日志段内容。
  • kafka-get-offsets.sh: 获取主题分区的当前偏移量。
  • kafka-log-dirs.sh: 查询日志目录使用情况。

性能测试:

  • kafka-producer-perf-test.sh: 生产者性能测试工具。
  • kafka-consumer-perf-test.sh: 消费者性能测试工具。

元数据管理:

  • kafka-metadata-shell.sh: Kafka 元数据 shell。
  • kafka-metadata-quorum.sh: 管理 Kafka 的元数据仲裁。

ZooKeeper 相关:

  • zookeeper-server-start.sh: 启动 ZooKeeper 服务。
  • zookeeper-server-stop.sh: 停止 ZooKeeper 服务。
  • zookeeper-shell.sh: ZooKeeper 命令行客户端。

作为研发,掌握一部分必要的命令行工具用法即可。

1. Topic 管理

Topic 管理主要是使用 kafka-topics.sh 脚本。

1.1 创建 Topic

./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2

上面的脚本是创建一个新的topic,具有3个分区和2个副本。分区跟副本之前说过,分区可以提高并行处理能力,副本可以提升数据可靠性。

脚本的具体意思:

  • –create:指示要创建一个新的topic 。
  • –topic my-topic:指定topic的名称为”my-topic” 。
  • –bootstrap-server localhost:9092:指定Kafka broker的地址 。
  • –partitions 3:设置topic的分区数为3 。
  • –replication-factor 2:设置副本因子为2,即每个分区有2个副本 。 需要注意下,当副本因子为 2 时,数据实际上会被备份 2 份。原始数据 1 份,额外的副本 1 份,这对存储空间会造成一定的影响。同时需要注意的是副本的数量需要小于或等于 broker 的数量。

如果都使用默认配置的话可以简写:

./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

容易出现的错误:

  • Replication factor: 2 larger than available brokers: 1

如果在单机环境下,你直接执行上面的创建一个副本为 2 的 topic ,但是你的 Kafka 集群中只有 1 个可用的 broker,就会出现这个错误。修改为一个副本就可以正常工作:

  ./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

上面提到过,副本的数量,即复制因子这个参数,需要小于或等于 broker 的数量。Kafka 不允许在同一个 broker 上存储同一个分区的多个副本,因为这样做不能提高可用性和容错性。这样是防止一个 broker 失败时,所有的副本都丢失的情况。

理解副本目的是在不同的物理机器上保存数据副本,以防止单点故障。在实际生产环境中,复制因子设置为 3 是一个比较好的平衡点,可用性较好和性能又还行。

  • Topic ‘my-topic’ already exists

这个错误很好理解,就是 topic 已经存在了。

  • Topic name “xxxxxx” is illegal

topic 的名称不合法,只能包括 ASCII 字母、 【.】、 【_】 、【-】这几个符号。

  • The partitions must be greater than 0

这个错误是创建 topic 的时候分区数为负数或零,不合法的配置导致的,分区数至少为 1。

1.2 查看所有 Topic

下面的命令会输出 topic 的名称:

./kafka-topics.sh --list --bootstrap-server localhost:9092

比如输出结果如下:

my-topic

1.3 查看指定 Topic 详情

./kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

上面的命令是显示指定topic的详细信息,包括分区数、副本分配等。输出结果示例如下:

Topic: my-topic   // topic 名称
TopicId: ctlM72H8Td6NwPocmA5BQQ  // topic 唯一ID 
PartitionCount: 3   // 分区数
ReplicationFactor: 1   // 副本因子
Configs:
        Topic: my-topic Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: my-topic Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: my-topic Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001

上面所有分区的 Leader、Replicas 和 Isr 都是 1001 ,因为集群中只有一个 broker,其 ID 为 1001。所有分区都位于这个唯一的 broker 上。

如果要查看所有topic的详情,可以省略 –topic 参数:

./kafka-topics.sh --describe --bootstrap-server localhost:9092

1.4 删除 Topic

注意:下面的命令是删除指定的topic及其所有数据。

./kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
  • –delete:指示要删除一个topic
  • –topic my-topic:指定要删除的topic名称

1.5 修改 topic

1.5.1 增加分区数

./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 6

增加 topic 分区数是相对安全的,因为不会重新分配现有数据,新分区没有历史数据。

需要注意的是可能影响基于 key 的分区策略,同时可能需要消费者重新平衡。

1.5.2 减少分区数

Kafka 不允许直接减少分区数,因为这可能导致数据丢失。

只能创建新主题,指定所需的较少分区数,然后把数据进行迁移。

1.5.3 增加副本数

不建议操作!可以增加副本数,但是较复杂,通常建议创建的时候就确认好副本数。

1.5.4 减少副本数

不建议操作!可以减少副本数,但是较复杂,通常建议创建的时候就确认好副本数。

2. 生产者以及消费者

生产者跟消费者脚本通常都是用于测试以及调试,生产环境下正常是不会进行使用的。

2.1 生产消息

下面的命令是启动一个命令行生产者,允许用户从命令行输入消息并发送到指定的 topic。

./kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
  • –topic my-topic:指定要发送消息的topic
  • –bootstrap-server localhost:9092:指定Kafka broker的地址

2.1.1 从某个文件中读取消息发送到队列

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic < kafka-messages.txt

可以用来批量导入历史数据、重放日志文件、测试等。上面的 kafka-messages.txt 将文件内容作为输入重定向到生产者,文件中的每一行都会被当作一条单独的消息发送。

2.1.2 指定消息 key

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=:"

这样可以发送带有 key 的消息,这对于保证消息顺序或分区策略很重要。 【parse.key=true】 告诉生产者消息包含 key , 【key.separator=: 】指定键和值之间的分隔符为冒号。

2.1.3 发送带时间戳的消息

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=," --property "timestamp.field.name=timestamp"

上面的配置允许发送带有自定义时间戳的消息。在一些重放历史数据时但是保留原始时间戳的场景下会使用到。【timestamp.field.name=timestamp】 指定了包含时间戳的字段名。

2.1.4 批量发送消息

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --batch-size 1000 --timeout 1000

使用这种方式可以提高大量消息发送时的效率。在做性能测试或者大规模数据导入时可以使用到,不过更多的是在程序代码中使用这种批量思想。

【batch-size】指定每批发送的消息数量,【timeout】指定批处理的超时时间,单位是毫秒。

2.1.5 发送消息到指定分区

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --property "partition=0"

将消息发送到特定的分区,测试特定分区的消费者逻辑或者是手动管理消息分布的位置。

2.2 消费消息

./kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning
  • –topic my-topic:指定要消费的topic
  • –from-beginning:从topic的最早消息开始消费

如果只想消费最新的消息,可以省略–from-beginning:

./kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092

有一些比较常用的配置如下:

  • –property print.key=true –property key.separator=: 显示消息的 key 和 value
  • –max-messages 10 限制消费的消息数量
  • –property print.timestamp=true 显示消息的时间戳
  • –partition 0 从特定分区消费
  • –property print.offset=true 显示消费的偏移量信息

3. 消费者组管理

3.1 列出所有消费者组

./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

3.2 查看消费组详情

下面的命令显示指定消费者组的详细信息,包括每个分区的消费偏移量、滞后等。

./kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
  • –describe:指示要查看详细信息
  • –group my-group:指定要查看的消费者组名称

输出示例如下:

GROUP                  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                               HOST        CLIENT-ID
---------------------- --------- ---------- --------------- --------------- ---- --------------------------------------------------------- ----------- ----------------
console-consumer-12077 my-topic  0          -               2               -    console-consumer-fe6945df-3731-4a11-94af-34d84216ce5f     /127.0.0.1  console-consumer

console-consumer-12077 my-topic  1          -               0               -    console-consumer-fe6945df-3731-4a11-94af-34d84216ce5f     /127.0.0.1  console-consumer

console-consumer-12077 my-topic  2          -               0               -    console-consumer-fe6945df-3731-4a11-94af-34d84216ce5f     /127.0.0.1  console-consumer

从左到右依次的列意思:

  • GROUP: 消费者组的名称。
  • TOPIC: 被消费的 topic 名称。
  • PARTITION: topic 的分区号。 这里有 1、 2、 3 分区。
  • CURRENT-OFFSET: 当前消费者组在该分区的消费位置。 如果是显示 - 则表示还没有开始消费。
  • LOG-END-OFFSET: 该分区的最新消息偏移量。分区0有2条消息,分区1和2都是0 代表两个分区目前没有消息。
  • LAG: 消费滞后量,即还未被消费的消息数量。 显示为 - 则表示无法计算滞后量。
  • CONSUMER-ID: 消费者的唯一标识符。
  • HOST: 消费者所在的主机。
  • CLIENT-ID: 消费者客户端的ID。

3.3 重置消费者组偏移量

下面的命令将指定消费者组在指定topic上的消费偏移量重置到最早的位置。

./kafka-consumer-groups.sh --group my-group --topic my-topic --reset-offsets --to-earliest --execute --bootstrap-server localhost:9092
  • –reset-offsets:指示要重置偏移量
  • –to-earliest:将偏移量重置到最早的位置
  • –execute:执行重置操作,不加此参数只会显示计划而不执行。

3.4 将消费者组的偏移量向前或向后移动指定数量

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --shift-by -10 --execute --topic my-topic

3.5 重置偏移量到特定时间点

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-datetime 2022-06-01T00:00:00.000 --execute --topic my-topic

3.6 删除消费者组

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group

3.7 显示消费者组的状态

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state

3.8 显示消费者组的成员

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members

3.9 显示消费者组的偏移量

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --offsets

3.10 重置所有主题的偏移量

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --all-topics

3.11 导出消费者组的偏移量

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe --offsets > group_offsets.txt

注意:研发在生产环境务必不要乱使用这些命令!

4. 其他

4.1 检查 kafka 集群状态

./kafka-broker-api-versions.sh --bootstrap-server localhost:9092