Kafka consumer频繁reblance

转载 李亚飞 大佬的文章:https://www.lyafei.com/

其实文章名可以叫 记一次线上 Kafka 问题排查,但觉得稀松平常,弄些术词显得硬核点,hhh,言归正传,线上一个 Go 服务是一组 Kafka 的消费者,在运行了很多天之后,Kafka 数据突然积压了,查看 Kafka 服务正常,查看 Go 服务也运行正常,进到 Go 服务容器内部查看日志,发现消费者的 Go 服务频繁 rebalance 并且大概率返回失败。因为我们所使用的 Kafka 分配的分区为3,所以 Go 服务 一个 Deployment 三个 Pod。
错误信息如下:

kafka server: The provided member is not known in the current generation
Request was for a topic or partition that does not exist on this broker

有时候日志里还会伴随着

i/o timeout

甚至还触发了 阿里云的 K8s 运营报警,在代码中加了 errors 和 notifications 日志,发现每次错误都伴随着 rebalance。

一开始以为是超时时间过短导致,调大了连接超时和读写超时的时间,但是问题没有得到解决。

又以为是我们 Go 服务处理业务逻辑问题,性能不足,导致处理时间过长,以至于 Kafka Server 认为 Client 死掉了,然后进行rebalance 导致的,于是将每条获取到的 message 放到 channel 中,弄了一个线程池去消费 channel 来解决问题,但是问题仍然没有解决。 阅读了 sarama 的 heartbeat 机制,发现每个 consumer 都有单独的 goroutine 每 3 秒发送一次心跳。因此这个处理时间应该只会导致消费速度下降,不会导致 rebalance。

于是只好另外启动了一个消费者,指定了另外一个 group id,来做测试,在消费过程中,发现并未发生 rebalance。这时我就裂开了,懵逼了。得亏我最近查文档技能点满了,终于翻到一篇文章看到了这个问题的解决方案,全文英语,我就不贴出来了,简单概括讲下:

Kafka 不同 Topic 的 consumer 如果用的 groupId 名字一样的情况下,其中任意一个 Topic 的 consumer 重新上下线都会造成剩余所有的 consumer 产生 reblance 行为,即使大家不是同一个 Topic,这主要是由于 Kafka 官方支持一个 consumer 同时消费多个 Topic 的情况,所以在 zk 上一个 consumer 出现问题后,zk 是直接把 group 下所有 consumer 都通知一遍,这个与以前观念里认为 group 从属于某一个 Topic 的概念完全不同。

而我之前出问题的 Go 服务所使用的的消费组 groupId 在不同 Topic 下都有多个消费者。为了验证这个问题,我修改了我的消费组的 groupId,加了后缀与其它消费者区分开,果然之前频繁 reblance 再也没有发生过 reblance了。

  • 0
    点赞
  • 7
    收藏
    觉得还不错? 一键收藏
  • 0
    评论
要创建一个Kafka Consumer,需要执行以下步骤: 1. 导入所需的 Kafka 相关库,如 kafka-clients 和 slf4j。 2. 配置 Consumer 的属性,例如 Kafka 服务器的地址、Consumer 组 ID、是否自动提交偏移量等。 3. 创建一个 KafkaConsumer 实例,将上一步中的属性传递给它。 4. 订阅一个或多个主题,使用 KafkaConsumer 的 subscribe() 方法。 5. 循环从 Consumer 中拉取消息,使用 KafkaConsumer 的 poll() 方法。 6. 处理从 Kafka 中拉取的消息。 下面是一个基本的 Kafka Consumer 的示例代码: ```java import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class MyKafkaConsumer { public static void main(String[] args) { // 配置 Consumer 的属性 Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "my-group"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", StringDeserializer.class.getName()); props.setProperty("value.deserializer", StringDeserializer.class.getName()); // 创建 KafkaConsumer 实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅一个主题 consumer.subscribe(Collections.singletonList("my-topic")); // 循环从 Consumer 中拉取消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); // 处理从 Kafka 中拉取的消息 for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } } ``` 注意,这只是一个基本的示例,实际使用中还需要考虑更多的细节和配置。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值