七的博客

Kafka快速入门6-Kafka消费者实践

消息队列

Kafka快速入门6-Kafka消费者实践

相比于 Kafka 的生产者, 消费者涉及到的概念更多,同时在生产实践中碰到的问题也更多。

但是主要关心的点是下面几个:

  • 怎么去写一个消费者。
  • 怎么订阅一个或者多个 topic 进行消息订阅。
  • kafka 消费者的参数配置。
  • offset 怎么提交。
  • kafka 会怎么分配分区给消费者。
  • 消费者组有什么作用。
  • 怎么样去提升 kafka 的消费者性能。

1. 创建 kafka 消费者

代码还是以 Python3 为例讲解,使用其他语言基本也大差不差。

首先确保已经安装了kafka-python库。

pip install kafka-python

写一个基本的 kafka 消费者:

from kafka import KafkaConsumer

bootstrap_servers = ['localhost:9092']  # 这里配置 Kafka 服务器地址

# 创建 KafkaConsumer 实例
consumer = KafkaConsumer(
    'my_topic', # 要消费的主题名称
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',  # 从最早的偏移量开始消费
    group_id='my_consumer_group',  # 消费者组ID
    value_deserializer=lambda x: x.decode('utf-8')  # 消息值的反序列化方法
)

print("========Kafka消费者已创建,等待接收消息=====")

# 这里循环从 consumer 中消费消息
for message in consumer:
    print(f"收到消息: {message.value}")

上面的初始化消费者代码还是比较简单的,不过一些有一些配置可以暂时不用深入去了解。比如:

  • ‘my_topic’ 就是要消费的 kafka topic 名称。

  • auto_offset_reset= ‘earliest’ 。如果没有初始偏移量或当前偏移量在服务器上不存在,则从最早的消息开始消费。

  • group_id 是指定消费者组的ID。

  • value_deserializer 是用于反序列化消息值的函数。通常消息是 UTF-8 编码的字符串。

上面代码运行后,就会不断循环从 Kafka 获取消息。每条消息中包含多个字段:

  • topic: 消息的 topic 名称。
  • partition: 消息所在的分区。
  • offset: 消息在分区中的偏移量。
  • timestamp: 消息的时间戳。
  • key: 消息的 key,有可能为空。
  • value: 消息的值。

上面的代码只是简单的打印了下消息的 value,生产中一般会添加更多的逻辑来消费消息。

2. 消费者涉及到的参数

消费者也是有挺多可以配置的参数,这些参数跟性能、稳定性都有很大的关系。

2.1 bootstrap.servers

这个参数指定Kafka集群的连接地址,是一个必须提供的参数地址。 也支持提供多个地址。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['server1:9092', 'server2:9092', 'server3:9092']
)

2.2 group.id

这个参数指定消费者组的唯一标识符。在实际的生产运用中,要给消费者组指定唯一的 group_id。同一组内的消费者协同工作,分摊 topic 的分区消息,提升消费性能。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my_consumer_group'
)

避坑点:

  • Kafka 会使用 group_id 来管理偏移量。每个组都有自己的偏移量跟踪。
  • 当消费者加入或离开组时,Kafka会触发再平衡,重新分配分区给组内的消费者。
  • 不同的 group_id 允许多个应用独立消费同一个 topic ,互不影响。

2.3 auto.offset.reset

这个参数也很重要,主要是告诉消费者应该从哪里开始消费。一共有下面几个选择:

  • earliest。从最早的可用消息开始消费,可能导致重复消费。
  • latest。从最新的消息开始消费。
  • none。如果没有找到以前的偏移量,则抛出异常。
  • earliest。适用于需要处理所有历史数据的场景。
  • latest。适用于只关心最新数据的实时处理场景。

默认值是 latest,也就是从最新的消息开始进行消费。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest'
)

有些情况下,没有初始偏移量或当前偏移量在服务器上不再存在时,比如如数据已经被删除的时候,一个合适的初始化消费位置就比较重要。

2.4 enable.auto.commit

这个参数配置要不要自动提交已消费消息的偏移量。只有 2 个参数值配置:

  • True:消费者会定期自动提交已处理消息的偏移量。自动提交发生在后台,间隔由 auto_commit_interval_ms 控制。
  • False:适用于需要精确控制偏移量提交的场景,如事务性处理。

比如手动提交:

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=False
)

