Kafka快速入门6-Kafka消费者实践
Kafka快速入门6-Kafka消费者实践
相比于 Kafka 的生产者, 消费者涉及到的概念更多,同时在生产实践中碰到的问题也更多。
但是主要关心的点是下面几个:
- 怎么去写一个消费者。
- 怎么订阅一个或者多个 topic 进行消息订阅。
- kafka 消费者的参数配置。
- offset 怎么提交。
- kafka 会怎么分配分区给消费者。
- 消费者组有什么作用。
- 怎么样去提升 kafka 的消费者性能。
1. 创建 kafka 消费者
代码还是以 Python3 为例讲解,使用其他语言基本也大差不差。
首先确保已经安装了kafka-python库。
pip install kafka-python
写一个基本的 kafka 消费者:
from kafka import KafkaConsumer
bootstrap_servers = ['localhost:9092'] # 这里配置 Kafka 服务器地址
# 创建 KafkaConsumer 实例
consumer = KafkaConsumer(
'my_topic', # 要消费的主题名称
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest', # 从最早的偏移量开始消费
group_id='my_consumer_group', # 消费者组ID
value_deserializer=lambda x: x.decode('utf-8') # 消息值的反序列化方法
)
print("========Kafka消费者已创建,等待接收消息=====")
# 这里循环从 consumer 中消费消息
for message in consumer:
print(f"收到消息: {message.value}")
上面的初始化消费者代码还是比较简单的,不过一些有一些配置可以暂时不用深入去了解。比如:
‘my_topic’ 就是要消费的 kafka topic 名称。
auto_offset_reset= ‘earliest’ 。如果没有初始偏移量或当前偏移量在服务器上不存在,则从最早的消息开始消费。
group_id 是指定消费者组的ID。
value_deserializer 是用于反序列化消息值的函数。通常消息是 UTF-8 编码的字符串。
上面代码运行后,就会不断循环从 Kafka 获取消息。每条消息中包含多个字段:
- topic: 消息的 topic 名称。
- partition: 消息所在的分区。
- offset: 消息在分区中的偏移量。
- timestamp: 消息的时间戳。
- key: 消息的 key,有可能为空。
- value: 消息的值。
上面的代码只是简单的打印了下消息的 value,生产中一般会添加更多的逻辑来消费消息。
2. 消费者涉及到的参数
消费者也是有挺多可以配置的参数,这些参数跟性能、稳定性都有很大的关系。
2.1 bootstrap.servers
这个参数指定Kafka集群的连接地址,是一个必须提供的参数地址。 也支持提供多个地址。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['server1:9092', 'server2:9092', 'server3:9092']
)
2.2 group.id
这个参数指定消费者组的唯一标识符。在实际的生产运用中,要给消费者组指定唯一的 group_id。同一组内的消费者协同工作,分摊 topic 的分区消息,提升消费性能。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
group_id='my_consumer_group'
)
避坑点:
- Kafka 会使用 group_id 来管理偏移量。每个组都有自己的偏移量跟踪。
- 当消费者加入或离开组时,Kafka会触发再平衡,重新分配分区给组内的消费者。
- 不同的 group_id 允许多个应用独立消费同一个 topic ,互不影响。
2.3 auto.offset.reset
这个参数也很重要,主要是告诉消费者应该从哪里开始消费。一共有下面几个选择:
- earliest。从最早的可用消息开始消费,可能导致重复消费。
- latest。从最新的消息开始消费。
- none。如果没有找到以前的偏移量,则抛出异常。
- earliest。适用于需要处理所有历史数据的场景。
- latest。适用于只关心最新数据的实时处理场景。
默认值是 latest,也就是从最新的消息开始进行消费。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest'
)
有些情况下,没有初始偏移量或当前偏移量在服务器上不再存在时,比如如数据已经被删除的时候,一个合适的初始化消费位置就比较重要。
2.4 enable.auto.commit
这个参数配置要不要自动提交已消费消息的偏移量。只有 2 个参数值配置:
- True:消费者会定期自动提交已处理消息的偏移量。自动提交发生在后台,间隔由 auto_commit_interval_ms 控制。
- False:适用于需要精确控制偏移量提交的场景,如事务性处理。
比如手动提交:
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
enable_auto_commit=False
)
for message in consumer:
process_message(message)
# 手动提交
consumer.commit()
大部分应用都应该选择手动提交,自动提交适用于处理速度快、可以容忍少量重复消费的场景。
2.5 max.poll.records
这个参数控制单次调用 poll() 方法时返回的最大消息数,默认值是 500。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
max_poll_records=1000
)
一般来说需要根据消息处理速度和内存限制进行调整。较大的值可以提高吞吐量,但会增加内存使用和处理时间。
2.6 max.poll.interval.ms
这个参数控制两次调用 poll() 方法之间的最大延迟。如果超过这个时间没有调用 poll() ,消费者会被认为已死亡,并触发重新平衡 rebalance 。默认值是 300000 (5分钟)。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
max_poll_interval_ms=600000 # 10分钟
)
比较坑的地方需要注意,如果消息处理时间长,需要增加这个值。否则消费者被错误地认为已死亡,引发奇奇怪怪的问题。
2.6 session.timeout.ms
这个参数控制消费者组认为消费者死亡的超时时间,默认值是 10000 (10秒)。值太小可能导致不必要的重平衡,值太大可能导致检测失败消费者的延迟增加。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
session_timeout_ms=30000 # 30秒
)
2.7 heartbeat.interval.ms
这个参数控制消费者向组协调器发送心跳的频率,心跳用于确保消费者的活跃状态,并促进 rebalance 过程。
默认值是 3000 (3秒) ,建议是设置为 session_timeout_ms 的 1⁄3 。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
heartbeat_interval_ms=3000
)
2.8 fetch.max.bytes
这个参数是限制了服务器对单个 fetch 请求返回的数据量,默认值是 52428800 (50 MB)。 默认值已经足够大了,大部分情况下不需要进行调整。。增加这个值可以提高吞吐量,但会增加内存使用。
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
fetch_max_bytes=104857600 # 100 MB
)
注意,这个值应该大于服务器上的 message.max.bytes,也就是单条消息的最大大小。
4. 偏移量提交
偏移量 offset 是一个非常关键的概念,在前面讲架构的时候已经讲过一次。它表示消费者在特定分区中已经消费到的消息位置,每个消费者组都会跟踪它在每个分区上消费的偏移量。
偏移量提交经常碰到一些问题:
- 偏移量提交得太早,一般就是在消息实际处理完成之前就提交了,就可能导致消息重复消费。
- 如果偏移量提交得太晚或未能提交,可能导致消息丢失。
- 频繁提交偏移量会影响性能,但提交间隔过长可能导致大量消息重复处理。
- 有些场景下,需要确保每条消息恰好被处理一次,既不重复也不遗漏。
关于偏移量,上面的参数讲解中已经说有 2 种方式,分别是自动提交以及手动提交。偏移量提交跟数据库的事务提交套路差不多,有自动以及手动之分。
自动提交:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic',
bootstrap_servers=['localhost:9092'],
group_id='my_group',
enable_auto_commit=True,
auto_commit_interval_ms=5000)
for message in consumer:
print(message)
# 处理消息就行,没有别的操作
手动提交:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic',
bootstrap_servers=['localhost:9092'],
group_id='my_group',
enable_auto_commit=False)
for message in consumer:
print(message)
# 处理消息后,手动提交偏移量。
consumer.commit()
批量处理后手动提交:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic',
bootstrap_servers=['localhost:9092'],
group_id='my_group',
enable_auto_commit=False)
batch = []
for message in consumer:
batch.append(message)
if len(batch) >= 100: # 一批处理 100条消息
process_batch(batch)
# 每 100 条消息处理完,再提交一次
consumer.commit()
batch = []
比较常用的就是上面几种,其中批量消息消息+手动提交是最为稳妥的一种方式。
5. 分区分配策略
在 Kafka 中,一个 topic 可以有多个分区,而一个消费者组可以有多个消费者实例。
分区消息分配容易出现的问题:
- 不恰当的分区分配可能会导致某些消费者处理太多的消息,有一部分消费者就会闲着没事干。
- 当消费者加入或离开组时,可能触发不必要的重平衡,影响系统性能。
到目前,分区分配策略有下面几种:
5.1 RangeAssignor
简单的理解就是范围分配器。这是 Kafka 的默认分配策略,从早期版本到现在一直都是。
对每个主题,按照分区号和消费者字典序排序,为每个消费者分配一个连续的分区范围。当消费者数量不变时,分配结果比较稳定。
踩坑点:
- 当 topic 数量不是消费者数量的整数倍时,容易导致分配不均衡。
5.2 RoundRobinAssignor
看名字就能猜得到肯定是轮训算法,这种策略主要是为了解决RangeAssignor的不均衡问题而引入的。
策略会将所有 topic 的所有分区放入一个池中,然后轮流为每个消费者分配分区,所以在大多数情况下能实现更均匀的分配。
踩坑点:
- 当消费者加入或离开时,可能导致大规模的分区重新分配。
5.3 StickyAssignor
粘性分配器,这个策略主要是为了减少不必要的分区移动。尽量保持现有的分区分配,只在必要时进行最小化的重新分配。
优点就是减少分区移动,提高稳定性。而且在消费者加入或离开时,最小化数据迁移。
5.4 自定义分区分配策略
也可以自定义分配逻辑,比较灵活,可以适应特定的业务需求。但是会增加一些开发以及维护成本。
6. 消费者组概念
Kafka 中的消费者组是一种用于并行消费消息的机制。一个消费者组是由一个或多个消费者实例组成的,这些消费者共同订阅一个或多个 topic ,并协作处理这些 topic 中的消息。
在早期的消息队列系统中,通常采用点对点模式,即一条消息只能被一个消费者处理。随着消息量的增加,这种模式无法满足高吞吐量的要求。Kafka 里面是引入消费者组概念来解决这个问题。
消费者组的作用:
- 同一组内的消费者可以分摊 topic 分区的消费任务,提高整体处理能力。
- 通过增加或减少消费者实例来动态调整处理能力。
- 同一分区的消息总是由同一个消费者处理,这样可以保证分区内消息的消费顺序性。
- 当一个消费者失败时,其他消费者可以接管其任务,提高系统可靠性。
只要多个消费者进程指定相同的 group_id,那么就算一个消费者组。 无论是在一台服务器还是几台服务器上。
from kafka import KafkaConsumer
import multiprocessing
def consume(consumer_id):
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
# 指定消费者组的 ID 是 my-group
group_id='my-group'
)
for message in consumer:
print(f"消费者 {consumer_id} 收到消息: {message.value.decode('utf-8')}")
if __name__ == '__main__':
# 这里创建创建多个消费者进程,分配到同一个消费者组中。
processes = []
# 创建3个消费者
for i in range(3):
p = multiprocessing.Process(target=consume, args=(i,))
processes.append(p)
p.start()
# 等待所有进程处理完消息
for p in processes:
p.join()
7. 消费者性能优化技巧
消费者端提升性能主要是依靠上面这些点来组合达到效果。
- 批量处理。处理消息时,尽量批量处理而不是一条条处理。这可以减少网络往返和数据库操作的次数。
messages = consumer.poll(timeout_ms=1000, max_records=100)
if messages:
# 批量处理消息,再提交偏移量
process_batch(messages)
consumer.commit()
- 并行处理。可以利用多线程或多进程来并行处理消息。注意,单个分区的消息必须按顺序处理。
from concurrent.futures import ThreadPoolExecutor
def process_message(message):
# 处理单条消息的逻辑
with ThreadPoolExecutor(max_workers=5) as executor:
for message in consumer:
executor.submit(process_message, message)
- 调整 fetch.min.bytes 和 fetch.max.wait.ms 控制消费者从服务器获取消息的行为。fetch.min.bytes 是每次fetch请求从服务器获取的最小数据量。fetch.max.wait.ms 是如果没有足够的数据,服务器在响应fetch请求之前等待的最长时间。
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
fetch_min_bytes=1024 * 1024, # 1MB
fetch_max_wait_ms=500
)
- 合理设置 max.poll.records 这个参数。max.poll.records 控制单次调用 poll() 方法时返回的记录数。根据服务器硬件 + 处理能力来设置这个值。
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
max_poll_records=500
)
上面的这几点就是消费者端优化性能比较常用的几点,同时也是收益比较高的优化点。
再一个就是需要优化应用程序处理消息逻辑,这个才是最影响消费者性能的地方。 处理单条消息太慢的话,上面这些无论再怎么去调优,效果都不会特别明显。
参考链接
- Consumer Configs https://kafka.apache.org/documentation/#consumerconfigs