七的博客

Kafka快速入门5-Kafka生产者实践

消息队列

Kafka快速入门5-Kafka生产者实践

Kafka 生产者是负责将数据发送到 Kafka 集群的客户端应用程序,这也是平时开发 Kafka 相关业务时比较关心的一个点。

从生产者的角度来看,常碰见的问题有下面这些:

  • 怎么样合理的配置 kafka 生产者参数。
  • 消息 key 的使用,合理使用 key 将相关联消息放到同一个分区。
  • 批量发送提升吞吐量。
  • 消息压缩减少网络带宽。
  • 每条消息的大小限制。
  • 同步跟异步的发送模式。
  • 消息的确认机制、重试等。
  • 处理异常情况,比如网络问题、broker 不可用等。

创建一个基本的 kafka 生产者代码如下:

from kafka import KafkaProducer
import json


if __name__ == '__main__':
    producer = KafkaProducer(
        bootstrap_servers=['127.0.0.1:9092']
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    producer.send('my-topic' {'key': 'value'})
    producer.close()

如果找不到依赖先安装 kafka-python 库:

pip install kafka-python

1. 常用的生产者配置

生产者常用的配置参数大概就 10 几个左右,稍微关注下即可。这些参数在 kafka 文档中有很详细的描述,参数的定义均以官网文档为准。

  • bootstrap.servers。 这个是配置 Kafka 集群中broker的地址列表。比如 [‘localhost:9092’, ‘localhost:9093’] ,这里就是配置了两个 broker 的地址。如果这个参数不设置,默认将会连接到本地默认端口的 kafka 。

  • client.id 。这个是用于标识生产者的字符串,通常会给当前的生产者起一个有意义的名字,这个在监控的时候容易分析问题。

  • max.request.size。 这个参数控制生产者发送的单个请求的最大大小。默认值是1048576 (1MB)。如果需要发送大消息,适当增加这个值,但要确保与broker端的配置相匹配。

  • key.serializer。这个参数配置的是用于序列化消息 key 的函数。如果发送的消息有不是字节类型的 key,需要给这个 key 设置适当的序列化器。

  • value.serializer。这个参数配置的是用于序列化消息 value的函数。设置这个参数可以确保消息内容可以正确地在网络上传输。

  • request.timeout.ms。 配置生产者等待请求响应的最长时间,默认值是30000(30秒)。这个需要根据实际的网络条件来设置一个合理的超时时间。

  • acks 。消息确认的级别,下面的小节单独说明。

  • retries。消息发送失败时的重试次数,下面的小节单独说明。

  • batch.size。一个批次可以使用的最大内存大小,单位是字节。

  • linger.ms。等待更多消息加入批次的时间,单位是毫秒。下面的小节单独说明。

  • compression.type。 这个参数配置的是消息压缩类型。下面的小节单独说明。

  • delivery.timeout.ms。配置消息发送(包括重试)的最长时间,下面的小节单独说明。

2. 消息确认机制

在使用 kafka 时,确保消息被服务端正确处理是至关重要的。 这种情况下需要一种机制来确保生产者发送的消息被正确地接收和保存。这就是消息确认机制的由来。

确认机制是通过 acks 参数进行配置,这个参数配置的是消息确认的级别。目前一共有三个值:

  • 0: 生产者发送消息后立即认为成功,不等待任何确认。但是可能丢失数据,速度最快。
  • 1:生产者等待leader副本确认已接收消息。默认值,平衡了性能和可靠性。
  • all 或者 -1 :生产者等待所有同步副本确认已接收消息。最可靠,但性能最低。

默认值配置的是 1 ,需要根据自己的实际业务场景进行调整。默认的配置在性能跟可靠性之间做了平衡,大部分情况下不需要手动进行调整。

3. 发送失败重试

应用运行中,总会有各种各样的原因可能会连接不上 Kafka。如果这个时候刚好有生产者往 Kafka 写数据,写入的这个操作就会失败。

Kafka 生产者支持自动重试机制,可以在发送失败时自动重新发送消息。主要是通过下面的几个参数控制:

3.1 retries 参数

这个参数配置的是重试次数,简单的来说就是生产者尝试重新发送消息的最大次数。默认值是 2147483647, 实际上接近无限重试。默认的这个参数值会导致长时间的重试,会增加延迟。实践中这个值合理的范围是重试 3-10 次。( 无限重试的总消耗时间也必须在 delivery.timeout.ms 参数值内 )

3.2 retry.backoff.ms 参数

这个参数是定义两次重试操作之间的等待时间,默认值配置的是 100ms。这个值设置过小可能导致不必要的网络流量消耗,设置过大可能会增加消息延迟。

3.3 delivery.timeout.ms 参数

这个参数设置消息发送操作(包括所有重试)的总体超时时间,默认值是120000ms(2分钟)。通过配置这个参数可以防止因单个消息卡住而影响整个生产者的性能,这个参数值应该应该大于 (retries + 1) * retry.backoff.ms。

3.4 enable.idempotence 参数

确保即使在重试的情况下,每条消息也只会被准确写入一次,防止因重试导致的消息重复。

这几个参数都是需要协同工作的,单独的配置没有很大意义。

  • 当发送失败时,生产者会根据 retries 设置的次数进行重试。
  • 每次重试之间,会等待 retry.backoff.ms 指定的时间。
  • 整个发送过程,这里包括所有重试,不会超过 delivery.timeout.ms 指定的时间。
  • 如果 enable.idempotence 为 true,即使在重试的情况下,每条消息也只会被写入一次。

配置上面这几个参数的例子:

producer = KafkaProducer(
    retries=5
    retry_backoff_ms=100
    delivery_timeout_ms=30000
    enable_idempotence=True
)

上面的参数配置解释如下:

  • 如果消息发送失败,它会最多重试5次。
  • 每次重试之间会等待100ms。
  • 如果30秒后消息仍未成功发送,包括所有重试时间,操作将被当做失败。
  • 即使在重试过程中,每条消息也只会被写入一次。

4. 消息 key 的使用

kafka 每条消息都由 key、value 和 timestamp 等组成。消息结构类似于下面这样:

{
  "topic": "my_topic"
  "partition": 3
  "timestamp": 1132465789000
  "key": "ami_1062"
  "value": {
    "name": "ami"
    "age": 30
    "email": "1062@ami.com"
  }
  "headers": [
    {
      "key": "source"
      "value": "web_app"
    }
  ]
}

当然 kafka 本身使用二进制格式传输,这里只是为了更好理解 kafka 消息结构。这里面的 value 是程序中实际发送的内容, key 则是可选的。 在生产实践中 key 一般是用来处理下面这些问题:

  • 消息分区,在一些常见下,有关联的消息需要被发送到同一分区。
  • 保证特定消息的一个顺序。

几个比较常见的场景:

  • 比如要对一个用户一小段时间的操作记录进行分析,那么要确保同一个用户的所有操作日志都被发送到同一分区。这样由一个线程进行处理分析,数据分析的才正确。
  • 再比如电力采集系统中,采集到的数据都是带时间点的数据。 这样使用设备 ID 作为 key , 确保每个设备的数据都在同一分区按时间顺序存储。

在代码中可以这么使用:

from kafka import KafkaProducer
import json


producer = KafkaProducer(
    bootstrap_servers=['localhost:9092']
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
    key_serializer=lambda k: k.encode('utf-8')
)

# 发送带 key 的消息
def send_message_with_key(topic key value):
    future = producer.send(topic key=key value=value)
    try:
        record_metadata = future.get(timeout=10)
        print(f"消息发送成功:topic={record_metadata.topic}, partition={record_metadata.partition}, offset={record_metadata.offset}")
    except Exception as e:
        print(f"消息发送失败:{e}")


topic = 'meter_energy_data'
meter_id = '09512345678'
meter_energy_data = {'energy': '100' 'timestamp': '2021-01-01T00:00:00Z'}

send_message_with_key(topic meter_id meter_energy_data)

producer.close()

这样每一个电表的数据一定会落到同一个分区里面。如果你只有一个分区的话,无论你怎么配置都是只会落到一个分区。

主要几点会踩坑的地方:

  • 避免使用可能导致数据倾斜的key,比如固定值或分布不均的值。
  • 当key为null时,消息会被轮询分配到各个分区。这也是大部分情况下的场景。
  • 如果key的基数很小,考虑使用复合 key 或自定义分区策略。

5. 批量发送提升性能

先说下为什么进行批量发送,主要是为了减少网络请求次数,提升吞吐量。因为每次网络请求都有固定的开销,比如TCP握手、报文头等。但是批量发送可以分摊这些固定开销,提高网络利用率。

再一个是,kafka 服务端收到一批数据后,也使用批量写入可以更好的利用磁盘 I/O,提升写入效率。

批量发送主要涉及到下面几个参数。

5.1 batch.size 参数

batch.size 参数,一个批次可以使用的最大内存大小,单位是字节。默认值是 16384 ,也就是16KB。

这个参数是控制发送到服务器的请求大小,会影响吞吐量和延迟。

当新的消息被添加到批次时,Kafka 会检查当前批次的大小。如果添加新消息后批次大小超过了 batch.size ,当前批次将被发送,新消息将开始一个新的批次。

5.2 linger.ms 参数

linger.ms 参数,等待更多消息加入批次的时间。单位是毫秒。默认值是 0 ,也就是不等待。这个参数定义了在发送批次之前,生产者等待更多消息加入批次的最长时间。

当一条消息被添加到批次时,如果这是批次的第一条消息,一个计时器就会启动。如果在linger.ms时间内,批次大小没有达到batch.size,批次也会被发送。

如果不设置这个参数的话,不等待批次填满,可能降低吞吐量。如果可以容忍轻微的额外延迟,设置一个小的正值(如5-100ms)来提高吞吐量。增加可能导致额外的延迟。

5.3 max.in.flight.requests.per.connection 参数

这个参数控制生产者在单个连接上能够发送的未确认请求的最大数量。默认值是 5 ,也就是最多同时只有 5 个请求等待 kafka broker 确认。 如果这个时候再给 kafka broker 发送消息调用可能会被阻塞住。

工作流程:

  • 当生产者发送一个请求(包含一个或多个消息批次)到Kafka broker时,它会等待broker的确认。
  • 在等待确认的同时,生产者可以继续发送新的请求,直到未确认的请求数量达到 max.in.flight.requests.per.connection 设置的值。
  • 达到这个限制值后,生产者将停止发送新的请求,会一直等到收到之前请求的确认。
  • 当收到确认后,未确认请求的数量减少,生产者可以继续发送新的请求。

较高的值可以提高吞吐量,因为它允许生产者在等待确认的同时继续发送请求。较低的值,特别是1的情况下可以确保更严格的消息顺序。

在生产实践中,默认值 5 通常是一个比较合理的值,它在吞吐量和其他考虑因素之间取得了平衡。但是需要注意的是有几点比较坑的地方:

  • 如果启用幂等性后(enable.idempotence=true),max.in.flight.requests.per.connection 必须小于或等于 5。
  • 如果 retries 参数 > 0 且 max.in.flight.requests.per.connection > 1,那么在重试时可能会改变消息的顺序。
  • 如果消息顺序很重要,应该设置 max.in.flight.requests.per.connection = 1,或使用幂等性生产者。

5.4 参数协同工作机制

上面几个参数其实是需要协同工作的,所以参数搭配也是需要动态调整,这几个参数分别在参数积累阶段、批次准备阶段、发送阶段使用。

  • 消息累积阶段。当调用 send() 方法时,消息被添加到当前批次。如果这是批次的第一条消息,linger.ms 计时器启动。消息会一直添加到批次,直到达到 batch.size 或 linger.ms 时间到期。

  • 批次准备阶段。如果 batch.size 达到或 linger.ms 到期,批次准备发送。如果设置了压缩,整个批次会被压缩。

  • 发送阶段。生产者检查当前未确认的请求数是否小于 max.in.flight.requests.per.connection。如果小于的话,这个批次被发送;如果大于或者等于,这个批次只能继续等着。

举个参数配置的例子:

producer = KafkaProducer(
    batch_size=16384  # 16KB
    linger_ms=10
    max_in_flight_requests_per_connection=5
)

上面这个配置意思是: 消息会被累积到最大16KB的批次中。如果10ms内批次没有达到16KB,也会被发送。每个连接最多可以有5个未确认的请求。

6. 数据压缩

为了提高效率和减少网络带宽使用,Kafka 是提供了消息压缩机制。压缩可以在生产者端进行,也可以在 broker 端进行。

这里讲的是生产者发送的数据压缩,是为了降低网络带宽,提升发送过程中的效率。

在生产者端配置压缩:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092']
    compression_type='gzip'  # 可选配置,可以选择的有 'gzip', 'snappy', 'lz4', 'zstd'
)

