Version: sarama v1.26.1
Consumer implementation:
package kafka_consumer

import (
	"context"
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// NOTE: the code below also uses internal packages (config, logger, monitor)
// whose import paths are omitted in this post.

// Status codes a Handler returns for each consumed message.
const (
	HandleMsgStatusFailed = 0
	HandleMsgStatusSucc   = 1
	HandleMsgStatusOther  = 2
)

// Handler processes one message and reports which country it belongs to and
// whether handling succeeded.
type Handler func(msg *sarama.ConsumerMessage) (country string, status int)

func init() {
	sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)
}

// Consumer wraps a sarama consumer group and implements sarama.ConsumerGroupHandler.
type Consumer struct {
	wg       *sync.WaitGroup
	Consumer sarama.ConsumerGroup
	Handle   Handler
	stop     bool
	ready    chan bool
	Topics   []string
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready.
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, after all ConsumeClaim goroutines have exited.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim consumes the messages of a single partition claim.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg := <-claim.Messages():
			if msg != nil {
				st := time.Now()
				country, status := c.Handle(msg)
				if status != HandleMsgStatusOther {
					monitorStatus := true
					if status == HandleMsgStatusFailed {
						monitorStatus = false
					}
					if status == HandleMsgStatusSucc {
						monitorStatus = true
					}
					monitor.AddMsgCount(msg.Topic, &country, &monitorStatus)
					monitor.AddMsgLatency(msg.Topic, &country, st)
					logger.Info(fmt.Sprintf("topic=%s,partition=%d,offset=%d|%d|msg=%v", msg.Topic, msg.Partition, msg.Offset, time.Since(st)/time.Millisecond, msg.Value))
				}
				//c.Consumer.MarkOffset(msg, "")
				session.MarkMessage(msg, "")
				if c.stop {
					logger.Info(fmt.Sprintf("topic %s partition %d exit 1!", msg.Topic, msg.Partition))
					return nil
				}
			} else {
				// A nil message means the claim channel was closed.
				return nil
			}
		case <-session.Context().Done():
			return nil
		}
	}
}
// NewConsumer builds a consumer group client from the given Kafka group config.
func NewConsumer(config config.KafkaGroup, handle Handler) (*Consumer, error) {
	c := &Consumer{
		Consumer: nil,
		Handle:   handle,
		stop:     false,
		ready:    make(chan bool),
		Topics:   config.Topics,
	}
	brokers := config.Brokers
	//topics := config.Topics
	groupId := config.GroupId
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Consumer.Return.Errors = true
	kafkaConfig.Version = sarama.V2_1_0_0
	kafkaConfig.Consumer.MaxProcessingTime = 15 * time.Second
	kafkaConfig.Consumer.Group.Session.Timeout = 20 * time.Second
	//kafkaConfig.Consumer.Group.Heartbeat.Interval = 6 * time.Second
	//kafkaConfig.Group.Return.Notifications = true
	//kafkaConfig.Group.Mode = cluster.ConsumerModePartitions
	var err error
	//fmt.Printf("brokers=%v,groupid=%s,kafkaconfig=%v", brokers, groupId, kafkaConfig)
	c.Consumer, err = sarama.NewConsumerGroup(brokers, groupId, kafkaConfig)
	if err != nil {
		return nil, err
	}
	return c, nil
}
// HandleMsg starts consuming and blocks until the context is cancelled or a
// SIGINT/SIGTERM is received.
func (c *Consumer) HandleMsg() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		// Consumer.Return.Errors is enabled, so the error channel must be drained.
		for err := range c.Consumer.Errors() {
			logger.Info(fmt.Sprintf("kafka consume error: %s", err.Error()))
			c.Consumer.Close()
		}
	}()
	topics := c.Topics
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume blocks for the lifetime of a session and returns on rebalance,
			// so it has to be called again in a loop.
			if err := c.Consumer.Consume(ctx, topics, c); err != nil {
				logger.Info(fmt.Sprintf("error occurred in consumer: %v", err))
			}
			// Check if the context was cancelled, signaling that the consumer should stop.
			if ctx.Err() != nil {
				logger.Info(fmt.Sprintf("consumer exit %v", ctx.Err()))
				return
			}
			logger.Info(fmt.Sprintf("reconnect kafka %v", topics))
			c.ready = make(chan bool)
		}
	}()
	<-c.ready
	logger.Info(fmt.Sprintf("consumer up and listening topics %v!", topics))
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		logger.Info("consumer terminating: context cancelled")
	case <-sigterm:
		logger.Info("consumer terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err := c.Consumer.Close(); err != nil {
		logger.Info(fmt.Sprintf("failed to close consumer client: %v", err))
	}
}
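For context, here is a minimal, hypothetical usage sketch of this package. The config.KafkaGroup field names follow the code above; the import paths and all literal values are placeholders invented for illustration.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"

	// Placeholder import paths for the internal packages referenced above.
	"example.com/internal/config"
	"example.com/internal/kafka_consumer"
)

func main() {
	cfg := config.KafkaGroup{
		Brokers: []string{"10.0.0.1:9092"},
		Topics:  []string{"some-topic"},
		GroupId: "some-group",
	}
	c, err := kafka_consumer.NewConsumer(cfg, func(msg *sarama.ConsumerMessage) (string, int) {
		// Business logic goes here; return the message's country and a status code.
		return "unknown", kafka_consumer.HandleMsgStatusSucc
	})
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}
	c.HandleMsg() // blocks until the context is cancelled or SIGINT/SIGTERM arrives
}
```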
Symptom
After the consumer had been running for a while, the lag on some partitions grew. The logs showed that consumption of those partitions had stopped; the container logs looked like this:
2021/06/22 12:09:48 consumer/broker/4 disconnecting due to error processing FetchRequest: write tcp 10.129.27.46:40112->10.129.22.200:9092: write: broken pipe
2021/06/22 11:58:11 consumer/broker/4 disconnecting due to error processing FetchRequest: write tcp 10.129.40.4:39424->10.129.22.200:9092: write: broken pipe
2021/06/22 11:47:09 consumer/broker/1 disconnecting due to error processing FetchRequest: EOF
After an error like this, the partitions served by that broker are no longer consumed. The problem occurs intermittently.
Diagnosis
Approach
- Add debug logging
- Reproduce the problem in the test environment → could not reproduce it in test → reproduced it in the liveish environment; with each task taking roughly 1 s to process, the problem reproduces reliably and persists until the container is restarted
- Tune parameters → tried adjusting core settings such as MaxProcessingTime, Session.Timeout, and Fetch.Max, but none of them mitigated the problem (a sketch of these knobs follows this list)
- Read the source code
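For reference, a sketch of how the knobs mentioned above map onto sarama's Config (it assumes the sarama and time imports from the consumer code; the values are illustrative only, since the post does not record the exact values that were tried):

```go
// Illustrative values only; per the post, none of these changes mitigated the stalls.
func tuneConsumerConfig(cfg *sarama.Config) {
	cfg.Consumer.MaxProcessingTime = 30 * time.Second     // max time a message is expected to take to process
	cfg.Consumer.Group.Session.Timeout = 30 * time.Second // consumer group session timeout
	cfg.Consumer.Fetch.Max = 1 << 20                      // max bytes fetched per request (0 = no limit)
}
```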
Source code overview
Interface relationships and responsibilities
(figure omitted)
Flow chart
(figure omitted)
When polling for messages hits a network error, a trigger fires: the affected partitionConsumers unsubscribe from their current broker, the broker connection is re-established, and the partitionConsumers are then re-subscribed on that broker.
**The problem is that this re-subscription is serial.**
**Each round subscribes only a subset of the partitionConsumers and immediately starts polling messages for those partitions. Only after those messages have been fully processed is the next batch of partitionConsumers subscribed. If a poll returns a lot of data and messages are processed slowly, some partitions go unconsumed for a long stretch of time.**
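To make the failure mode concrete, the sketch below is a toy model of that serial subscribe-then-fetch cycle. It is not sarama's actual code: every type, name, and number is invented purely to show the ordering described above.

```go
package main

import (
	"fmt"
	"time"
)

// brokerFetcher is a toy stand-in for the per-broker fetch loop.
type brokerFetcher struct {
	pending    []string // partitionConsumers waiting to be (re)subscribed
	subscribed []string
}

func (b *brokerFetcher) run() {
	for len(b.pending) > 0 {
		// 1. Pick up only the subscriptions that are buffered right now.
		batch := b.pending[:1]
		b.pending = b.pending[1:]
		b.subscribed = append(b.subscribed, batch...)

		// 2. Fetch and fully process messages for the subscribed partitions.
		//    Partitions still in `pending` receive nothing until this round ends.
		fmt.Printf("fetching for %v, still waiting: %v\n", b.subscribed, b.pending)
		time.Sleep(50 * time.Millisecond) // stands in for "large fetch × ~1 s per message"
	}
}

func main() {
	(&brokerFetcher{pending: []string{"p0", "p1", "p2", "p3"}}).run()
}
```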
Solution
Limit how much data each poll fetches, so that partitions waiting to be re-subscribed are not stalled for long.
Setting Fetch.Max alone does not work here; Fetch.Default must be set as well, because the first poll after a subscription is bounded only by Fetch.Default, and Fetch.Max only takes effect from the second poll onward.
Therefore: Fetch.Default / average size of a single message * average processing time per message <= 5 min (5 minutes being the allowed interval between polls on the Kafka side).
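As a hedged example of what this looks like alongside the other kafkaConfig settings in NewConsumer above: assuming messages of roughly 1 KB and an average handling time of about 1 s each (these numbers are assumptions, not values from the post), Fetch.Default = 100 KB caps the first fetch after a (re)subscription at on the order of 100 messages, i.e. roughly 100 s of processing, comfortably under the 5-minute bound.

```go
// Illustrative values, assuming ~1 KB messages and ~1 s handling time each;
// size these against your own message size and handler latency.
kafkaConfig.Consumer.Fetch.Default = 100 * 1024 // per the analysis above, this bounds the first fetch after (re)subscription
kafkaConfig.Consumer.Fetch.Max = 100 * 1024     // bounds subsequent fetches
```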