Mokeke's Blog
返回文章列表

Kafka 50台消费者集群踩坑实录:Rebalance 风暴排查与治理

背景

我们有一条数据管线,架构如下:

上游数据源 → Kafka Source Topic (50分区) → Python 中转服务 (50台) → Kafka Target Topic → 数据仓库

Python 中转服务使用 kafka-python 库,部署在 K8s 集群中,50 个 Pod 对应 50 个分区,每台消费一个分区做数据清洗转发。

看起来很简单对吧?但部署后一条消息都消费不了,折腾了好几轮才搞定。把踩过的坑全记录下来,供后来人参考。


坑一:assign 模式 —— 50台全绑到同一个分区

最初的代码

from kafka import KafkaConsumer, TopicPartition
 
PARTITION_ID = int(os.environ.get("PARTITION_ID", "0"))
 
consumer = KafkaConsumer(
    group_id="my-consumer-group",
    auto_offset_reset="latest",
    enable_auto_commit=False,
    **KAFKA_CONFIG,
)
 
tp = TopicPartition(SOURCE_TOPIC, PARTITION_ID)
consumer.assign([tp])  # 手动绑定分区

问题

assign() 是手动绑定模式,需要外部传入 PARTITION_ID 环境变量。但 50 台机器部署时忘了设这个环境变量,默认值是 0,导致:

  • 50 台机器全部绑定到分区 0
  • 其他 49 个分区没有消费者
  • 消息持续堆积

教训

assign() 不参与消费组协调,不会自动分配分区。适合需要精确控制的场景,但必须确保每台机器的 PARTITION_ID 唯一。


坑二:subscribe() 导致 Rebalance 风暴

改进的代码

既然手动绑定容易出错,那就改成自动分配:

consumer = KafkaConsumer(
    group_id="my-consumer-group",
    auto_offset_reset="latest",
    enable_auto_commit=False,
    **KAFKA_CONFIG,
)
 
consumer.subscribe([SOURCE_TOPIC])  # 自动分配分区

问题

部署后日志是这样的:

Successfully joined group with generation 37
Setting newly assigned partitions {partition=8}
绑定分区: partition=8 offset=106
Heartbeat failed because it is rebalancing      ← 刚绑好就被踢!
Revoking previously assigned partitions {partition=8}
(Re-)joining group
Successfully joined group with generation 38    ← 又换分区
Setting newly assigned partitions {partition=7}
Heartbeat failed because it is rebalancing      ← 又被踢!
...

generation 数字不断增长,50 台机器反复 join → 分配 → 被踢 → 重新 join,谁也消费不了消息。

原因

50 个 Pod 同时启动,同时尝试加入消费组。每一个新成员加入都会触发整个消费组 rebalance,而默认的 session_timeout_ms=10000(10秒)太短 —— 在 rebalance 过程中,已分配的消费者心跳超时被 broker 踢出,又触发新一轮 rebalance,形成死循环

教训

subscribe 模式在大量消费者同时启动时要小心 rebalance 风暴,必须配合超时参数调优。


坑三:构造函数传 Topic 比 subscribe() 更稳定

对比两种写法

我们有一条运行正常的视频管线,用的是另一种写法:

# 写法 A:构造函数直接传 topic(视频管线用这种,稳定运行)
consumer = KafkaConsumer(
    SOURCE_TOPIC,
    group_id="my-consumer-group",
    **KAFKA_CONFIG,
)
 
# 写法 B:先创建再 subscribe(图片管线用这种,出了问题)
consumer = KafkaConsumer(
    group_id="my-consumer-group",
    **KAFKA_CONFIG,
)
consumer.subscribe([SOURCE_TOPIC])

理论上两者等价,但在某些托管 Kafka 云服务上,写法 A 的兼容性更好

另外视频管线还有一个关键调用:

consumer.poll(timeout_ms=1000)  # 触发分区分配
assigned = consumer.assignment()
for tp in assigned:
    position = consumer.position(tp)
    logger.info(f"绑定分区: topic={tp.topic} partition={tp.partition} offset={position}")

poll() 调用会主动触发分区分配,而不是被动等 for msg in consumer 时才分配。

教训

推荐在 KafkaConsumer 构造函数中直接传 topic,并用 poll() 主动触发分区分配。


坑四:session_timeout 太短 —— Rebalance 风暴的根因

最终修复

consumer = KafkaConsumer(
    SOURCE_TOPIC,
    group_id="my-consumer-group",
    auto_offset_reset="latest",
    enable_auto_commit=False,
    session_timeout_ms=60000,       # 默认 10s → 60s
    heartbeat_interval_ms=20000,    # 默认 3s  → 20s
    max_poll_interval_ms=600000,    # 默认 5min → 10min
    **KAFKA_CONFIG,
)

参数解释

参数默认值建议值作用
session_timeout_ms10,00060,000consumer 被判定死亡前的超时时间
heartbeat_interval_ms3,00020,000心跳发送间隔,建议 ≤ session_timeout 的 1/3
max_poll_interval_ms300,000600,000两次 poll 之间的最大间隔,处理慢消息时防超时

