go+redis实现消息队列发布与订阅

1.生产者随机发布消息,用rpush发布。
2.消费者用lpop订阅消费,一旦没有消息,随机休眠。
redis坐消息队列的缺点:没有持久化。一旦消息没有人消费,积累到一定程度后机会丢失

package main

import (
    "fmt"
    "time"
    "os"
    "strconv"
    "math/rand"
    "github.com/gomodule/redigo/redis"
)

const RMQ string = "mqtest"

func producer() {
    redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("hdiot"))
    if err != nil {
        fmt.Println(err)
        return
    }
    
    defer redis_conn.Close()
    
    rand.Seed(time.Now().UnixNano())

    var i = 1

    for {
        _,err = redis_conn.Do("rpush", RMQ, strconv.Itoa(i))
        if(err!=nil) {
            fmt.Println("produce error")
            continue
        }
        fmt.Println("produce element:%d", i)
        time.Sleep(time.Duration(rand.Intn(10))*time.Second)
        i++
    }
}

func consumer() {
    redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("hdiot"))
    if err != nil {
        fmt.Println(err)
        return
    }
    
    defer redis_conn.Close()

    rand.Seed(time.Now().UnixNano())

    for {
        ele,err := redis.String(redis_conn.Do("lpop", RMQ))
        if(err != nil) {
            fmt.Println("no msg.sleep now")
            time.Sleep(time.Duration(rand.Intn(10))*time.Second)
        } else {
            fmt.Println("cosume element:%s", ele)
        }
    }
}

func main() {
    list := os.Args
    if(list[1] == "pro") {
        go producer()
    } else if (list[1] == "con") {
        go consumer()
    }
    for {
        time.Sleep(time.Duration(10000)*time.Second)
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容