package rabbitMQ
import (
"github.com/streadway/amqp"
"smartracing.cn/appointment/member/internal/conf"
)
type Deliveryfunc(<-chan amqp.Delivery)
type MQServerinterface {
CreateQueue(string) (amqp.Queue, error)
Publish(string, []byte)error
RegisterConsumer(qNamestring, deliveryDelivery)
StartConsumer()
}
type RabbitMQstruct {
conn *amqp.Connection
callbackmap[string]Delivery
connNotifychan *amqp.Error
}
func (rabbitmq *RabbitMQ)RegisterConsumer(qNamestring, deliveryDelivery) {
rabbitmq.callback[qName] = delivery
}
func New(uristring) (MQServer, error) {
conn, err :=amqp.Dial(uri)
if err !=nil {
return nil, err
}
rabbit := &RabbitMQ{conn: conn, callback:make(map[string]Delivery)}
return rabbit, nil
}
func (rabbitmq *RabbitMQ)CreateQueue(qNamestring) (amqp.Queue, error) {
ch, err :=rabbitmq.conn.Channel()
if err !=nil {
return amqp.Queue{}, err
}
defer ch.Close()
queue, err := ch.QueueDeclare(qName, true, false, false, false, nil)
if err !=nil {
return amqp.Queue{}, err
}
return queue, nil
}
func (rabbitmq *RabbitMQ)Publish(qNamestring, body []byte)error {
queue, err :=rabbitmq.CreateQueue(qName)
ch, err :=rabbitmq.conn.Channel()
if err !=nil {
return err
}
defer ch.Close()
err = ch.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
DeliveryMode:amqp.Persistent,
ContentType:"application/json",
Body: body,
})
if err !=nil {
return err
}
conf.Log.Infof("rabbitMQ publish queue:%s, message:%s", qName, string(body))
return nil
}
func SendRabbitMQ(queuestring, body []byte)error {
r, err :=New(conf.AppConfig.RabbitMqUrl)
if err !=nil {
conf.Log.Errorf("New RabbitMQ Fatal:%v", err)
return err
}
if err := r.Publish(queue, body); err !=nil {
return err
}
return nil
}
func (rabbitmq *RabbitMQ)StartConsumer() {
for queue, delivery :=range rabbitmq.callback {
go func(qNamestring, deliveryDelivery) {
queue, err :=rabbitmq.CreateQueue(qName)
if err !=nil {
conf.Log.Fatalf("rabbitmq create queue fatal:%v", err)
panic(err.Error())
}
ch, err :=rabbitmq.conn.Channel()
if err !=nil {
conf.Log.Fatalf("rabbitmq channel fatal:%v", err)
panic(err.Error())
}
defer ch.Close()
err = ch.Qos(
3,
0,
false,
)
if err !=nil {
conf.Log.Fatalf("rabbitmq qos fatal:%v", err)
panic(err.Error())
}
msgs, err := ch.Consume(queue.Name, "", false, false, false, false, nil)
if err !=nil {
conf.Log.Fatalf("rabbitmq consume fatal:%v", err)
panic(err.Error())
}
conf.Log.Infof("rabbitmq add consume success, consume:%s, delivery:%v", queue, delivery)
go delivery(msgs)
select {}
}(queue, delivery)
}
}