Go语言使用NSQ消息队列

1. 概述

NSQ 是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件

相关描述如下:

NSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。 它促进了没有单点故障的分布式和分散式拓扑,从而实现了容错能力和高可用性,并提供了可靠的消息传递保证。 查看功能和保证。 从操作上讲,NSQ易于配置和部署(所有参数均在命令行上指定,并且编译的二进制文件不具有运行时相关性)。 为了获得最大的灵活性,它与数据格式无关(消息可以是JSON,MsgPack,协议缓冲区或其他任何东西)。 官方提供了Go和Python库(以及许多其他客户端库),并且,如果您有兴趣构建自己的库,则有一个协议规范。

NSQ的特点

  • 支持水平横向拓展(无缝添加更多节点到集群中)
  • 部署配置容易,自带集群管理界面(nsqadmin)
  • 提倡分布式拓扑,减少单点故障,提高容错
  • 低延迟的消息传递
  • 可靠的消交付保障保障
    • 默认中消息都在内存中, nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。
    • 消息最少被投递一次

比较知名和常用的消息处理系统还有

RabbitMQ

KafKa

2. 基础应用场景

我们知道一般的消息队列(Message Queue) 常用的场景有系统解耦 异步处理 流量削峰 消息通信

3. 相关文档

  1. 项目地址 : https://github.com/nsqio/nsq
  2. 项目文档 英文: https://nsq.io/overview/design.html
  3. 下载地址: https://nsq.io/deployment/installing.html
  4. 客户端下载地址 : https://nsq.io/clients/client_libraries.html

4.安装操作

根据自己的操作平台下载解压即可

  • 根据自己的操作系统下载对应的压缩包文件
  • 解压压缩文件
  • 进入解压后 bin 目录中

bin 目录中我们能看到如下文件

-rwxr-xr-x 1 captain 197121 5515776 8月  28 13:46 nsq_stat.exe*
-rwxr-xr-x 1 captain 197121 5823488 8月  28 13:46 nsq_tail.exe*
-rwxr-xr-x 1 captain 197121 5997568 8月  28 13:46 nsq_to_file.exe*
-rwxr-xr-x 1 captain 197121 5923840 8月  28 13:46 nsq_to_http.exe*
-rwxr-xr-x 1 captain 197121 5903872 8月  28 13:46 nsq_to_nsq.exe*
-rwxr-xr-x 1 captain 197121 8787968 8月  28 13:46 nsqadmin.exe*
-rwxr-xr-x 1 captain 197121 9108992 8月  28 13:46 nsqd.exe*
-rwxr-xr-x 1 captain 197121 8384000 8月  28 13:46 nsqlookupd.exe*
-rwxr-xr-x 1 captain 197121 5639680 8月  28 13:46 to_nsq.exe*

5. NSQ服务端基础组件介绍

5.1 nsqd

nsqd是一个守护进程负责接收,排队,消息传递 到客户端。 它可以独立运行,但通常由nsqlookupd实例的群集中配置(在这种情况下,它将能声明topics和发现channel)。 它侦听两个TCP端口,一个侦听客户端,另一个侦听HTTP API。 它可以选择在第三个端口上侦听HTTPS。

5.2 nsqlookupd

nsqlookupd 是管理拓扑信息的守护程序。 客户端查询nsqlookupd以发现特定 topicnsqd 生产者和 nsqd 节点广播topicchannel信息。
有两个接口:

nsqd用于广播的TCP接口

客户端(nsqadmin)执行发现和管理操作的HTTP接口

5.3 nsqadmin

nsqadmin 是一套 WEB管理界面,用来汇集集群的实时统计,并执行不同的管理任务。

重点提示:

NSQ还有许多功能组件,我们只介绍这三个(nsqd nsqlookupd nsqadmin)最常用和主要的

NSQ的所有组件都可以通过参数 -- help 查看相关配置

nsqdnsqlookupd 都有对应的http API ,需要使用的时候查看文档即可

6.操作NSQ

6.1 安装客户端

根据不同的开发语言选择不同的客户端

我们是使用Golang操作所以采用NSQ的官方提供客户端 go-nsq

go get -u github.com/nsqio/go-nsq

6.1 单机启动nsqd

默认启动的nsqd 监听 HTTP对应的4151端口和TCP对应的4150端口

$ ./nsqd
[nsqd] 2019/11/10 13:41:29.575014 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 13:41:29.593002 INFO: ID: 825
[nsqd] 2019/11/10 13:41:29.597000 INFO: TOPIC(topic_demo): created
[nsqd] 2019/11/10 13:41:29.599998 INFO: TOPIC(topic_demo): new channel(aa)
[nsqd] 2019/11/10 13:41:29.599998 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 13:41:29.644973 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/11/10 13:41:29.644973 INFO: TCP: listening on [::]:4150

我们同样可以指定端口

$ ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081"
[nsqd] 2019/11/10 14:05:40.726849 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 14:05:40.745838 INFO: ID: 825
[nsqd] 2019/11/10 14:05:40.747836 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 14:05:40.788814 INFO: TCP: listening on [::]:8081
[nsqd] 2019/11/10 14:05:40.788814 INFO: HTTP: listening on [::]:8080

这样我们就启动了一个nsqd 的实例

在NSQ中有两个非常重要的概念 topicChannel

我们看一下文档中的描述:

每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。

topicchannel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channelchannel是通过订阅指定的channel在第一次使用时创建的。

topicchannel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。

channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:

