org.eclipse.paho.client.mqttv3 源码解析(二) 接收

前面介绍了发送,我们回顾下
总体接收简单点,就是控制Qos的响应复杂点

总体图

在connect的时候开启了Task:

    public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
        synchronized (conLock) {

                MqttConnect connect = new MqttConnect(client.getClientId(),
                        conOptions.getMqttVersion(),
                        conOptions.isCleanSession(),
                        conOptions.getKeepAliveInterval(),
                        conOptions.getUserName(),
                        conOptions.getPassword(),
                        conOptions.getWillMessage(),
                        conOptions.getWillDestination());

                this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
                this.clientState.setCleanSession(conOptions.isCleanSession());
                this.clientState.setMaxInflight(conOptions.getMaxInflight());

                tokenStore.open();
                ConnectBG conbg = new ConnectBG(this, token, connect);
                conbg.start();
            }

          .......
    }

private class ConnectBG implements Runnable {

       ........

        public void run() {

            try {
                NetworkModule networkModule = networkModules[networkModuleIndex];
                networkModule.start();
                receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
                receiver.start("MQTT Rec: "+getClient().getClientId());
                sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
                sender.start("MQTT Snd: "+getClient().getClientId());
                callback.start("MQTT Call: "+getClient().getClientId());                
                internalSend(conPacket, conToken);
            } 
              ........
        }
    }

然后启动CommsSender:

public class CommsReceiverimplements Runnable {

    private boolean running = false;
    private Object lifecycle = new Object();
    private ClientState clientState = null;
    private ClientComms clientComms = null;
    private MqttInputStream in;
    private CommsTokenStore tokenStore = null;
    private Thread recThread = null;
    private volatile boolean receiving;
    
    ........
    
    /**
     * Run loop to receive messages from the server.
     */
    public void run() {
        final String methodName = "run";
        MqttToken token = null;
        
        while (running && (in != null)) {
            try {
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage(); // 【: 1】
                receiving = false;
                
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message); // 【: 2】
                    if (token!=null) {
                        synchronized (token) {
                            // Ensure the notify processing is done under a lock on the token
                            // This ensures that the send processing can complete  before the 
                            // receive processing starts! ( request and ack and ack processing
                            // can occur before request processing is complete if not!
                            clientState.notifyReceivedAck((MqttAck)message); // 【: 3】
                        }
                    } else {
                        // It its an ack and there is no token then something is not right.
                        // An ack should always have a token assoicated with it.
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    // A new message has arrived
                    clientState.notifyReceivedMsg(message); // 【: 4】
                }
            }
            catch (MqttException ex) {
                running = false;
                // Token maybe null but that is handled in shutdown
                clientComms.shutdownConnection(token, ex);
            } 
            catch (IOException ioe) {
                running = false;
                // An EOFException could be raised if the broker processes the 
                // DISCONNECT and ends the socket before we complete. As such,
                // only shutdown the connection if we're not already shutting down.
                if (!clientComms.isDisconnecting()) {
                    clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                }
            }
            finally {
                receiving = false;
            }
        }
    }
}

异常处理我就先不介绍了,细节要自己看下。
1.MqttInputStream来自前面介绍TcpNetworkModule,这个类就是socket底层代码。
2.tokenStore.getToken(message),因为是确认ack的,他重连和ping的关键,特殊message,在自己发出去接受到服务器应答的时候已经被持久化了。然后同步下完成ACK确认就3。
3.看下ACK确认实现

/**
     * Called by the CommsReceiver when an ack has arrived. 
     * 
     * @param message
     * @throws MqttException
     */
    protected void notifyReceivedAck(MqttAck ack) throws MqttException {
        this.lastInboundActivity = System.currentTimeMillis();
        MqttToken token = tokenStore.getToken(ack);
        MqttException mex = null;

        if (token == null) {
            // @TRACE 662=no message found for ack id={0}
            log.fine(CLASS_NAME, methodName, "662", new Object[] {
                    new Integer(ack.getMessageId())});
        } else if (ack instanceof MqttPubRec) {
            // Complete the QoS 2 flow. Unlike all other
            // flows, QoS is a 2 phase flow. The second phase sends a
            // PUBREL - the operation is not complete until a PUBCOMP
            // is received
            MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
            this.send(rel, token);
        } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
            // QoS 1 & 2 notify users of result before removing from
            // persistence
            notifyResult(ack, token, mex);
            // Do not remove publish / delivery token at this stage
            // do this when the persistence is removed later 
        } else if (ack instanceof MqttPingResp) {
            synchronized (pingOutstandingLock) {
                pingOutstanding = Math.max(0,  pingOutstanding-1);
                notifyResult(ack, token, mex);
                if (pingOutstanding == 0) {
                    tokenStore.removeToken(ack);
                }
            }
        } else if (ack instanceof MqttConnack) {
            int rc = ((MqttConnack) ack).getReturnCode();
            if (rc == 0) {
                synchronized (queueLock) {
                    if (cleanSession) {
                        clearState();
                        // Add the connect token back in so that users can be  
                        // notified when connect completes.
                        tokenStore.saveToken(token,ack);
                    }
                    inFlightPubRels = 0;
                    actualInFlight = 0;
                    restoreInflightMessages();
                    connected();
                }
            } else {
                mex = ExceptionHelper.createMqttException(rc);
                throw mex;
            }

            clientComms.connectComplete((MqttConnack) ack, mex);
            notifyResult(ack, token, mex);
            tokenStore.removeToken(ack);

            // Notify the sender thread that there maybe work for it to do now
            synchronized (queueLock) {
                queueLock.notifyAll();
            }
        } else {
            // Sub ack or unsuback
            notifyResult(ack, token, mex);
            releaseMessageId(ack.getMessageId());
            tokenStore.removeToken(ack);
        }
        
        checkQuiesceLock(); // 这货就是确认队列是否空,否则就释放锁
    }

