Go:每分钟处理百万请求

【译文】原文地址

问题

从事匿名遥测和分析系统,我们的目标是能够处理来自大量客户端的POST请求。我们的web服务将接收JSON文档内容包括很多的负载需要发送到亚马逊S3存储,为了后续使用map-reduce来处理这些数据。

传统方式我们将创建worker-tier架构,使用包含如下中间件:

  • SideKiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk worker tier
  • RabbitMQ
    搭建两个集群,一个部署前端另一个用于workers,这样就可以通过扩展来应对大量后端任务。自开始,团队就知道应该使用Go,因为在讨论的过程中就看出来可能是个很大流量的系统。作者使用go大概两年,也开发了一些系统但是没有处理过这种大流量的。

起初我们创建一些struct来定义web服务POST请求接收的负载和上传数据到S3桶的方法。

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // 待实现
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

简单的Go协程方法

开始我们使用一个很简单的POST handler来实现,仅将任务放进一个简单goroutine中来并行处理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 将body读取到字符串并使用json解码
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    //迭代每个payload并逐个上传到S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

对于流量不是很大的情况,可以应对大多数人请求,但是在大规模情况下很快上面的方法就被证明不是很好了。我们预期有很多的请求,但和我们部署第一个版本到生产环境中所看到的不一样。我们完全低估了流量。以上方法有很多不足地方。无法控制goroutine的数量。当达到每分钟1百万POST请求的时候,代码直接瘫痪了。

再次优化

需要另找方法。一开始我们就讨论如何保持处理请求生命周期非常短,并在后台处理。当然这个在Ruby中是必须做的,否则将阻塞所有可用的worker。然后我们就使用常规解决方案来做,比如Resque、Sidekiq、SQS等。很多处理这种问题的方法。

因此第二个版本通过创建带缓冲的channel,这样就可以缓存一些jobs,并逐步上传到S3,而且可以控制缓存队列的长度,有足够内存也能够存放这些job。我们认为将job存放到channel队列中是可以的。

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

然后消费队列处理jobs,使用类似如下方式:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

老实说,我对我们所想的一点底都没有。这个方法并没有让我们轻松,我们用缓存来应对并发只是延缓了问题的爆发。同步处理机制,每次上传一个负载到S3,因为接受请求的速度太快,远比一个处理器上传到S3点速度快,导致缓存很快就挤满了,导致后面来的请求直接阻塞。我们只是在回避这个问题,直到倒计时我们的系统最终死亡。在我们部署了这个有缺陷的版本之后,我们的延迟率在几分钟内以恒定的速率不断增加。


更好的方法

我们决定在使用go channel时利用一个通用的模式,创建一个两层的channel系统,一个用来存放jobs另一个用于控制并发处理job队列的workers的数量。为了保持一定层度的并发上传数据到S3,一方面不会使系统拖垮,另一方面不会出现连接S3错误。因此我们选择创建一个job/worker模式。这在java、C#等中经常使用。考虑以golang方式实现可以通过使用channels来代替worker线程池。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

//Start方法通过循环监听任务请求和停止信号。
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
                        //注册当前worker到worker队列
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 接收到一个工作请求
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // 接收到停止工作信号
                return
            }
        }
    }()
}

//Stop方法通知worker停止监听工作请求
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

修改web请求处理函数,创建一个Job结构体实例并将Job实例发送到JobQenue channel供worker处理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    //将body读取到字符串并使用json解码
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 迭代每个payload并逐个上传到S3
    for _, payload := range content.Payloads {

       //创建Job实例
        work := Job{Payload: payload}

       //将worker发送到队列
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在web服务器初始化的时候创建一个Dispather和调用Run()来创建workers池,并开始监听JobQueue中jobs。

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

以下是dispatcher实现:

type Dispatcher struct {
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}

注意我们设置了workers的实例最大数量,并添加到worker池中。因为我们项目在docker环境中使用了亚马逊Elasticbeanstalk,所以在生产环境下我们总是遵循12-factor原则来配置我们系统,通过环境变量的方式读取配置。这种方式我们可以控制workers的数量和JobQueue的长度,因此可以快速调整这些值不需要重新部署集群。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

中间结果

在部署上面的优化方案之后,很快我们就发现延时下降到可接受范围之内,并且可以处理波动很大的请求量。



在几分钟后当弹性负载均衡起作用后,看到ElasticBeanstalk应用服务处理近1百度请求每分钟。经常在早上几小时流量还能飚升到超过一百万每分钟。

新代码部署后,服务器的数量很快从100多个降20个。


总结

在这里使用了简单的方法。本来我们可以设计复杂的系统包含很多的队列、后台workers、复杂的部署,但是我们决定使用Elasticbeanstalk自动扩展能力和go提供的高效简单的并发方法。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容