producer.send('my_topic' b'这是一个压缩的消息')

几种比较常用压缩算法对比:

  • GZIP。高压缩比,广泛支持的一种算法。但是CPU 使用较高,压缩/解压速度较慢。
  • Snappy。 压缩/解压速度快,CPU 使用低,压缩比相对较低。
  • LZ4。压缩/解压速度非常快,压缩比适中,在某些情况下压缩比不如 GZIP,需要平衡压缩比和速度的场景。
  • ZSTD 。高压缩比,快速解压,压缩速度可能低于 Snappy 和 LZ4。

压缩通常在文本数据上有一定收益,二进制数据通常压缩收益不高。同时压缩可能会增加生产者和消费者的 CPU 使用,因为发送跟接收存在进行压缩以及解压的过程,合理的选用压缩。

7. 同步发送以及异步发送

Kafka 生产者 API 提供了同步和异步两种方式发送消息。这两种方法各有优缺点,有不同的使用场景。

同步发送,这个比较好理解,就是同步发送提供即时的确认,但可能降低性能。

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    future = producer.send('my_topic', b'同步消息')
    record_metadata = future.get(timeout=10)
    print(f"Message sent to topic {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset} ")
except KafkaError as e:
    print(f"发送消息失败: {e}")

异步发送:

# 发送成功回调函数
def on_send_success(record_metadata):
    print(f"Message sent to topic {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")


