我们利用Go语言来实现RabbitMQ的简单模式,其他的工作模式是根据简单模式进行构建的。
RabbitMQ的实例代码如下所示:
const MQURL = "amqp://用户名:密码@host:端口/vhost"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
//队列名称
QueueName string
//交换机
Exchange string
//key
Key string
//连接信息
MqUrl string
sync.Mutex
}
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ{
rabbitMQ := &RabbitMQ{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl:MQURL,
}
var err error
rabbitMQ.conn,err = amqp.Dial(rabbitMQ.MqUrl)
if err != nil {
rabbitMQ.failOnErr(err,"创建连接错误")
}
rabbitMQ.channel,err = rabbitMQ.conn.Channel()
if err != nil {
rabbitMQ.failOnErr(err,"获取Channel失败")
}
return rabbitMQ
}
//断开channel conn
func (r *RabbitMQ) Destroy(){
r.channel.Close()
r.conn.Close()
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error,msg string) {
if err != nil {
log.Fatal("%s:%s",msg,err)
panic(fmt.Sprintf("%s:%s",msg,err))
}
}
简单模式
我们来实现以下生产端的代码:
//实现simple模式
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName,"","")
}
//实现简单模式下生产者代码
func (r *RabbitMQ) PublishSimple(message string) error{
r.Lock()
defer r.Unlock()
//申请队列,如果不存在会自动创建,存在跳过创建,保证队列存在,消息能发送到队列中
_,err := r.channel.QueueDeclare(
r.QueueName,
//控制消息是否持久化,true开启
false,
//是否为自动删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外属性
nil,
)
if err != nil{
return err
}
//发送消息到队列中
r.channel.Publish(
r.Exchange,
r.QueueName,
//如果为true,根据exchange类型和routekey类型,如果无法找到符合条件的队列,name会把发送的信息返回给发送者
false,
//如果为true,当exchange发送到消息队列后发现队列上没有绑定的消费者,则会将消息返还给发送者
false,
//发送信息
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
return nil
}
对于高并发来说,防止资源产生竞争关系,我在生成端添加了互斥锁,要保证信息已经成功的被添加到队列中;队列创建好,我们就把产生的消息发送出去。相关参数介绍我已经在代码里做了备注。
接下来我们来实现,消费端的代码:
//实现简单模式下消费者代码
func (r *RabbitMQ) ConsumeSimple(orderService service.IOrderService,
productService service.ProductService){
//申请队列,如果不存在会自动创建,存在跳过创建,保证队列存在,消息能发送到队列中
q,err := r.channel.QueueDeclare(
r.QueueName,
//控制消息是否持久化,true开启
false,
//是否为自动删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外属性
nil,
)
if err != nil{
fmt.Println(err)
}
//消费者流控,防止暴库
r.channel.Qos(
//每次只接受一个消息进行消费
1,
//服务器传递的最大容量(以8位字节为单位)
0,
//true对全局可用,false只对当前channel可用
false,
)
//接收消息
messages,err := r.channel.Consume(
q.Name,
//用来区分多个消费者
"",
//是否自动应答
false,
//是否具有排他性
false,
//如果设置为true,表示不能将同一个connection中发送的消息
//传递给同一个connection的消费者
false,
//是否为阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range messages{
//实现我们的处理逻辑函数
log.Printf("Received a message : %s",d.Body)
message := &datamodels.Message{}
err :=json.Unmarshal([]byte(d.Body),message)
if err !=nil {
fmt.Println(err)
}
//插入订单
_,err = orderService.InsertOrderByMessage(message)
if err != nil {
fmt.Println(err)
}
//扣除数量
err = productService.SubNumberOne(message.ProductID)
if err != nil{
fmt.Println(err)
}
//如果为true表示确认所有未确认的消息,false为当前消息
d.Ack(false)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRAL+C")
<-forever
}
消费端同样首先判断队列是否生成成功,接下来要注意的地方就是,RabbitMQ的限流,等消费者消费完一个信息,才去接收下一个信息,防止消费端在处理类似于Mysql数据库的时候发生暴库,具体实现如下:
//消费者流控,防止暴库
r.channel.Qos(
//每次只接受一个消息进行消费
1,
//服务器传递的最大容量(以8位字节为单位)
0,
//true对全局可用,false只对当前channel可用
false,
)
接下来就开始接收和消费信息,我们通过Go语言的协程来处理接收到的信息,代码如下:
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range messages{
//实现我们的处理逻辑函数
log.Printf("Received a message : %s",d.Body)
message := &datamodels.Message{}
err :=json.Unmarshal([]byte(d.Body),message)
if err !=nil {
fmt.Println(err)
}
//插入订单
_,err = orderService.InsertOrderByMessage(message)
if err != nil {
fmt.Println(err)
}
//扣除数量
err = productService.SubNumberOne(message.ProductID)
if err != nil{
fmt.Println(err)
}
//如果为true表示确认所有未确认的消息,false为当前消息
d.Ack(false)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRAL+C")
<-forever
注意我们将消费端自动应答模式关闭,是为了确保消费者业务逻辑处理成功再接收下一条,这里最关键的是要手动应答d.Ack(false)
工作模式
工作模式对于简单模式的代码来说是一样的,只是消费端的个数增加了,这种情况适用于生产者生成消息过于快,消费端来不及消费,才会采用多个消费端进行消费。
订阅模式
实例订阅模式的代码如下:
//实现订阅模式
func NewRabbitMQSimple(exchangeName string) *RabbitMQ {
return NewRabbitMQ("",exchangeName ,"")
}
订阅模式下的生产端的代码如下(注意:这块和简单模式的区别在于,要生成交换机,而不是队列):
//实现订阅模式下生产者代码
func (r *RabbitMQ) PublishPub(message string) error{
r.Lock()
defer r.Unlock()
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", //设置交换机的类型,在订阅模式下交换机的类型为广播类型
true,
false,
false,//true表示这个参数exchange不可以被client用来推送信息,仅用来exchange和exchange之间的绑定
false,
nil,
)
if err != nil {
r.failOnErr(err,"创建交换机异常")
}
//发送消息到队列中
r.channel.Publish(
r.Exchange,
"",
//如果为true,根据exchange类型和routekey类型,如果无法找到符合条件的队列,name会把发送的信息返回给发送者
false,
//如果为true,当exchange发送到消息队列后发现队列上没有绑定的消费者,则会将消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
return nil
}
订阅模式的消费端代码:
//订阅模式下的消费端代码
func (r *RabbitMQ) ConsumePub(){
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", //设置交换机的类型,在订阅模式下交换机的类型为广播类型
true,
false,
false,//true表示这个参数exchange不可以被client用来推送信息,仅用来exchange和exchange之间的绑定
false,
nil,
)
if err != nil {
r.failOnErr(err,"创建交换机异常")
}
//申请队列,如果不存在会自动创建,存在跳过创建,保证队列存在,消息能发送到队列中
q,err := r.channel.QueueDeclare(
"", //随机生成队列名称
//控制消息是否持久化,true开启
false,
//是否为自动删除
false,
//是否具有排他性
true,
//是否阻塞
false,
//额外属性
nil,
)
if err != nil{
r.failOnErr(err,"生产队列异常")
}
//绑定队列到Exchange中
err = r.channel.QueueBind(
q.Name,
"",//在订阅模式下这个参数要为空,
r.Exchange,
false,
nil,
)
//接收消息
messages,err := r.channel.Consume(
q.Name,
//用来区分多个消费者
"",
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能将同一个connection中发送的消息
//传递给同一个connection的消费者
false,
//是否为阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range messages{
//实现我们的处理逻辑函数
log.Printf("Received a message : %s",d.Body)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRAL+C")
<-forever
}
路由模式
路由模式的实例代码如下:
//实现simple模式
func NewRabbitMQSimple(exchangeName string, routingKey string) *RabbitMQ {
return NewRabbitMQ("",exchangeName ,routingKey )
}
注意路由模式和订阅模式的区别在于,将广播模式改为direct模式
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct",
true,
false,
false,//true表示这个参数exchange不可以被client用来推送信息,仅用来exchange和exchange之间的绑定
false,
nil,
)
在生产消息的时候区别在于:
r.channel.Publish(
r.Exchange,
r.Key,
//如果为true,根据exchange类型和routekey类型,如果无法找到符合条件的队列,name会把发送的信息返回给发送者
false,
//如果为true,当exchange发送到消息队列后发现队列上没有绑定的消费者,则会将消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
消费端代码实现:
在声明交换机的时候,将广播模式改为direct模式。绑定队列的时候,要将key参数进行添加,代码如下所示:
err = r.channel.QueueBind(
q.Name,
r.Key,
r.Exchange,
false,
nil,
)
话题模式
和路由模式的区别在于,将交换机的模式改变为topic模式,其他的不变。