gobox中的consumer处理框架

我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架

consumer处理架构图

gobox-consumer.png

重要的对象

IMessage

定义每条消息

type IMessage interface {
    Body() []byte
}

ConsumerHandleFunc

consumer中从队列收到每条消息后,调用这个方法

type ConsumerHandleFunc func(message IMessage) error

IConsumer

定义消费者行为

type IConsumer interface {
    SetHandleFunc(hf ConsumerHandleFunc)
    Start()
    Stop()
}

NewWorkerFunc

每个Worker的构造方法

type NewWorkerFunc func() IWorker

IWorker

定义Worker

type IWorker interface {
    SetWorkId(id int)
    SetLogger(logger golog.ILogger)

    Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}

LineProcessFunc

每条消息在Worker中的实际处理方法

type LineProcessFunc func(line []byte) error

BaseWorker

框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现LineProcessFunc即可

type BaseWorker struct

Task

Task用于实现consumer的处理框架

使用示例

package main

import (
    "github.com/goinbox/goconsumer"

    "fmt"
    "strconv"
    "time"
)

// 这里实现Worker
type DemoWorker struct {
    *goconsumer.BaseWorker
}

func NewDemoWorker() goconsumer.IWorker {
    worker := &DemoWorker{goconsumer.NewBaseWorker()}
    worker.SetLineProcessFunc(worker.LineProcessFunc)

    return worker
}

func (d *DemoWorker) LineProcessFunc(line []byte) error {
    idStr := strconv.Itoa(d.Id)
    fmt.Println("wid:" + idStr + " process line:" + string(line))

    return nil
}

// 这里实现Message
type DemoMessage struct {
    body []byte
}

func (d *DemoMessage) Body() []byte {
    return d.body
}

// 这里实现一个简单的Consumer,模拟从队列中获得100条消息
type DemoConsumer struct {
    hf goconsumer.ConsumerHandleFunc
}

func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
    d.hf = hf
}

func (d *DemoConsumer) Start() {
    for i := 0; i < 100; i++ {
        str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
        d.hf(&DemoMessage{[]byte(str)})
    }

    time.Sleep(time.Second * 1)
}

func (d *DemoConsumer) Stop() {

}


// 执行Task任务,调用consumer处理框架
func main() {
    task := goconsumer.NewTask("Demo")
    consumer := new(DemoConsumer)

    task.SetConsumer(consumer).
        SetWorker(10, NewDemoWorker).
        Start()
}

欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,678评论 19 139
  • 大数据技术框架 1. 简介 2. Hadoop框架2.1. Hadoop-MapReduce2.1.1. 简介:2...
    sunTengSt阅读 12,398评论 1 77
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 30,271评论 8 265
  • 从我在父母的胁迫下出生那刻起, 便奋不顾身(其实是身不由己)的扎进了时间的牢笼。 小时候想着追风, 现在想来,莫不...
    光皂阅读 420评论 0 1
  • 今日下大雨,但是我的公益剪纸课还是如期举行。 我的两个大弟子,志诚和若愚都做了我的志愿者,给小朋友当小老师...
    从尔阅读 527评论 0 0

友情链接更多精彩内容