for message in consumer:
    process_message(message)
    # 手动提交
    consumer.commit()

大部分应用都应该选择手动提交,自动提交适用于处理速度快、可以容忍少量重复消费的场景。

2.5 max.poll.records

这个参数控制单次调用 poll() 方法时返回的最大消息数,默认值是 500。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    max_poll_records=1000
)

一般来说需要根据消息处理速度和内存限制进行调整。较大的值可以提高吞吐量,但会增加内存使用和处理时间。

2.6 max.poll.interval.ms

这个参数控制两次调用 poll() 方法之间的最大延迟。如果超过这个时间没有调用 poll() ,消费者会被认为已死亡,并触发重新平衡 rebalance 。默认值是 300000 (5分钟)。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    max_poll_interval_ms=600000  # 10分钟
)

比较坑的地方需要注意,如果消息处理时间长,需要增加这个值。否则消费者被错误地认为已死亡,引发奇奇怪怪的问题。

2.6 session.timeout.ms

这个参数控制消费者组认为消费者死亡的超时时间,默认值是 10000 (10秒)。值太小可能导致不必要的重平衡,值太大可能导致检测失败消费者的延迟增加。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    session_timeout_ms=30000  # 30秒
)

2.7 heartbeat.interval.ms

这个参数控制消费者向组协调器发送心跳的频率,心跳用于确保消费者的活跃状态,并促进 rebalance 过程。

默认值是 3000 (3秒) ,建议是设置为 session_timeout_ms 的 13

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    heartbeat_interval_ms=3000
)

2.8 fetch.max.bytes

这个参数是限制了服务器对单个 fetch 请求返回的数据量,默认值是 52428800 (50 MB)。 默认值已经足够大了,大部分情况下不需要进行调整。。增加这个值可以提高吞吐量,但会增加内存使用。

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    fetch_max_bytes=104857600  # 100 MB
)

注意,这个值应该大于服务器上的 message.max.bytes,也就是单条消息的最大大小。

4. 偏移量提交

偏移量 offset 是一个非常关键的概念,在前面讲架构的时候已经讲过一次。它表示消费者在特定分区中已经消费到的消息位置,每个消费者组都会跟踪它在每个分区上消费的偏移量。

偏移量提交经常碰到一些问题:

  • 偏移量提交得太早,一般就是在消息实际处理完成之前就提交了,就可能导致消息重复消费。
  • 如果偏移量提交得太晚或未能提交,可能导致消息丢失。
  • 频繁提交偏移量会影响性能,但提交间隔过长可能导致大量消息重复处理。
  • 有些场景下,需要确保每条消息恰好被处理一次,既不重复也不遗漏。

关于偏移量,上面的参数讲解中已经说有 2 种方式,分别是自动提交以及手动提交。偏移量提交跟数据库的事务提交套路差不多,有自动以及手动之分。

自动提交:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_group',
                         enable_auto_commit=True,
                         auto_commit_interval_ms=5000)

for message in consumer:
    print(message)
    # 处理消息就行,没有别的操作

手动提交:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_group',
                         enable_auto_commit=False)

for message in consumer:
    print(message)
    # 处理消息后,手动提交偏移量。
    consumer.commit()

批量处理后手动提交:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_group',
                         enable_auto_commit=False)

batch = []
for message in consumer:
    batch.append(message)
    if len(batch) >= 100:  # 一批处理 100条消息
        process_batch(batch)
        
        # 每 100 条消息处理完,再提交一次
        consumer.commit()
        batch = []

比较常用的就是上面几种,其中批量消息消息+手动提交是最为稳妥的一种方式。

5. 分区分配策略

在 Kafka 中,一个 topic 可以有多个分区,而一个消费者组可以有多个消费者实例。

分区消息分配容易出现的问题:

  • 不恰当的分区分配可能会导致某些消费者处理太多的消息,有一部分消费者就会闲着没事干。
  • 当消费者加入或离开组时,可能触发不必要的重平衡,影响系统性能。

到目前,分区分配策略有下面几种:

5.1 RangeAssignor

简单的理解就是范围分配器。这是 Kafka 的默认分配策略,从早期版本到现在一直都是。

对每个主题,按照分区号和消费者字典序排序,为每个消费者分配一个连续的分区范围。当消费者数量不变时,分配结果比较稳定。

踩坑点:

  • 当 topic 数量不是消费者数量的整数倍时,容易导致分配不均衡。