根据不同的消息实现逻辑转换,MqttPubComp和MqttPubAck,处理 QoS 1 & 2持久化重连的遗留问题,MqttPubRec表示发布失败需要重发,MqttConnack自己处理链接重开,心跳包很重要
4.这个就是你自己实现和关注的消息类容。

/**
     * Called by the CommsReceiver when a message has been received.
     * Handles inbound messages and other flows such as PUBREL. 
     * 
     * @param message
     * @throws MqttException
     */
    protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
        final String methodName = "notifyReceivedMsg";
        this.lastInboundActivity = System.currentTimeMillis();
    
        if (!quiescing) {
            if (message instanceof MqttPublish) {
                MqttPublish send = (MqttPublish) message;
                switch (send.getMessage().getQos()) {
                case 0:
                case 1:
                    if (callback != null) {
                        callback.messageArrived(send); // 【: 5】
                    }
                    break;
                case 2:
                    persistence.put(getReceivedPersistenceKey(message),
                            (MqttPublish) message);
                    inboundQoS2.put(new Integer(send.getMessageId()), send);
                    this.send(new MqttPubRec(send), null); // 【: 6】
                    break;

                default:
                    //should NOT reach here
                }
            } else if (message instanceof MqttPubRel) {
                MqttPublish sendMsg = (MqttPublish) inboundQoS2
                        .get(new Integer(message.getMessageId()));
                if (sendMsg != null) {
                    if (callback != null) {
                        callback.messageArrived(sendMsg); // 【: 7】
                    }
                } else {
                    // Original publish has already been delivered.
                    MqttPubComp pubComp = new MqttPubComp(message
                            .getMessageId());
                    this.send(pubComp, null); // 【: 8】
                }
            }
        }
    }

5.Qos1或者0 就直接通知外部callback消息盒子,然后单独线程回调界面消息了

/**
     * This method is called when a message arrives on a topic. Messages are
     * only added to the queue for inbound messages if the client is not
     * quiescing.
     * 
     * @param sendMessage
     *            the MQTT SEND message.
     */
    public void messageArrived(MqttPublish sendMessage) {
        final String methodName = "messageArrived";
        if (mqttCallback != null || callbacks.size() > 0) {
            // If we already have enough messages queued up in memory, wait
            // until some more queue space becomes available. This helps 
            // the client protect itself from getting flooded by messages 
            // from the server.
            synchronized (spaceAvailable) {
                while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
                    try {
                        spaceAvailable.wait(200);
                    } catch (InterruptedException ex) {
                    }
                }
            }
            if (!quiescing) {
                messageQueue.addElement(sendMessage);
                // Notify the CommsCallback thread that there's work to do...
                synchronized (workAvailable) {
                    workAvailable.notifyAll();
                }
            }
        }
    }

全部加入消息盒子队列,如果满了或者正在处理,会wait一下
6.Qos为2 是精确只发一次需要通知服务器和客户端全部都收到了,不然重传
7.就是Qos为2的情况的到达确认。
8.合并消息,然后重发。

然后就是消息队列轮询,没有就阻塞,有就通知界面了。相对于发送,接收的类简单点。大家可以回顾下前面的发送过程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,591评论 18 139
  • 点击查看原文 Web SDK 开发手册 SDK 概述 网易云信 SDK 为 Web 应用提供一个完善的 IM 系统...
    layjoy阅读 13,653评论 0 15
  • 1.OkHttp源码解析(一):OKHttp初阶2 OkHttp源码解析(二):OkHttp连接的"前戏"——HT...
    隔壁老李头阅读 20,802评论 24 176
  • 小艺是我朋友圈里比较美满幸福的一个。娇小俊秀的面容,修长性感的身材,优雅温存的性格,有疼爱自己的老公,一个高高帅帅...
    槑可儿阅读 1,824评论 0 4
  • 有时候,有些事情,不太适合让别人知道 越长大越明白,有些话,只适合说给自己听 有些情绪,只适合自己调 不是你的,你...
    C小宁willbe女王阅读 141评论 0 0