ZeroMQ通讯订阅模式--iOS

ZeroMQ在使用模式上支持多种,有req-reply,publish-subscribe。订阅模式是zmq的重头戏,以鄙人的使用和理解来浅谈下订阅模式,本人也是边学边用,如有问题请大神指出。关于应答模式请阅读 这篇文章

zmq的订阅模式实现示例代码如下

ZMQContext *ctx = [[ZMQContext alloc] initWithIOThreads:1U];
// Socket to talk to server
ZMQSocket *subscriber = [ctx socketWithType:ZMQ_SUB];
if (![subscriber connectToEndpoint:@"tcp://localhost:5556"]) {
    return EXIT_FAILURE;
}

const char *kNYCZipCode = "10001";
const char *filter = (argc > 1)? argv[1] : kNYCZipCode;
NSData *filterData = [NSData dataWithBytes:filter length:strlen(filter)];
[subscriber setData:filterData forOption:ZMQ_SUBSCRIBE];

(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);

const int kMaxUpdate = 100;
long total_temp = 0;
for (int update_nbr = 0; update_nbr < kMaxUpdate; ++update_nbr) {
    NSData *msg = [subscriber receiveDataWithFlags:0];
    const char *string = [msg bytes];

    int zipcode = 0, temperature = 0, relhumidity = 0;
    (void)sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);

    printf("%d ", temperature);
    total_temp += temperature;
}

订阅模式的重点在于数据分发,数据由服务器主动推回来给客户端,订阅的数据统一接收,然后在分发到需要数据处理的对象中。示例代码只是简单的接收数据,它的做法是一个线程接收一个订阅消息,而且数据接收是阻塞当前线程的,这个在实际的项目应用也是不符合,所以封装和优化数据分发是必须的。

NSData *msg = [subscriber receiveDataWithFlags:0]; // 阻塞线程直到接收到数据

数据的处理分发采用代理来回调,当然也可以使用block,个人建议使用代理,毕竟数据分发到不同的处理对象中,使用代理可以很好的避免循环引用,避免内存泄漏。我这里是使用一个两层Dictionary来存储代理,dictionary的遍历速度要比数组好一点。第一层dictionary以订阅码来做key,第二层dictionary以代理对象的hash值为key,这样可以快速找到代理对象进行分发,也可以准确找到某个代理对象并实现移除操作。

添加代理对象的方法

- (void)setCode:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
    NSAssert(delegate != nil, @"代理为空");
    NSAssert(code != nil, @"订阅码为空");

    // 取出当前订阅码的代理字典
    NSMutableDictionary *delegateDict = self.delegates[code];

    if (delegateDict == nil) { // 不存在当前的订阅码代理对象
        // 创建当前订阅码的代理对象的字典
        delegateDict = [NSMutableDictionary dictionary];
        self.delegates[code] = delegateDict;
    }

    // 添加代理对象
    NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
    delegateDict[key] = delegate;
}

移除代理对象的方法

- (void)removeDelegate:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
    NSAssert(delegate != nil, @"代理为空");
    NSAssert(code != nil, @"订阅码为空");

    // 取出当前订阅码的代理字典
    NSMutableDictionary *delegateDict = self.delegates[code];

    if (delegateDict == nil) return; // 不存在当前的订阅码代理对象

    NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
    [delegateDict removeObjectForKey:key];
}

注意,当对象不需要处理订阅消息时,一定要调用removeDelegate方法,因为代理对象是存储到字典中的,持有代理对象的强引用。

zmq要订阅什么消息就发对应的订阅码,如果要全部订阅,就发一个空字符串,在子线程使用一个while循环来接收数据,定义一个标志位,让while循环可以控制,更改后的代码如下

_context = [[ZMQContext alloc] initWithIOThreads:1];
_socket = [_context socketWithType:ZMQ_SUB];
_socket.loadingtime = 3000; // 超时时间,单位毫秒