# 发送失败回调函数
def on_send_error(excp):
    print(f"发送消息失败: {excp}")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

producer.send('my_topic', b'这是一条异步消息').add_callback(on_send_success).add_errback(on_send_error)

同步以及异步的 API 需要在不同的场景下使用。

  • 同步发送可能导致较高的延迟,但是异步发送可能影响消息顺序。
  • 同步发送提供即时的确认,但可能降低性能。但是异步发送需要更复杂的错误处理机制。

8. 错误处理

比较常见的错误有下面几种。

8.1 网络错误类型

  • 连接到 broker 失败。 连接到 broker 失败这种错误通常本身不是由于应用的问题导致的,这种一般还是要保证 Kafka 的高可用,应用端无法进行解决。
  • 操作超时。 操作超时错误一般考虑增加超时时间参数,但是更重要的还是优化网络,让操作更加快速返回应答。

8.2 Broker错误

  • 分区leader不可用, 这种只能等待Kafka集群重新选举 leader 。也可以合理的实现重试机制,在一段时间后重试操作。
  • 没有足够的副本,应用中只能等待副本同步完成。 同时要检查 Kafka 集群配置,确保有足够的副本数。如果频繁发生的话,可能需要增加 broker 数量或调整副本数。

8.3 消息错误

  • 消息大小超过限制。消息合理的情况下,合理的增加 max.message.bytes 参数配置。也可以考虑将大消息拆分成小消息,也可以使用压缩来减小消息大小。

  • 消息格式无效,这种通常是生产者自己的代码问题。检查序列化和反序列化的代码,确保正确处理消息。

8.4 序列化错误类型

消息无法正确序列化,这种通常也是生产者自己的代码问题。检查序列化算法的实现,看看逻辑哪里有问题。

9. 总结

总结下 kafka 生产者注意的几个点:

  • 合理的使用批量发送,降低网络请求次数。
  • 数据太大可以选择合适的压缩算法,降低网络请求数据量大小。
  • 合理的使用异步发送接口,使用回调处理业务逻辑。
  • 合理的使用重试机制,保证业务正常执行。

参考链接