事情的起因是因为我看到了这样一张图:
Write Throughput Exceeded (Count / Request) — Average
这张图一开始我还不怎么重视,因为它的Y轴数字才几十甚至个数,这对于每小时上千万的请求量来说,完全不能引起我的注意力啊。
直到后来发生的一些事,让我对几乎所有系统监控都警觉起来。
这张图的意思,是 Producer 写入 Kinesis 时超出预设容量的部分,单位是 个/每次请求,这个说明什么呢,说明数据的写入太快,超过预设容量了。
但是我算了一下,当前 Kinesis 的 Stream,我是开了 4 个 Shard 的,理论上来说,它的写入的吞吐量应该是 4 * 1000 = 4000/s,然而,我们目前每小时的流量才 8,000,000 ,算下来应该是 8,000,000/60/60 = 2222.22/s,远小于 Kinesis 的吞吐量最大值,按理来说不应该出现超量的情况。
所以我去研究了一下代码,这一研究就研究了好几天,经过不断地测试,终于发现问题出在批量写入后的判断逻辑上。
原先我们的逻辑是这样的:
for i := 0; i < maxRetry; i++ {
_, err = kinesis.PutRecords(input)
if err == nil {
return
}
time.Sleep(backoff(i))
}
可以看到,如果出错的话,我们是会进行重新写入的。但是经过我的反复测试,我发现其实 WriteProvisionedThroughputExceeded 这个错误并不会出现在 err 里面,而是出现在 PutRecords() 返回的第一个参数里面。
PutRecords() 会返回一个 output 和 err,其中 output 的定义是:
// PutRecords results.
*type*PutRecordsOutput *struct*{
_ *struct*{} `type:"structure"`
// The encryption type used on the records. This parameter can be one of the
// following values:
//
// * NONE: Do not encrypt the records.
//
// * KMS: Use server-side encryption on the records using a customer-managed
// AWS KMS key.
EncryptionType *string `type:"string" enum:"EncryptionType"`
// The number of unsuccessfully processed records in a PutRecords request.
FailedRecordCount *int64 `min:"1" type:"integer"`
// An array of successfully and unsuccessfully processed record results, correlated
// with the request by natural ordering. A record that is successfully added
// to a stream includes SequenceNumber and ShardId in the result. A record that
// fails to be added to a stream includes ErrorCode and ErrorMessage in the
// result.
//
// Records is a required field
Records []*PutRecordsResultEntry `min:"1" type:"list" required:"true"`
}
其中,FailedRecordCount 会返回此次写入失败的 record 数量,Records 里面会返回所有 record 的状态。
我们要想判断是否 WriteProvisionedThroughputExceeded,首先要判断是否 FailedRecordCount > 0,其次,要遍历 Records,检查是否 Record.ErrorCode != nil ,如果是,表示该记录写入失败,需要放回重试队列中。