NSString *endpoint = @"tcp://:41204"; // 服务器IP地址
if (![_socket connectToEndpoint:endpoint]) {
    NSLog(@"订阅失败");
    return;
}

NSData *filterData = [@"" dataUsingEncoding:NSUTF8StringEncoding];
[_socket setData:filterData forOption:ZMQ_SUBSCRIBE];

(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);

closeSocket = NO;
while (!closeSocket) {
    
    @autoreleasepool {
        NSData *recieveData = [_socket receiveDataWithFlags:0];
        if (recieveData == nil)  continue; // 订阅消息超时返回nil
        
        // 数据处理
        NSString *dataStr = [[NSString alloc] initWithData:recieveData encoding:NSUTF8StringEncoding];
        NSRange range = [dataStr rangeOfString:@"{" options:NSCaseInsensitiveSearch];
        NSString *subStr = [dataStr substringFromIndex:range.location];
        NSString *codeKey = [dataStr substringToIndex:range.location];
        
        /*** 数据分发 ***/
        // 取出当前订阅码的代理字典
        NSMutableDictionary *delegateDict = self.delegates[codeKey];
        
        if (delegateDict == nil) continue; // 没有对当前订阅码的代理对象
        
        [delegateDict enumerateKeysAndObjectsUsingBlock:^(id  _Nonnull key, id  _Nonnull obj, BOOL * _Nonnull stop) {
            
            [self handleData:subStr delegate:obj code:codeKey];
        }];
    }    
}

zmq订阅返回的数据格式是订阅码拼接上推送的数据,当接收的数据为nil,那就是接收超时了,zmq订阅只要接收到数据,肯定不为空,必然有订阅码。这里的数据处理方式是因为我项目服务器返回的是订阅码拼接json的数据,要根据服务器定义的数据格式来处理数据。注意,这里是有一个坑,要在循环内使用@ autoreleasepool {}。一个对象只要出了作用的使用区域,就会自动释放了,可是当使用区域是一个while循环时,系统会认为你的对象还要使用就不会释放对象。所以,在循环里加上autoreleasepool标记,不管数据传递到哪个对象中使用,只有没有对象牵引着数据对象,就会释放了。

我在使用zmq订阅模式的时候发现两个问题。zmq订阅模式有自动重新连接的机制,比如断网重连,服务器断开重连,但是超过一分钟好像就重连不了,我测试了很多次,可是安卓组的同事使用zmq订阅不会遇到重连不了的问题。因为重连不了,我要关闭订阅socket,重新开启订阅socket,发现一关闭socket就马上开启socket,zmq底层库就报错,然后程序直接崩溃了,所以我的启动订阅的方法要判断是否socket的关闭。

- (void)start {
    if (closeSocket) {
        [self performSelector:@selector(startConnect) withObject:nil afterDelay:1.0];
        return;
    }

    [self startConnect];
}

zmq的订阅模式使用起来还是很稳定的,相对来说zmq的应答模式会出现请求丢弃的问题。因为本人的项目使用zmq通讯库,后期还会继续研究。

示例代码都放在 github

参考

zeroMQ使用指导 http://zguide.zeromq.org/page:all

zeroMQ的示例程序 https://github.com/imatix/zguide.git

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容

  • 拯救世界 略 开始的假设 我们假设你使用ZeroMQ 3.2以上的版本。我们假设你使用Linux或者类似的操作系统...
    lakerszhy阅读 11,285评论 1 14
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,077评论 25 707
  • ZeroMQ API Reference 创建一个ZMQ的上下文环境,是ZMQ一切的开始。 线程安全,不需要自己加...
    分享放大价值阅读 3,155评论 0 3
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 从7月25日到今天10月12日,我在乐刻的回忆就只有这两个半月了。 时至今日,到此时此刻我都很不清楚到底是怎么走到...
    学生阿豪阅读 383评论 1 1