topic-channel.gif

总而言之,消息是从topic->channel多播的(每个channel都接收该topic的所有消息的副本),但从channel-> 消息消费者 均匀分发(每个消费者都接收该频道的一部分消息)。

6.1.1 单NSQ的使用

编写一个消息生产者
nsq_single_product.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "time"
)
func main() {
    nsqAddr := "127.0.0.1:8081"
    conf :=nsq.NewConfig()
    p ,err := nsq.NewProducer(nsqAddr,conf)
    if err != nil {
        fmt.Println(err)
        return
    }
    for  {
        message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
        fmt.Println(message)
        // 发送消息
        p.Publish("topic-demo1",[]byte(message))
        time.Sleep(time.Second)
    }

}

编写一个消息消费者

nsq_single_consumer.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

type NewHandler struct{}

func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
}
func MyConsumers(topic, channel, addr string) {
    conf := nsq.NewConfig()
    new_consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {

    }
    // 接收消息
    new_handler := &NewHandler{}
    new_consumer.AddHandler(new_handler)
    err = new_consumer.ConnectToNSQD(addr)
    if err != nil {

    }
}
func main() {
    addr := "127.0.0.1:8081"
    go MyConsumers("topic-demo1", "channel-aa", addr)
    // 模拟多个从多个channel去消息
    go MyConsumers("topic-demo1", "channel-bb", addr)
    select {}
}

6.1.2 通过nsqadmin查看

启动nsqadmin

nsqadmin 的web界面默认监听了 4171端口

$ ./nsqadmin --nsqd-http-address="127.0.0.1:8080"
[nsqadmin] 2019/11/10 16:06:15.842033 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2019/11/10 16:06:15.858026 INFO: HTTP: listening on [::]:4171

我们在地址栏中输如

http://127.0.0.1:4171/

就能看看管理界面

nsqadmin
nsqadmin
6.1.3 NSQ的单点结构
nsq.png

6.3 NSQ集群

6.3.1 启动NSQ各组件

构建一个NSQ的基础拓扑结构

我们可以简单的说nsqlookupd 是用来管理nsqd实例节点的

第一步
启动nsqlookupd

启动的nsqlookupd 采用了默认配置 通过参数 --help 查看配置项

$ ./nsqlookupd
[nsqlookupd] 2019/11/10 16:40:55.968588 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2019/11/10 16:40:55.983580 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/11/10 16:40:55.984579 INFO: TCP: listening on [::]:4160

第二步

添加nsqd 实例

与前面的启动不同,需要带上参数 -lookupd-tcp-address

添加第一个实例

./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" -lookupd-tcp-address="127.0.0.1:4160"

添加第二个实例

 ./nsqd -http-address="0.0.0.0:8090" -tcp-address="0.0.0.0:8091" -lookupd-tcp-address="127.0.0.1:4160"

第三步

启动nsqadmin

与前面的也不同了需要带上参数 -lookupd-http-address

$ ./nsqadmin -lookupd-http-address="127.0.0.1:4161"

在浏览器中访问nsqadmin

nsqadmin-nodes
nsqadmin-lookup
6.3.2 NSQ的拓扑结构
nsqlookupd
  1. 在集群模式中,消息生产方发送消息给任意一个nsqd 实例都不影响
  2. 消息的消费者需要通过nsqlookupd 查询nsqd的地址后才能获取消息
  3. 增加nsqd 节点完全不影响其他的节点
6.3.3 Golang使用NSQ代码示例

消息生产者

nsq_cluster_product.go

package main

import (
    "bufio"
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "strings"
)

var pro *nsq.Producer

func NewPro(addr string) (err error) {
    conf := nsq.NewConfig()
    pro, err = nsq.NewProducer(addr, conf)
    if err != nil {
        log.Println(err)
        return err
    }
    return nil
}
func main() {
    nsqAddr := "127.0.0.1:8091"
    err := NewPro(nsqAddr)
    if err != nil {
        fmt.Println(err)
        return
    }else{
        fmt.Println("connect 127.0.0.1:8091 success")
    }
    // 读取标准输入
    reader := bufio.NewReader(os.Stdin)
    for {
        // 读取所有内容直到遇见回车(\n)
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Println("read data from stdin is field : ", err)
            continue
        }
        // 当输入q的时候退出
        data = strings.TrimSpace(data)
        if strings.ToUpper(data) == "Q" {
            break
        }
        err = pro.Publish("topic-demo1", []byte(data))
        if err != nil {
            fmt.Println("nsq publish is field ", err)
            continue
        }
    }
    fmt.Println("exit !")
}

消息消费者

nsq_cluster_consumer.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

type Handler struct{}

func (m *Handler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
}
func NewConsumers(t string, c string, addr string) error {
    conf := nsq.NewConfig()
    nc, err := nsq.NewConsumer(t, c, conf)
    if err != nil {
        fmt.Println("create consumer failed err ", err)
        return err
    }
    consumer := &Handler{}
    nc.AddHandler(consumer)
    // 连接nsqlookupd
    if err:= nc.ConnectToNSQLookupd(addr);err!=nil{
        fmt.Println("connect nsqlookupd failed ", err)
        return err
    }
    return nil
}
func main() {
    // 这是nsqlookupd的地址
    addr := "127.0.0.1:4161"
    err := NewConsumers("topic-demo1", "channel-aa", addr)
    if err != nil {
        fmt.Println("new nsq consumer failed", err)
        return
    }
    select {}
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容