加大 session_timeout_ms 后,即使 50 台 Pod 的 join 过程持续几十秒,已分配的消费者也不会因为心跳超时被踢。等所有 Pod 全部 join 完成,rebalance 自然就停了。

效果

Successfully joined group with generation 46    ← 50 台全部同一个 generation
Successfully joined group with generation 46
Successfully joined group with generation 46
... (50 条,数字不再增长)

教训

消费者数量多(>10 台)且同时启动时,session_timeout_ms 至少设 60 秒。默认 10 秒在生产环境基本不够用。


坑五:auto_offset_reset=latest —— 历史消息全丢

问题

分区绑定成功了,但搜索 拉取消息 日志 —— 空的,一条都没消费。

查监控面板发现:

分区committed_offsetend_offsetlag
P093930
P109090
P20117117
P309999
...0......

49 个分区的 committed_offset 都是 0 —— 这是全新的消费组,之前没有消费记录。

auto_offset_reset="latest" 的意思是:如果没有历史 offset,从最新位置开始消费。也就是从 end_offset 开始等新消息,之前堆积的消息全部被跳过了!

修复

auto_offset_reset="earliest"  # 从头消费

什么时候用 latest vs earliest?

场景选择原因
新消费组首次启动earliest确保不丢失历史消息
消费组已有 committed offset无影响会从 committed 位置继续,不受此参数影响
只关心实时数据latest跳过历史积压

教训

新消费组首次消费必须用 earliest,否则堆积的消息不会被消费。这是最容易被忽略的配置。


最终的完整配置

from kafka import KafkaConsumer, KafkaProducer
 
KAFKA_CONFIG = {
    "bootstrap_servers": "your-broker:9092",
    "security_protocol": "SASL_PLAINTEXT",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": "your-username",
    "sasl_plain_password": "your-password",
}
 
consumer = KafkaConsumer(
    "your-source-topic",                # 构造函数直接传 topic
    group_id="your-consumer-group",
    auto_offset_reset="earliest",       # 新消费组从头消费
    enable_auto_commit=False,           # 手动 commit,确保至少一次语义
    session_timeout_ms=60000,           # 60s,防 rebalance 风暴
    heartbeat_interval_ms=20000,        # 20s,降低心跳频率
    max_poll_interval_ms=600000,        # 10min,慢消息保护
    value_deserializer=lambda v: v.decode("utf-8"),
    **KAFKA_CONFIG,
)
 
producer = KafkaProducer(
    acks="all",                         # 所有副本确认
    retries=3,
    retry_backoff_ms=500,
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
    **KAFKA_CONFIG,
)
 
# 主动触发分区分配
consumer.poll(timeout_ms=1000)
for tp in consumer.assignment():
    pos = consumer.position(tp)
    logger.info(f"绑定分区: partition={tp.partition} offset={pos}")
 
# 消费循环
for msg in consumer:
    try:
        result = transform(msg.value)
        producer.send("your-target-topic", result)
        producer.flush()
        consumer.commit()  # 发送成功才 commit
    except Exception as e:
        logger.error(f"处理失败: {e}")
        consumer.commit()  # 失败也 commit,避免卡住(记录到错误队列)

排查流程总结

当 Kafka 消费者"不消费"时,按以下 5 步排查:

Step 1:服务是否启动?

搜索日志关键词 服务启动Updating subscribed topics

Step 2:消费组是否稳定?

搜索 generation —— 数字稳定不增长 = OK,持续增长 = rebalance 风暴

  • 修复:加大 session_timeout_ms 到 60 秒以上

Step 3:分区是否分配成功?

搜索 Setting newly assigned partitions 或自定义的 绑定分区 日志

  • 确认每个 Pod 绑定了不同分区

Step 4:是否在消费消息?

搜索业务日志(如 拉取消息

  • 有 = 正常消费中
  • 无 = 检查 auto_offset_reset,新消费组必须用 earliest

Step 5:下游是否写入成功?

搜索 解析成功 / 处理失败,检查目标 Topic 和数据仓库的数据量


kafka-python 配置速查表

KafkaConsumer(
    "topic-name",                       # 直接传 topic,别用 subscribe()
    group_id="xxx",                     # 消费组 ID
    auto_offset_reset="earliest",       # earliest | latest
    enable_auto_commit=False,           # 手动 commit
    session_timeout_ms=60000,           # ≥60s(多消费者必调)
    heartbeat_interval_ms=20000,        # ≤ session_timeout / 3
    max_poll_interval_ms=600000,        # 根据单条处理耗时调整
    max_poll_records=500,               # 单次 poll 最大条数
    fetch_max_bytes=52428800,           # 单次 fetch 最大字节(50MB)
    value_deserializer=...,             # 反序列化函数
    **KAFKA_SASL_CONFIG,                # SASL 认证配置
)

总结

问题根因修复
50台全绑分区0assign 模式没传 PARTITION_ID改用构造函数传 topic
Rebalance 风暴session_timeout 太短(10s)加大到 60s
不消费历史消息auto_offset_reset=latest改为 earliest
subscribe() 不稳定云 Kafka 兼容性问题构造函数直接传 topic

这几个坑单独看都不难,但组合在一起排查起来非常痛苦。希望这篇文章能帮到遇到类似问题的同学。