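Synchronous and asynchronous producers in Shopify/sarama

The Shopify/sarama producer can run in two modes:

  1. Synchronous mode
    After sending a message to Kafka, the producer waits for the result to come back.
  2. Asynchronous mode
    After sending a message to Kafka, the producer does not wait for the result.

  1. Synchronous mode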
    // Excerpt: assumes the sarama, log and fmt packages are imported, and that
    // topic and i are defined by the surrounding code.
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by the SyncProducer
    client, err := sarama.NewClient([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewSyncProducerFromClient(client)
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }
    defer producer.Close()

    text := fmt.Sprintf("message %08d", i)
    // SendMessage blocks until the message is acknowledged or an error occurs.
    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
    if err != nil {
        log.Fatalf("unable to produce message: %q", err)
    }
    ...

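Note that in synchronous mode the following setting must be enabled:

    config.Producer.Return.Successes = true

Otherwise the program fails at run time with:

2018/12/25 08:08:30 unable to create kafka producer: "kafka: invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer)"

  2. Asynchronous mode

Asynchronous mode, as the name suggests, does not wait for the send to complete after producing a message, so the caller can carry on with other work.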

    config := sarama.NewConfig()
    // config.Producer.Return.Successes = true
    client, err := sarama.NewClient([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }
    defer producer.Close()

    text := fmt.Sprintf("message %08d", i)
    // Hand the message to the producer; delivery happens in the background.
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    // Check for a response without blocking.
    select {
    //case msg := <-producer.Successes():
    //    log.Printf("Produced message successes: [%s]\n", msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default")
    }
    ...
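
The difference between the two modes can be sketched with plain Go channels. This is a simplified model and not sarama code: the request and result types, startBroker, sendAsync and sendSync are hypothetical names invented for the illustration. It only shows the idea that a blocking send can be built by handing a message to a background goroutine and then waiting for a per-message reply.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical outcome record; it is not a sarama type.
type result struct {
	offset int64
	err    error
}

// request pairs a payload with an optional private reply channel.
type request struct {
	payload string
	reply   chan result // nil means the sender does not want the outcome
}

// startBroker is a stand-in for the background delivery goroutine.
// It assigns increasing offsets and rejects empty payloads.
func startBroker() chan<- request {
	input := make(chan request)
	go func() {
		var next int64
		for req := range input {
			res := result{offset: next}
			if req.payload == "" {
				res = result{offset: -1, err: errors.New("empty payload")}
			} else {
				next++
			}
			if req.reply != nil {
				req.reply <- res
			}
		}
	}()
	return input
}

// sendAsync returns as soon as the message has been handed off.
func sendAsync(input chan<- request, payload string) {
	input <- request{payload: payload}
}

// sendSync hands the message off and then blocks until the outcome arrives.
func sendSync(input chan<- request, payload string) (int64, error) {
	reply := make(chan result, 1)
	input <- request{payload: payload, reply: reply}
	res := <-reply
	return res.offset, res.err
}

func main() {
	input := startBroker()
	sendAsync(input, "message 00000000") // outcome is never observed
	offset, err := sendSync(input, "message 00000001")
	fmt.Println(offset, err) // 1 <nil>
	_, err = sendSync(input, "")
	fmt.Println(err) // empty payload
	close(input)
}
```

In this model the caller of sendAsync never learns whether delivery worked, which is the trade-off the notes below are about.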

There are a few things to watch out for with the asynchronous producer.

  1. By default, asynchronous mode does not report success after a message is produced.

    config.Producer.Return.Successes = false
    ...
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    log.Printf("Produced message: [%s]\n", text)

    // wait response
    select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n", msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    }

This code hangs. Because config.Producer.Return.Successes = false means no success is reported, nothing ever arrives on producer.Successes() while the select is waiting, and nothing arrives on producer.Errors() either (assuming no error occurs), so the select blocks forever. Adding a default branch avoids the hang:

    select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n", msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default")
    }
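
The effect of the default branch can be reproduced with ordinary channels, without a Kafka broker. The sketch below is a model only: pollOnce is a hypothetical helper, and the two channels stand in for producer.Successes() and producer.Errors().

```go
package main

import (
	"errors"
	"fmt"
)

// pollOnce has the same shape as the select above, on plain channels.
// The default branch makes it return at once when nothing is ready.
func pollOnce(successes <-chan string, errs <-chan error) string {
	select {
	case msg := <-successes:
		return "success: " + msg
	case err := <-errs:
		return "failure: " + err.Error()
	default:
		return "default"
	}
}

func main() {
	successes := make(chan string, 1)
	errs := make(chan error, 1)

	fmt.Println(pollOnce(successes, errs)) // default

	errs <- errors.New("broker unavailable")
	fmt.Println(pollOnce(successes, errs)) // failure: broker unavailable

	successes <- "message 00000001"
	fmt.Println(pollOnce(successes, errs)) // success: message 00000001
}
```

Because the default branch fires whenever nothing is ready at that instant, a result that arrives a moment later is only seen by a later poll.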
  2. With Return.Successes enabled, the snippet above behaves like synchronous mode.

    config.Producer.Return.Successes = true
    ...
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    log.Printf("Produced message: [%s]\n", text)

    // wait response
    select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n", msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    }

The log shows that one success notification is received for every message sent, for example:

2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...

In other words, it behaves just like a synchronous produce.

  3. If Return.Successes is enabled but nothing reads from producer.Successes(), the Successes() channel fills up.

    config.Producer.Return.Successes = true
    ...
    log.Printf("Ready to produce message: [%s]\n", text)
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    log.Printf("Produced message: [%s]\n", text)

    // wait response
    select {
    //case msg := <-producer.Successes():
    //    log.Printf("Produced message successes: [%s]\n", msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default")
    }

Once the channel is full, nothing more can be written to it: subsequent success notifications can no longer be delivered, and the producer itself hangs, because the shared buffer is occupied by a large number of success notifications that were never consumed.

After running for a while:

2018/12/25 08:58:24 Ready to produce message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000608]

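The producer hung while producing message 00000608 because the buffer was full. The buffer size is configurable (sarama sizes its channel buffers with Config.ChannelBufferSize, 256 by default, rather than with MaxRequestSize), but whatever the size, the buffer eventually fills up if nothing reads the success notifications.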
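
The "any buffer size fills up eventually" point can be checked with a plain buffered channel. This is a model, not sarama: sendsBeforeBlocking is a hypothetical helper, and the single channel stands in for the success channel that nobody reads. The real producer passes messages through more than one buffered stage, which is likely why the log above stalls at message 608 rather than at the buffer size itself.

```go
package main

import "fmt"

// sendsBeforeBlocking writes to a buffered channel that is never read and
// returns how many sends complete before the next one would block.
func sendsBeforeBlocking(bufferSize, attempts int) int {
	successes := make(chan int, bufferSize)
	for i := 0; i < attempts; i++ {
		select {
		case successes <- i:
			// There was still room in the buffer.
		default:
			// Buffer full: a plain send would block here forever.
			return i
		}
	}
	return attempts
}

func main() {
	for _, size := range []int{4, 256} {
		fmt.Printf("buffer=%d: blocked after %d sends\n", size, sendsBeforeBlocking(size, 1000))
	}
}
```

A larger buffer only delays the stall; it does not remove it.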

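The conclusion is that the setting config.Producer.Return.Successes = true and the read <-producer.Successes() must go together: if the setting is true, you must read from Successes; if it is false, do not read from Successes, since nothing will ever arrive on it.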
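
One way to keep the two paired without falling back to lock-step sending is to read the success channel in its own goroutine. The sketch below models that with plain channels and is not sarama code: produceAll is a hypothetical helper, and the input and successes channels stand in for producer.Input() and producer.Successes(). With a real producer the Errors() channel would presumably need the same treatment.

```go
package main

import (
	"fmt"
	"sync"
)

// produceAll pushes n messages through a stand-in pipeline while a separate
// goroutine drains the success channel. It returns the number of
// acknowledgements that were read.
func produceAll(n, bufferSize int) int {
	input := make(chan int, bufferSize)
	successes := make(chan int, bufferSize)

	// Stand-in for the producer's background work: acknowledge every message.
	go func() {
		for msg := range input {
			successes <- msg
		}
		close(successes)
	}()

	// Dedicated reader: keeps the success channel from filling up.
	var wg sync.WaitGroup
	acked := 0
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range successes {
			acked++
		}
	}()

	// The sender never waits for an individual acknowledgement.
	for i := 0; i < n; i++ {
		input <- i
	}
	close(input)
	wg.Wait()
	return acked
}

func main() {
	fmt.Println(produceAll(1000, 4)) // 1000
}
```

Even with a buffer of only 4, all 1000 messages go through, because every acknowledgement is consumed as it arrives.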
