go队列方式集成rabbitMq

package rabbitMQ

import (

"github.com/streadway/amqp"

"smartracing.cn/appointment/member/internal/conf"

)

type Deliveryfunc(<-chan amqp.Delivery)

type MQServerinterface {

CreateQueue(string) (amqp.Queue, error)

Publish(string, []byte)error

  RegisterConsumer(qNamestring, deliveryDelivery)

StartConsumer()

}

type RabbitMQstruct {

conn      *amqp.Connection

  callbackmap[string]Delivery

  connNotifychan *amqp.Error

}

func (rabbitmq *RabbitMQ)RegisterConsumer(qNamestring, deliveryDelivery) {

rabbitmq.callback[qName] = delivery

}

func New(uristring) (MQServer, error) {

conn, err :=amqp.Dial(uri)

if err !=nil {

return nil, err

}

rabbit := &RabbitMQ{conn: conn, callback:make(map[string]Delivery)}

return rabbit, nil

}

func (rabbitmq *RabbitMQ)CreateQueue(qNamestring) (amqp.Queue, error) {

ch, err :=rabbitmq.conn.Channel()

if err !=nil {

return amqp.Queue{}, err

}

defer ch.Close()

queue, err := ch.QueueDeclare(qName, true, false, false, false, nil)

if err !=nil {

return amqp.Queue{}, err

}

return queue, nil

}

func (rabbitmq *RabbitMQ)Publish(qNamestring, body []byte)error {

queue, err :=rabbitmq.CreateQueue(qName)

ch, err :=rabbitmq.conn.Channel()

if err !=nil {

return err

}

defer ch.Close()

err = ch.Publish(

"",

      queue.Name,

      false,

      false,

      amqp.Publishing{

DeliveryMode:amqp.Persistent,

        ContentType:"application/json",

        Body:        body,

      })

if err !=nil {

return err

}

conf.Log.Infof("rabbitMQ publish queue:%s, message:%s", qName, string(body))

return nil

}

func SendRabbitMQ(queuestring, body []byte)error {

r, err :=New(conf.AppConfig.RabbitMqUrl)

if err !=nil {

conf.Log.Errorf("New RabbitMQ Fatal:%v", err)

return err

}

if err := r.Publish(queue, body); err !=nil {

return err

}

return nil

}

func (rabbitmq *RabbitMQ)StartConsumer() {

for queue, delivery :=range rabbitmq.callback {

go func(qNamestring, deliveryDelivery) {

queue, err :=rabbitmq.CreateQueue(qName)

if err !=nil {

conf.Log.Fatalf("rabbitmq create queue fatal:%v", err)

panic(err.Error())

}

ch, err :=rabbitmq.conn.Channel()

if err !=nil {

conf.Log.Fatalf("rabbitmq channel fatal:%v", err)

panic(err.Error())

}

defer ch.Close()

err = ch.Qos(

3,

            0,

            false,

        )

if err !=nil {

conf.Log.Fatalf("rabbitmq qos fatal:%v", err)

panic(err.Error())

}

msgs, err := ch.Consume(queue.Name, "", false, false, false, false, nil)

if err !=nil {

conf.Log.Fatalf("rabbitmq consume fatal:%v", err)

panic(err.Error())

}

conf.Log.Infof("rabbitmq add consume success, consume:%s, delivery:%v", queue, delivery)

go delivery(msgs)

select {}

}(queue, delivery)

}

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容