七的博客

Kafka快速入门3-Kafka单机版本安装与基本使用

消息队列

Kafka快速入门3-Kafka单机版本安装与基本使用

kafka 的官网文档提供了一个 Quickstart ,这已经是一个非常通俗易懂的例子,这个章节会参考这篇文章。

1. 快速下载

# 下载以及解压 
$ wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1

2. 准备 Kafka 环境

在运行这个版本的 Kafka 之前,你本地需要至少安装 JDK8 以及以上的 JDK 版本。

Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动,可以使用这两种其中的一种进行启动。 KRaft 模式简化了部署,因为不再需要单独的 ZooKeeper 集群。但是 Kafka 中的 KRaft 模式由于相对较新,但正在快速发展和改进,这也是 Kafka 后续版本迭代的一个方向。

2.1 使用 ZooKeeper 的 Kafka

ZooKeeper 是 Kafka 的一个重要依赖,用来管理集群状态、配置等。

按照下面的顺序启动所有的服务,先启动 Zookeeper 服务:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

再启动 Kafka Broker 服务,它是实际处理消息的服务:

$ bin/kafka-server-start.sh config/server.properties

这两个服务启动后,就有一个完整的 Kafka 环境。

2.2 使用 KRaft 的 Kafka

KRaft 是 Kafka 的一个新特性,允许 Kafka 不依赖 ZooKeeper 运行,步骤比上面的 Zookeeper 更多一点。

首先是生成集群 UUID ,这个 ID 是 Kafka 集群的唯一标识符。生成它是为了确保集群的唯一性:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录:

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

这个格式化日志目录是 KRaft 模式特有的步骤,它初始化存储元数据的目录。

使用 KRaft 特定的配置文件启动 Kafka 服务器:

$ bin/kafka-server-start.sh config/kraft/server.properties

3. 创建 topic

在往 Kafka 写入消息之前,需要先创建 topic ,打开一个终端运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

上面的命令使用 kafka-topics.sh 脚本创建一个名为 “quickstart-events” 的 topic,–bootstrap-server localhost:9092 是指定 Kafka 服务器的地址和端口。

所有 Kafka 的命令行工具都有额外的选项,运行不带参数的 kafka-topics.sh 命令可以显示使用信息。例如还可以显示 topic 的分区数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

Topic: quickstart-events   // topic 名称
TopicId: NPmZHyhbR9y00wMglMH2sg   // topic 主题的唯一标识符
PartitionCount: 1   // 分区数
ReplicationFactor: 1  // 副本因子
Configs:
    Topic: quickstart-events 
    Partition: 0
    Leader: 0  
    Replicas: 0 
    Isr: 0

这上面输出显示了主题的名称、ID、分区数(PartitionCount)、复制因子(ReplicationFactor)等信息。

4. 向 topic 写入消息

Kafka 客户端通过网络与 Kafka 代理 brokers 通信,用于写入或者读取消息。接收到消息后,Broker 会持久化保存这些消息,也可以根据需要保存任意时长的时间。

运行生产者来写 topic 写入一些事件,默认情况下每一行都会作为一条单独的消息:

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

This is my first event
This is my second event

上面这条命令启动了一个控制台上的生产者客户端,连接到了本地的 Kafka 服务器 localhost:9092,然后向 “quickstart-events” 这个 topic 发送 2 条消息。

上面的生产者退出直接使用 Ctrl-C 来停止即可。

5. 读取 topic 消息

打开一个命令行会话,运行消费者客户端来读取刚刚发送到 kafka 的消息:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

This is my first event
This is my second event

上面这条命令启动了一个控制台上的消费者客户端,连接到了本地的 Kafka 服务器 localhost:9092,然后从 “quickstart-events” 这个 topic 的开始位置读取消息。

这里的 –from-beginning 参数是让消费者从 topic 的最开始读取所有消息,不然的话默认是只读取消费者启动后新产生的消息。

6. Kafka Connect 的使用

这一小节做 Web 开发的一般使用的会少一点,通常做数据分析等的会用到。

Kafka Connect 主要是用于在 Kafka 和其他数据系统之间数据迁移。它的作用:

  • 允许把数据从外部系统导入到 Kafka 中,也可以从 Kafka 中导入到外部系统。
  • 现有系统跟 Kafka 集成会变得比较简单。
  • Kafka 提供了数百个现成的连接器可用,这里的连接器指的就是连接外部数据源、读取外部数据源数据、将数据写入外部数据源等等这些操作的封装。

Kafka Connect 流程

上个例子,使用简单的连接器将数据从文件导入到 Kafka topic ,还有从从 Kafka topic 导出数据到文件中。

6.1 配置插件

在 Kafka Connect 中,插件路径 plugin.path 用于指定连接器 JAR 包的位置。

编辑 config/connect-standalone.properties 文件,添加以下内容:

echo "plugin.path=libs/connect-file-3.3.1.jar"

这样确保 Kafka Connect 能够找到并加载连接器。

6.2 创建测试数据

创建一个简单的文本文件 test.txt,包含一些初始数据:

echo -e "foo\nbar" > test.txt

6.3 启动连接器

使用单机模式启动两个连接器:

  • 源文件连接器:从 test.txt 读取数据并写入 Kafka topic。
  • sink 连接器:从 Kafka 主题读取数据并写入输出文件 test.sink.txt。

执行以下命令启动连接器:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

6.4 验证数据流

检查输出文件 test.sink.txt,确认数据被正确传输:

more test.sink.txt

6.5 控制台消费者查看数据

运行 Kafka 控制台消费者来查看 Kafka topic 中的数据:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

6.6 动态添加数据

向 test.txt 添加数据,并观察其通过管道传输:

echo Another line>> test.txt

通过这些步骤,Kafka Connect 可以比较轻松实现数据在不同系统之间的流动,确保数据的实时性和可靠性。

7. Kafka Streams 处理消息

Kafka Streams 是一个用于实时处理 Kafka 中数据的客户端库,支持 Java 和 Scala。通过它可以实现实时应用和微服务,这些应用的输入和/或输出数据保存在 Kafka topic 里面。

Kafka Stream 应用

Kafka Streams 的特点简单总结一下:

  • 像编写普通 Java/Scala 应用程序一样编写流处理逻辑,很简单。
  • 结合 Kafka 的集群技术,支持分布式、弹性和容错的应用。
  • 支持精确一次处理、有状态操作、窗口操作、连接、基于事件时间的处理等。

演示一下经典的单词统计例子,使用的是 Java 语言:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
    .groupBy((keyIgnored, word) -> word)
    .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

解释下上面的这几行代码:

  • 从 quickstart-events 这个 topic 读取文本行。
  • 使用 flatMapValues() 将每行文本分割成单词,再使用 groupBy 按单词分组, 最后使用 count 计算每个单词的出现次数。
  • 最后 将结果写入 output-topic 这个 topic 。

参考链接