sarama consumer stops consuming

Version: sarama v1.26.1

Consumer implementation:

package kafka_consumer

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/Shopify/sarama"
    // Note: the monitor, logger, and config packages used below are
    // internal to the project; their import paths are omitted here,
    // as in the original.
)

const (
    HandleMsgStatusFailed = 0
    HandleMsgStatusSucc   = 1
    HandleMsgStatusOther  = 2
)
// Handler processes one message and returns the country it maps to plus
// one of the status codes above.
type Handler func(msg *sarama.ConsumerMessage) (country string, status int)
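
// An illustrative Handler (the business logic here is hypothetical):
// decode the message, work out which country it belongs to, and map the
// outcome to one of the status constants above.
func exampleHandler(msg *sarama.ConsumerMessage) (country string, status int) {
    if len(msg.Value) == 0 {
        return "", HandleMsgStatusOther // nothing to process, skip monitoring
    }
    // ... real decoding / business logic would go here ...
    return "SG", HandleMsgStatusSucc
}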

func init() {
    sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)
}

type Consumer struct {
    wg       *sync.WaitGroup
    Consumer sarama.ConsumerGroup
    Handle   Handler   // business callback invoked for every message
    stop     bool      // set to make ConsumeClaim exit early
    ready    chan bool // closed in Setup once the group session is ready
    Topics   []string
}


func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(c.ready)
    return nil
}

// Cleanup runs at the end of a session, after all ConsumeClaim goroutines
// have exited.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim is invoked once per claimed partition and loops until the
// claim's message channel closes or the session is cancelled.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case msg := <-claim.Messages():
            if msg == nil {
                // channel closed: rebalance or shutdown
                return nil
            }
            st := time.Now()
            country, status := c.Handle(msg)
            if status != HandleMsgStatusOther {
                monitorStatus := status == HandleMsgStatusSucc
                monitor.AddMsgCount(msg.Topic, &country, &monitorStatus)
                monitor.AddMsgLatency(msg.Topic, &country, st)
                logger.Info(fmt.Sprintf("topic=%s,partition=%d,offset=%d|%d|msg=%s", msg.Topic, msg.Partition, msg.Offset, time.Since(st)/time.Millisecond, msg.Value))
            }
            session.MarkMessage(msg, "") // mark the message as processed
            if c.stop {
                logger.Info(fmt.Sprintf("topic %s partition %d exit 1!", msg.Topic, msg.Partition))
                return nil
            }
        case <-session.Context().Done():
            return nil
        }
    }
}



// NewConsumer builds a consumer group client from the project's Kafka config.
func NewConsumer(config config.KafkaGroup, handle Handler) (*Consumer, error) {
    c := &Consumer{
        Consumer: nil,
        Handle:   handle,
        stop:     false,
        ready:    make(chan bool),
        Topics:   config.Topics,
    }
    brokers := config.Brokers
    groupId := config.GroupId

    kafkaConfig := sarama.NewConfig()
    kafkaConfig.Consumer.Return.Errors = true
    kafkaConfig.Version = sarama.V2_1_0_0
    kafkaConfig.Consumer.MaxProcessingTime = 15 * time.Second
    kafkaConfig.Consumer.Group.Session.Timeout = 20 * time.Second
    //kafkaConfig.Consumer.Group.Heartbeat.Interval = 6 * time.Second

    var err error
    c.Consumer, err = sarama.NewConsumerGroup(brokers, groupId, kafkaConfig)
    if err != nil {
        return nil, err
    }
    return c, nil
}


// HandleMsg runs the consume loop until the context is cancelled or a
// SIGINT/SIGTERM arrives, then shuts the group down cleanly.
func (c *Consumer) HandleMsg() {
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for err := range c.Consumer.Errors() {
            // log and keep going; closing the client here would tear down
            // every partition on the first transient error
            logger.Info(fmt.Sprintf("kafka consume error: %s", err.Error()))
        }
    }()
    topics := c.Topics
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            // Consume blocks for the lifetime of one group session; it
            // returns on rebalance or error and must be called again.
            if err := c.Consumer.Consume(ctx, topics, c); err != nil {
                logger.Info(fmt.Sprintf("error occur from consumer: %v", err))
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                logger.Info(fmt.Sprintf("consumer exit %v", ctx.Err()))
                return
            }
            logger.Info(fmt.Sprintf("reconnect kafka %v", topics))
            c.ready = make(chan bool) // re-arm for the next Setup call
        }
    }()
    <-c.ready // wait until Setup has run for the first session

    logger.Info(fmt.Sprintf("consumer up and listening topics %v!", topics))

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        logger.Info("consumer terminating: context cancelled")
    case <-sigterm:
        logger.Info("consumer terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err := c.Consumer.Close(); err != nil {
        logger.Info(fmt.Sprintf("failed to close consumer client: %v", err))
    }
}
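
Wiring it together looks roughly like this (a sketch; config.KafkaGroup is the project's internal config type used above, and the field values are placeholders):

// Sketch of the call site (field values are placeholders).
func runConsumer() {
    cfg := config.KafkaGroup{
        Brokers: []string{"broker-1:9092"},
        Topics:  []string{"my-topic"},
        GroupId: "my-group",
    }
    c, err := NewConsumer(cfg, exampleHandler)
    if err != nil {
        log.Fatalf("create consumer: %v", err)
    }
    c.HandleMsg() // blocks until the context is cancelled or a signal arrives
}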

Symptoms

After the consumer had been running for a while, the lag on some partitions started to grow. The logs showed that consumption of those partitions had stopped; the container logs looked like this:

2021/06/22 12:09:48 consumer/broker/4 disconnecting due to error processing FetchRequest: write tcp 10.129.27.46:40112->10.129.22.200:9092: write: broken pipe

2021/06/22 11:58:11 consumer/broker/4 disconnecting due to error processing FetchRequest: write tcp 10.129.40.4:39424->10.129.22.200:9092: write: broken pipe

2021/06/22 11:47:09 consumer/broker/1 disconnecting due to error processing FetchRequest: EOF

After errors like these, the partitions served by that broker were no longer consumed. The problem occurred intermittently.

Diagnosis

Approach

  1. Add debug logging
  2. Reproduce on test → could not reproduce on test → reproduce on liveish, where the problem reproduced reliably whenever each task took about 1 s, and persisted until the container restarted
  3. Tune parameters → tried adjusting core settings such as MaxProcessingTime, Session.Timeout, and Fetch.Max (see the sketch after this list), but nothing relieved the problem
  4. Read the source code
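
For reference, the tuning attempts were variations along these lines (the values shown here are illustrative, not the exact ones tried):

// Illustrative values for the parameters that were tuned; none of these
// changes relieved the problem.
kafkaConfig.Consumer.MaxProcessingTime = 30 * time.Second
kafkaConfig.Consumer.Group.Session.Timeout = 30 * time.Second
kafkaConfig.Consumer.Fetch.Max = 1024 * 1024 // 1 MiB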

Source code overview

Interface relationships and responsibilities


Flow chart


When polling for messages hits a network error, a trigger fires: the partitionConsumer drops its current broker subscription, the client reconnects to the broker, and the partitionConsumer is re-subscribed on that broker.

**The problem is that this subscription step is serial.**

**Each round subscribes a batch of partitionConsumers and immediately starts polling those partitions for messages. Only once those messages have been fully processed is the next batch of partitionConsumers subscribed. If a poll returns a large volume of messages and processing is slow, some partitions go unconsumed for a long stretch.**
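
In pseudocode, the subscription loop behaves roughly as follows (a simplified sketch of the behavior described above, with invented helper names — not sarama's actual source):

// Simplified sketch (helper names are invented): each batch of
// partitionConsumers is subscribed, fetched, and fully drained before
// the next batch gets its turn.
for {
    batch := waitForNewSubscriptions() // partitionConsumers pending (re)subscription
    subscribe(batch)
    response := fetch()                // one FetchRequest for all current subscriptions
    dispatch(response)                 // blocks until the fetched messages are consumed
    // only after dispatch returns can the next batch be subscribed
}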

Solution

Limit how much data each poll fetches, so that no batch waits too long to be subscribed.

Setting Fetch.Max alone does not work here; Fetch.Default must be set as well. The first poll after a subscription bounds its size only by Fetch.Default; Fetch.Max only kicks in from the second poll onward.

Hence the sizing rule: Fetch.Default / size of a single message * average processing time per message <= 5 min (5 minutes being the Kafka-side limit on the interval between polls).
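
For example, assuming an average message size of 1 KB and about 1 s of processing per message (the timing that reproduced the problem), Fetch.Default has to stay at or below roughly 300 KB:

// Assumed numbers: ~1 KB per message, ~1 s of processing per message.
// 300 s budget / 1 s per message = at most 300 messages per poll,
// so Fetch.Default <= 300 messages * 1 KB = 300 KB.
kafkaConfig.Consumer.Fetch.Default = 300 * 1024 // bytes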
