golang基于标签体系的发布订阅机制实现

发布订阅

在程序开发过程中,发布订阅模式是常见的一种架构设计,通过一个消息代理,可以完美解耦各系统服务之间的依赖关系。
在一些系统内部,也经常需要通过发布订阅这种模式,来降低各个组件的复杂性和依赖性,增加可拓展性。

但工作中用的更多的是系统服务级别的解耦与拆分,如引入rabbitmq、kafka等。在程序设计中使用的反而较少。

接下来主要介绍下golang实现的,基于标签体系的发布订阅机制。

gonal

先来看下实例

复制以下代码,然后执行go mod vendor下载依赖,运行。

package main

import (
    "context"
    "fmt"
    "github.com/czasg/gonal"
    "time"
)

func worker1(ctx context.Context, payload gonal.Payload) {
    fmt.Println("触发worker1", payload.Label)
}

func worker2(ctx context.Context, payload gonal.Payload) {
    fmt.Println("触发worker2", payload.Label)
}

func main() {
    // 绑定任务
    gonal.Bind(worker1, gonal.Label{
        "meta.type": "worker",
        "meta.name": "worker1",
    })
    gonal.Bind(worker2, gonal.Label{
        "meta.type": "worker",
        "meta.name": "worker2",
    })

    // 循环触发任务
    count := 0
    for {
        for _, label := range []gonal.Label{
            {"meta.type": "worker"},  // 该标签触发两个函数
            {"meta.name": "worker1"}, // 该标签触发worker1
            {"meta.name": "worker2"}, // 该标签触发worker2
        } {
            count++
            countStr := fmt.Sprintf("%d", count)
            label["count"] = countStr
            _ = gonal.Notify(gonal.Payload{
                Label: label,
                Body:  []byte(countStr),
            })
            time.Sleep(time.Second * 5)
        }
    }
}

在代码中我们可以明显看到两个操作:
1、基于标签注册函数
2、基于标签发布数据

其内部依赖github.com/czasg/go-queue实现的FIFO内存队列,并实现了并发限制,有兴趣可以去看下源码实现

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容