Kai - Golang实现的目标检测云服务

YOLO/Darknet是目前比较流行的Object Detection算法(后面统一称为Darknet),在GPU上的表现不但速度快而且准确率很高。但是使用起来不方便,只提供了命令行接口和简单的Python接口。所以我想用RESTful来实现一个云端的Darknet服务kai

选择用Go的原因不是考虑并发,而是goroutine之间的同步能方便的处理,适合实现Pipeline的功能。问题来了,Darknet是c语言实现的,那Go必须得用cgo进行封装,才能调用c函数。目标是为了实现三个基本功能:1. 图片检测 2. 视频检测 3. 摄像头检测。为了方便使用我修改了Darknet的部分代码,然后重新定义下面几个函数:

// Set a gpu device
void set_gpu(int gpu);

// Recognize a image
void image_detector(char *datacfg, char *cfgfile, char *weightfile, char *filename,
    float thresh, float hier_thresh, char *outfile);

// Recognize a video
void video_detector(char *datacfg, char *cfgfile, char *weightfile, char *filename,
    float thresh, float hier_thresh, char *outfile);

// Recognize a camera stream
void camera_detector(char *datacfg, char *cfgfile, char *weightfile, int camindex,
        float thresh, float hier_thresh, char *outpath);

有了这几个函数,就好办了,下面用cgo导入相应的库和头文件即可:

// #cgo pkg-config: opencv
// #cgo linux LDFLAGS: -ldarknet -lm -L/usr/local/cuda/lib64 -lcuda -lcudart -lcublas -lcurand -lcudnn
// #cgo darwin LDFLAGS: -ldarknet
// #include "yolo.h"
import "C"

// SetGPU set a gpu device you want
func SetGPU(gpu int) {
    C.set_gpu(C.int(gpu))
}

// ImageDetector recognize a image
func ImageDetector(dc, cf, wf, fn string, t, ht float64, of ...string) {
    ...
}

// VideoDetector recognize a video
func VideoDetector(dc, cf, wf, fn string, t, ht float64, of ...string) {
    ...
}

// CameraDetector recognize a camera stream
func CameraDetector(dc, cf, wf string, i int, t, ht float64, of ...string) {
    ...
}

这样对Darknet的封装go-yolo就完成了。

下面进入主题,介绍一下kai的实现。

kai的设计目标如下:

  • 后端基于Darknet(不支持训练)
  • 提供RESTful接口进行图片和视频的检测
  • 支持Amazon S3下载和上传
  • 支持Ftp下载和上传
  • 支持检测结果持久化到MongoDB

架构图是这样的


Kai
Kai

这里重点介绍一下Kai的Pipeline机制,这里的Pipeline包括下载(Download),检测(Yolo)和上(Upload)传这一系列流程。
先上个图:

Pipeline
Pipeline

这里的难点在于下载(Download),检测(Yolo)和上传(Upload)这三个步骤可以配置不同的Goroutine数量,而这三步之间是一个同步操作。

  1. 首先需要定义3个buffered channel来进行同步
// KaiServer represents the server for processing all job requests
type KaiServer struct {
    net.Listener
    logger        *logging.Logger
    config        types.ServerConfig
    listenAddr    string
    listenNetwork string
    router        *Router
    server        *http.Server
    db            db.Storage
    // jobDownBuff is the buffered channel for job downloading
    jobDownBuff chan types.Job
    // jobDownBuff is the buffered channel for job todo
    jobTodoBuff chan types.Job
    // jobDownBuff is the buffered channel for job done
    jobDoneBuff chan types.Job
}
  1. Pipeline的执行流程如下
// Pipeline contains downloading, processing and uploading a job
func Pipeline(logger *logging.Logger, config types.ServerConfig, dbInstance db.Storage, jobDownBuff chan types.Job,
    jobTodoBuff chan types.Job, jobDoneBuff chan types.Job, job types.Job) {
    logger.Infof("pipeline-job %+v", job)

    // download a job
    setupAndDownloadJob(logger, config.System, dbInstance, job, jobDownBuff)

    // jobDownBuff -> jobTodoBuff -> jobDoneBuff
    yoloJob(logger, config, dbInstance, jobDownBuff, jobTodoBuff, jobDoneBuff)

    // upload a job
    uploadJob(logger, dbInstance, jobDoneBuff)
}
  1. 下载(Download)
// setupAndDownloadJob setup and download jobs into jobDownBuff
func setupAndDownloadJob(logger *logging.Logger, config types.SystemConfig,
    dbInstance db.Storage, job types.Job, jobDownBuff chan<- types.Job) {

    go func() {
        logger.Infof("start setup and download a job: %+v", job)
        newJob, err := SetupJob(logger, job.ID, dbInstance, config)
        job = *newJob
        if err != nil {
            logger.Error("setup-job failed", err)
            return
        }

        downloadFunc := downloaders.GetDownloadFunc(job.Source)
        if err := downloadFunc(logger, config, dbInstance, job.ID); err != nil {
            logger.Error("download failed", err)
            job.Status = types.JobError
            job.Details = err.Error()
            dbInstance.UpdateJob(job.ID, job)
            return
        }

        jobDownBuff <- job
    }()
}
  1. 检测(Yolo)
func yoloJob(logger *logging.Logger, config types.ServerConfig, dbInstance db.Storage,
    jobDownBuff <-chan types.Job, jobTodoBuff chan types.Job, jobDoneBuff chan types.Job) {

    go func() {
        job, ok := <-jobDownBuff
        if !ok {
            logger.Info("job download buffer is closed")
            return
        }
        logger.Infof("start a yolo job: %+v", job)
        // limit the number of job in the jobTodoBuff
        jobTodoBuff <- job
        jobTodo, ok := <-jobTodoBuff
        if !ok {
            logger.Info("job todo buffer is closed")
            return
        }

        nGpu := config.System.NGpu
        t := yolo.NewTask(config.Yolo, jobTodo.Media.Cate, nGpu, jobTodo.LocalSource, jobTodo.LocalDestination)
        logger.Debugf("yolo task: %+v", *t)
        yolo.StartTask(t, logger, dbInstance, jobTodo.ID)
        jobDoneBuff <- job
    }()
}
  1. 上传(Upload)
func uploadJob(logger *logging.Logger, dbInstance db.Storage, jobDoneBuff <-chan types.Job) {
    go func() {
        jobDone, ok := <-jobDoneBuff
        if !ok {
            logger.Info("job done buffer is closed")
            return
        }
        logger.Infof("start a upload job: %+v", jobDone)

        uploadFunc := uploaders.GetUploadFunc(jobDone.Destination)
        if err := uploadFunc(logger, dbInstance, jobDone.ID); err != nil {
            logger.Error("upload failed", err)
            jobDone.Status = types.JobError
            jobDone.Details = err.Error()
            dbInstance.UpdateJob(jobDone.ID, jobDone)
            return
        }

        logger.Info("erasing temporary files")
        if err := CleanSwap(dbInstance, jobDone.ID); err != nil {
            logger.Error("erasing temporary files failed", err)
        }

        jobDone.Status = types.JobFinished
        dbInstance.UpdateJob(jobDone.ID, jobDone)

        logger.Infof("end a job: %+v", jobDone)
    }()
}

到此,这个项目主要机制都已经介绍完了,如果大家有兴趣的可以去点击下面的项目主页。

项目链接:
go-yolo: https://github.com/ZanLabs/go-yolo
kai: https://github.com/ZanLabs/kai

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容