在ViewController文件里包含头文件,#import <RMQClient/RMQClient.h>
1. 接收方法
amqp://<user>:<pass>@<ip>:5672
amqp://user:pass@host:port
- (void)receive
{
RMQConnection * conn = [[RMQConnection alloc] initWithUri:@"amqps://user:pass@yourHost:yourPort" delegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel>channel = [conn createChannel];
RMQQueue * queue = [channel queue:@"test"];
[queue subscribe:^(RMQMessage * _Nonnull message) {
NSLog(@"message:%@",[[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
}
写完上面的代码后运行报错
Received connection: <RMQConnection: 0x282826180> disconnectedWithError: Error Domain=kCFStreamErrorDomainSSL Code=-9847 "(null)" UserInfo={NSLocalizedRecoverySuggestion=Error code definition can be found in Apple's SecureTransport.h}
在网上一番查找也没找到解决办法,似乎这个和证书有关,后台同事说并没有用证书,看我的Uri是amqps开头说不应该用amqps,那个s需要去掉,去掉s后运行果然不报上面的错误了(但内心还是懵懵的 ps:amqps不是对应https, ampq对应http吗,不理解)
如果手动添加RMQClient,不要忘了添加下图的另外两个:
可能会出现#import <>报错:将#import <>的对应改成#import ""
按照上面写的但是收不到消息,故而加了个RMQExchange
RMQConnection * conn = [[RMQConnection alloc] initWithUri:@"amqp://user:pass@yourHost:yourPort" delegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel>channel = [conn createChannel];
RMQExchange *exchange = [channel direct:@"xxx.server" options:RMQExchangeDeclareDurable];
RMQQueue * queue = [channel queue:@"test"];
[queue bind:exchange];
[queue subscribe:^(RMQMessage * _Nonnull message) {
NSLog(@"message:%@",[[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
然而还是接收不到消息
后来发现是没绑定routingKey(据说RMQExchange一定要有routingKey)加上routingKey后可以接收到消息了,代码如下:
RMQConnection * conn = [[RMQConnection alloc] initWithUri:@"amqp://user:pass@yourHost:yourPort" delegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel>channel = [conn createChannel];
// 这里使用的direct模式
RMQExchange *exchange = [channel direct:@"xxx.server" options:RMQExchangeDeclareDurable];
// 后台说 queue名和routingKey定义成一样的(queue名是为了方便区分定义,本人并不是很明白)
NSString *routingKey = @"test";
RMQQueue * queue = [channel queue:routingKey];
[queue bind:exchange routingKey:routingKey];
[queue subscribe:^(RMQMessage * _Nonnull message) {
NSLog(@"message:%@",[[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
/// 关闭RMQ
- (void)closeRMQ{
if (self.conn) {
NSString *routingKey = @"test";
if (account.length != 0) {
// 队列解绑
[self.queue unbind:self.exchange routingKey:routingKey];
// 队列删除 !!!!这句很重要(不写后台网页上的队列一直存在,导致其他手机无法正常接收消息)
[self.queue delete];
// 关闭通道
[self.channel close];
self.channel = nil;
// 断开连接
[self.conn close];
self.conn = nil;
}
}
}
Queued Messages: 正在排队等待消费的消息
Ready:待消费的消息总数。
Unacked:待应答的消息总数。
Total:总数 Ready+Unacked
MessageRates :
publish:producter pub消息的速率。
deliver:consumer 获取消息的速率。
acknowledge:consumer ack消息的速率。
关于RMQExchange:
交换机:
1.用于接收客户端投递过来的消息
2.分发消息到指定的队列
目前共四种类型:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,目前几乎用不到了。所以直接看另外三种类型)
direct:直连
topic:路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它同样也会识别两个通配符:"#"和" * "。#匹配0个或多个单词," * "匹配不多不少一个单词。(生产端发送了三条消息,消费端只能接收到routingKey为user.save、user.update的这两条消息,因为bindingKey = "user.* ",“ * ”只能匹配到一个字符。如果此时换成“#”,那这三条消息都能接收到。)
fanout:属于广播模式,意思就是消息不走任何的路由规则,只有队列和交换机有绑定关系就能收到消息。(生产者发送了5条消息,消费者1、2 分别都收到了5条消息。)
最后,交换机的属性整理一下
Name: 交换机名称
Type: 交换机类型,direct、topic、 fanout、 headers
Durability: 是否需要持久化
Auto Delete: 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为False
Arguments: 扩展参数,用于扩展AMQP协议定制化使用
exchange相关解释参考自:https://www.jianshu.com/p/edbd76af74cf
以上就是敲代码的心得啦