5.2 RoundRobinAssignor

看名字就能猜得到肯定是轮训算法,这种策略主要是为了解决RangeAssignor的不均衡问题而引入的。

策略会将所有 topic 的所有分区放入一个池中,然后轮流为每个消费者分配分区,所以在大多数情况下能实现更均匀的分配。

踩坑点:

  • 当消费者加入或离开时,可能导致大规模的分区重新分配。

5.3 StickyAssignor

粘性分配器,这个策略主要是为了减少不必要的分区移动。尽量保持现有的分区分配,只在必要时进行最小化的重新分配。

优点就是减少分区移动,提高稳定性。而且在消费者加入或离开时,最小化数据迁移。

5.4 自定义分区分配策略

也可以自定义分配逻辑,比较灵活,可以适应特定的业务需求。但是会增加一些开发以及维护成本。

6. 消费者组概念

Kafka 中的消费者组是一种用于并行消费消息的机制。一个消费者组是由一个或多个消费者实例组成的,这些消费者共同订阅一个或多个 topic ,并协作处理这些 topic 中的消息。

在早期的消息队列系统中,通常采用点对点模式,即一条消息只能被一个消费者处理。随着消息量的增加,这种模式无法满足高吞吐量的要求。Kafka 里面是引入消费者组概念来解决这个问题。

消费者组的作用:

  • 同一组内的消费者可以分摊 topic 分区的消费任务,提高整体处理能力。
  • 通过增加或减少消费者实例来动态调整处理能力。
  • 同一分区的消息总是由同一个消费者处理,这样可以保证分区内消息的消费顺序性。
  • 当一个消费者失败时,其他消费者可以接管其任务,提高系统可靠性。

只要多个消费者进程指定相同的 group_id,那么就算一个消费者组。 无论是在一台服务器还是几台服务器上。

from kafka import KafkaConsumer
import multiprocessing

def consume(consumer_id):
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers=['localhost:9092'],
        # 指定消费者组的 ID 是 my-group
        group_id='my-group'  
    )
    
    for message in consumer:
        print(f"消费者 {consumer_id} 收到消息: {message.value.decode('utf-8')}")

if __name__ == '__main__':
    
    # 这里创建创建多个消费者进程,分配到同一个消费者组中。
    processes = []
    
    # 创建3个消费者
    for i in range(3):  
        p = multiprocessing.Process(target=consume, args=(i,))
        processes.append(p)
        p.start()
    
    # 等待所有进程处理完消息
    for p in processes:
        p.join()

7. 消费者性能优化技巧

消费者端提升性能主要是依靠上面这些点来组合达到效果。

  • 批量处理。处理消息时,尽量批量处理而不是一条条处理。这可以减少网络往返和数据库操作的次数。
  messages = consumer.poll(timeout_ms=1000, max_records=100)
  if messages:
      # 批量处理消息,再提交偏移量
      process_batch(messages)  
      consumer.commit()
  • 并行处理。可以利用多线程或多进程来并行处理消息。注意,单个分区的消息必须按顺序处理。
  from concurrent.futures import ThreadPoolExecutor
  
  def process_message(message):
      # 处理单条消息的逻辑
  
  with ThreadPoolExecutor(max_workers=5) as executor:
      for message in consumer:
          executor.submit(process_message, message)
  • 调整 fetch.min.bytes 和 fetch.max.wait.ms 控制消费者从服务器获取消息的行为。fetch.min.bytes 是每次fetch请求从服务器获取的最小数据量。fetch.max.wait.ms 是如果没有足够的数据,服务器在响应fetch请求之前等待的最长时间。
  consumer = KafkaConsumer(
      bootstrap_servers=['localhost:9092'],
      fetch_min_bytes=1024 * 1024,  # 1MB
      fetch_max_wait_ms=500
  )
  • 合理设置 max.poll.records 这个参数。max.poll.records 控制单次调用 poll() 方法时返回的记录数。根据服务器硬件 + 处理能力来设置这个值。
  consumer = KafkaConsumer(
      bootstrap_servers=['localhost:9092'],
      max_poll_records=500
  )

上面的这几点就是消费者端优化性能比较常用的几点,同时也是收益比较高的优化点。

再一个就是需要优化应用程序处理消息逻辑,这个才是最影响消费者性能的地方。 处理单条消息太慢的话,上面这些无论再怎么去调优,效果都不会特别明显。

参考链接