golang 使用nsq消息队列

NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用Go和Python库。

部署

官网下载地址

安装步骤

# 下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 解压
tar -zxvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 启动服务
cd nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &

使用

1、创建一个test主题,并发送一个hello world消息

curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'

2、浏览器访问NSQ的管理界面: http://127.0.0.1:4171/

image.png

3 消费test主题的消息

$ ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
2019/03/13 11:09:49 INF    1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2019/03/13 11:09:49 INF    1 [test/nsq_to_file] (jinchunguang-TM1701:4150) connecting to nsqd
2019/03/13 11:09:49 INFO: opening /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
2019/03/13 11:09:49 syncing 1 records to disk

$ cat /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
hello world

客户端

生产者可使用PHP curl 直接处理,github有许多现成的客户端可以使用

<?php

$msg="最简单的发送消息方式!";
$url= "http://127.0.0.1:4151/pub?topic=test";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_POSTFIELDS, $msg);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
        'Content-Type: text/html; charset=utf-8',
        'Content-Length: ' . strlen($msg))
);
$output = curl_exec($ch);
if($output === FALSE ){
    echo "CURL Error:".curl_error($ch);
}

使用go来处理

代码目录结构如下(示例项目,通过gin封装的,学习使用)

image.png

nsq.go 简单封装nsq操作


package servers

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

// 默认配置
const HOST  = "127.0.0.1:4150"
const TOPIC_NAME  = "test"
const CHANNEL_NAME  = "test-channel"

// 启动Nsq
func NsqRun()  {
    Consumer()
}

// nsq发布消息
func Producer(msgBody string) {
    // 新建生产者
    p, err := nsq.NewProducer(HOST, nsq.NewConfig())
    if err != nil {
        panic(err)
    }
    // 发布消息
    if err := p.Publish(TOPIC_NAME, []byte(msgBody)); err != nil {
        panic(err)
    }
}


// nsq订阅消息
type ConsumerT struct{}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    fmt.Println(string(msg.Body))
    return nil
}

func Consumer() {
    c, err := nsq.NewConsumer(TOPIC_NAME, CHANNEL_NAME, nsq.NewConfig())   // 新建一个消费者
    if err != nil {
        panic(err)
    }
    c.AddHandler(&ConsumerT{})                                           // 添加消息处理
    if err := c.ConnectToNSQD(HOST); err != nil {            // 建立连接
        panic(err)
    }
}

main.go 项目入口文件

package main

import (
    "github.com/gin-gonic/gin"
    "wages_service/servers"
    "wages_service/tasks"
)

var GinEngine *gin.Engine

func main() {

    // 运行 task
    tasks.SyncDataRun()

    // 运行 nsq
    servers.NsqRun()

    // 运行server
    servers.HttpRun(GinEngine)
}

nsq_producer.go 我们测试用来发送消息的

package main

import "wages_service/servers"

func main()  {
    // 发送消息到nsq
    servers.Producer("hello world!!!")
}

运行测试

  • 启动项目
image.png
  • 推送消息


    image.png
  • 查看结果

image.png

Nsql官网

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 前言 好久不见。 从这篇文章开始,我将带大家走进消息中间件的世界。 消息中间件本质上就是一种很简单的数据结构——队...
    柳树之阅读 3,590评论 3 23
  • 1. 介绍 最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基...
    aoho阅读 9,034评论 1 16
  • # Python 资源大全中文版 我想很多程序员应该记得 GitHub 上有一个 Awesome - XXX 系列...
    小迈克阅读 3,070评论 1 3
  • 1. 分布式系统核心问题 参考书籍:《区块链原理、设计与应用》 一致性问题例子:两个不同的电影院买同一种电影票,如...
    molscar阅读 958评论 0 0
  • 体验入 今天开下班时来了一只边牧看病,这时彦达,高鹏志荣大姐等都主动来帮忙,好快速解决问题 找核心 体谅,互助 转...
    93650345d0d1阅读 215评论 0 0