MINA开发-1-Android客户端开发

MINA客户端的应用应用体系结构

在上一篇文章中我们介绍了MINA服务器端的开发流程,这一章我给大家介绍MINA的客户端开发流程,客户端开发我们基于Android来实现相应的开发过程,首先让我们来看一下MINA客户端开发的应用体系结构图:


clientdiagram.png
  • 客户端首先创建一个IOConnector(MINA Construct (结构) 中连接Socket服务端的接口 ),初始化绑定Server
  • 创建的一个会话,并且与Connection相关联
  • 应用程序/客户端写入会话,遍历过过滤器链之后将数据发送到服务器
  • 从服务器接收到的所有响应/消息都将遍历过滤器链,回调给IoHandler进行处理

Android端TCP代码示例

首先创建android项目,和普通的Android项目一致,引入对应的MINA客户端jar包:


android-project.png

按照客户端的体系结构,按顺序创建对应的对象:

/**
     * 创建tcp客户端的连接
     */
    public void createTcpConnect() throws InterruptedException {
        if (ioSession == null) {
            //首先创建对应的tcp连接
            IoConnector ioConnector = new NioSocketConnector();
            //设置超时时间
            ioConnector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
            //添加过滤器
            ioConnector.getFilterChain().addLast("logger", new LoggingFilter());//添加日志过滤器
            ioConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));//设置字节处理过滤器
            //添加IoHandler
            ioConnector.setHandler(new ClientSessionHandler());

//            IoSession ioSession;

            //通过ConnectFuture来连接服务器
            for (; ; ) {
                try {
                    ConnectFuture future = ioConnector.connect(new InetSocketAddress(HOSTNAME, PORT));
                    future.awaitUninterruptibly();
                    ioSession = future.getSession();
                    break;//连接成功跳出死循环
                } catch (Exception e) {
                    System.err.println("Failed to connect");
                    e.printStackTrace();
                    Thread.sleep(5000);//如果连接失败,5秒后继续连接直到连接成功
                }
            }
        }
        ioSession.write("tcp-ceshi");
    }

和服务端不同的是,客户端创建了IoConnector用来连接服务器,其他设置属性,比如过滤器,Handler,等等设置都很相似。
客户端创建对应的IoSession来实现数据的发送,通过IoHandler来获取对应的服务端的数据。这里我们主要分析一下ConnectFuture来实现对应的连接,首先看看通过awaitUninterruptibly()方法实现连接成功:

//通过ConnectFuture来连接服务器
            for (; ; ) {
                try {
                    ConnectFuture future = ioConnector.connect(new InetSocketAddress(HOSTNAME, PORT));
                    future.awaitUninterruptibly();
                    ioSession = future.getSession();
                    break;//连接成功跳出死循环
                } catch (Exception e) {
                    System.err.println("Failed to connect");
                    e.printStackTrace();
                    Thread.sleep(5000);//如果连接失败,5秒后继续连接直到连接成功
                }
            }

ConnectFuture是一个接口继承自IoFuture,而在使用过程中实际上使用的是默认实现类,我们通过源码来分析连接过程(AbstractPollingIoConnector.java):

/**
     * {@inheritDoc}
     */
    @Override
    @SuppressWarnings("unchecked")
    protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
        H handle = null;
        boolean success = false;
        try {
            handle = newHandle(localAddress);
            if (connect(handle, remoteAddress)) {
                ConnectFuture future = new DefaultConnectFuture();//(1)
                T session = newSession(processor, handle);
                initSession(session, future, sessionInitializer);
                // Forward the remaining process to the IoProcessor.
                session.getProcessor().add(session);
                success = true;
                return future;
            }

            success = true;
        } catch (Exception e) {
            return DefaultConnectFuture.newFailedFuture(e);
        } finally {
            if (!success && handle != null) {
                try {
                    close(handle);
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }

        ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
        connectQueue.add(request);
        startupWorker();
        wakeup();

        return request;
    }

我们可以看到,ConnectFuture的实现过程是,首先判断是否连接成功:

 if (connect(handle, remoteAddress)) {
                ConnectFuture future = new DefaultConnectFuture();//(1)
                T session = newSession(processor, handle);
                initSession(session, future, sessionInitializer);
                // Forward the remaining process to the IoProcessor.
                session.getProcessor().add(session);
                success = true;
                return future;
            }

如果连接成功,创建默认的ConnectFuture对象,初始化创建有效连接的Session会话直接返回即可。但是如果尚没有连接成功,会创建ConnectionRequest 对象返回,我们该对象对应的源码 (AbstractPollingIoConnector.java内部类):

public final class ConnectionRequest extends DefaultConnectFuture {
        /** The handle associated with this connection request */
        private final H handle;

        /** The time up to this connection request will be valid */
        private final long deadline;

        /** The callback to call when the session is initialized */
        private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;

        public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
            this.handle = handle;
            long timeout = getConnectTimeoutMillis();

            if (timeout <= 0L) {
                this.deadline = Long.MAX_VALUE;
            } else {
                this.deadline = System.currentTimeMillis() + timeout;
            }

            this.sessionInitializer = callback;
        }

        public H getHandle() {
            return handle;
        }

        public long getDeadline() {
            return deadline;
        }

        public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
            return sessionInitializer;
        }

        @Override
        public boolean cancel() {
            if (!isDone()) {
                boolean justCancelled = super.cancel();

                // We haven't cancelled the request before, so add the future
                // in the cancel queue.
                if (justCancelled) {
                    cancelQueue.add(this);
                    startupWorker();
                    wakeup();
                }
            }

            return true;
        }
    }

我们可以看到,ConnectionRequest继承自DefaultConnectFuture ;创建完ConnectionRequest这个对象之后,就会将对应的对象实例添加到队列中去:

ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
connectQueue.add(request);
 startupWorker();
 wakeup();
 return request;

通过方法startupWorker()实现请求队列的轮询操作,具体是创建对应的Connector(实现Runnable),然后通过调用线程池去完成对应的操作,我们来看看Connector的实现过程 (AbstractPollingIoConnector.java内部类):

private class Connector implements Runnable {

        public void run() {
            assert (connectorRef.get() == this);

            int nHandles = 0;

            while (selectable) {
                try {
                    // the timeout for select shall be smaller of the connect
                    // timeout or 1 second...
                    int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
                    int selected = select(timeout);

                    nHandles += registerNew();

                    // get a chance to get out of the connector loop, if we
                    // don't have any more handles
                    if (nHandles == 0) {
                        connectorRef.set(null);

                        if (connectQueue.isEmpty()) {
                            assert (connectorRef.get() != this);
                            break;
                        }

                        if (!connectorRef.compareAndSet(null, this)) {
                            assert (connectorRef.get() != this);
                            break;
                        }

                        assert (connectorRef.get() == this);
                    }

                    if (selected > 0) {
                        nHandles -= processConnections(selectedHandles());
                    }

                    processTimedOutSessions(allHandles());

                    nHandles -= cancelKeys();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        disposalFuture.setDone();
                    }
                }
            }
        }
    }

由源码我们可以看到具体的操作方法processConnections中,实现了主要操作:

/**
     * Process the incoming connections, creating a new session for each valid
     * connection.
     */
    private int processConnections(Iterator<H> handlers) {
        int nHandles = 0;

        // Loop on each connection request
        while (handlers.hasNext()) {
            H handle = handlers.next();
            handlers.remove();

            ConnectionRequest connectionRequest = getConnectionRequest(handle);

            if (connectionRequest == null) {
                continue;
            }

            boolean success = false;
            try {
                if (finishConnect(handle)) {
                    T session = newSession(processor, handle);
                    initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
                    // Forward the remaining process to the IoProcessor.
                    session.getProcessor().add(session);
                    nHandles++;
                }
                success = true;
            } catch (Exception e) {
                connectionRequest.setException(e);
            } finally {
                if (!success) {
                    // The connection failed, we have to cancel it.
                    cancelQueue.offer(connectionRequest);
                }
            }
        }
        return nHandles;
    }

该方法判断连接是否完成,如果完成,创建初始化的Session对象;执行方法 initSession(session, connectionRequest, connectionRequest.getSessionInitializer())之后,就会将ConnectFuture中的状态码ready设置为true(这一块的代码暂时没有发现,但是通过打断点得到执行完该方法,ready=true),修改状态时,执行方法(ConnectFuture中的方法)setValue(...):

/**
     * Sets the result of the asynchronous operation, and mark it as finished.
     * 
     * @param newValue The result to store into the Future
     * @return {@code true} if the value has been set, {@code false} if
     * the future already has a value (thus is in ready state)
     */
    public boolean setValue(Object newValue) {
        synchronized (lock) {
            // Allowed only once.
            if (ready) {
                return false;
            }

            result = newValue;
            ready = true;
            
            // Now, if we have waiters, notify them that the operation has completed
            if (waiters > 0) {
                lock.notifyAll();
            }
        }

        // Last, not least, inform the listeners
        notifyListeners();
        
        return true;
    }

执行该方法,就会调用notifyListeners方法来实现回调函数,由于awaitUninterruptibly()方法一直在阻塞等待ready状态变化来结束执行,接下来分别看看两种方式的执行:
1.awaitUninterruptibly():

/**
     * {@inheritDoc}
     */
    @Override
    public ConnectFuture awaitUninterruptibly() {
        return (ConnectFuture) super.awaitUninterruptibly();
    }

awaitUninterruptibly方法最终实现方法:

/**
     * Wait for the Future to be ready. If the requested delay is 0 or
     * negative, this method immediately returns the value of the
     * 'ready' flag.
     * Every 5 second, the wait will be suspended to be able to check if
     * there is a deadlock or not.
     * 
     * @param timeoutMillis The delay we will wait for the Future to be ready
     * @param interruptable Tells if the wait can be interrupted or not
     * @return <tt>true</tt> if the Future is ready
     * @throws InterruptedException If the thread has been interrupted
     * when it's not allowed.
     */
    private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
        long endTime = System.currentTimeMillis() + timeoutMillis;

        if (endTime < 0) {
            endTime = Long.MAX_VALUE;
        }

        synchronized (lock) {
            // We can quit if the ready flag is set to true, or if
            // the timeout is set to 0 or below : we don't wait in this case.
            if (ready||(timeoutMillis <= 0)) {
                return ready;
            }

            // The operation is not completed : we have to wait
            waiters++;

            try {
                for (;;) {
                    try {
                        long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
                        
                        // Wait for the requested period of time,
                        // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will
                        // check that we aren't blocked.
                        lock.wait(timeOut);
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        }
                    }

                    if (ready || (endTime < System.currentTimeMillis())) {
                        return ready;
                    } else {
                        // Take a chance, detect a potential deadlock
                        checkDeadLock();
                    }
                }
            } finally {
                // We get here for 3 possible reasons :
                // 1) We have been notified (the operation has completed a way or another)
                // 2) We have reached the timeout
                // 3) The thread has been interrupted
                // In any case, we decrement the number of waiters, and we get out.
                waiters--;
                
                if (!ready) {
                    checkDeadLock();
                }
            }
        }
    }

因此我们可以知道,通过阻塞的方式实现最终的连接成功,接着获取对应的IoSession;
2.通过回调来实现方法:notifyListeners():

/**
     * Notify the listeners, if we have some.
     */
    private void notifyListeners() {
        // There won't be any visibility problem or concurrent modification
        // because 'ready' flag will be checked against both addListener and
        // removeListener calls.
        if (firstListener != null) {
            notifyListener(firstListener);
            firstListener = null;

            if (otherListeners != null) {
                for (IoFutureListener<?> listener : otherListeners) {
                    notifyListener(listener);
                }
                
                otherListeners = null;
            }
        }
    }

    @SuppressWarnings("unchecked")
    private void notifyListener(IoFutureListener listener) {
        try {
            listener.operationComplete(this);
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
        }
    }

ConnectFuture回调的方法来实现获取Session的代码:

connFuture.addListener( new IoFutureListener(){
            public void operationComplete(IoFuture future) {
                ConnectFuture connFuture = (ConnectFuture)future;
                if( connFuture.isConnected() ){
                    session = future.getSession();
                    try {
                        sendData();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    log.error("Not connected...exiting");
                }
            }
        });

我们来看一下addListener的源码:

/**
     * {@inheritDoc}
     */
    public IoFuture addListener(IoFutureListener<?> listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener");
        }

        synchronized (lock) {
            if (ready) {
                // Shortcut : if the operation has completed, no need to 
                // add a new listener, we just have to notify it. The existing
                // listeners have already been notified anyway, when the 
                // 'ready' flag has been set.
                notifyListener(listener);
            } else {
                if (firstListener == null) {
                    firstListener = listener;
                } else {
                    if (otherListeners == null) {
                        otherListeners = new ArrayList<IoFutureListener<?>>(1);
                    }
                    
                    otherListeners.add(listener);
                }
            }
        }
        
        return this;
    }

至此,MINA客户端的核心连接IoFuture的连接操作流程已经结束;
参考官网以及对应的源码包

Android端UDP代码示例

MINA客户端的UDP连接和TCP的连接在代码方面没有太大的区别,毕竟MINA实现方面使用的是统一的API接口,至于不同点,主要对于IoConnector的实现类不一样:

/**
     * 创建udp客户端的连接
     */
    public void createUdpConnect() throws InterruptedException {
        if (udpUoSession == null) {
            //首先创建对应的tcp连接
            udpIoConnector = new NioDatagramConnector();
            //设置超时时间
            udpIoConnector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
            //添加过滤器
            udpIoConnector.getFilterChain().addLast("logger", new LoggingFilter());//添加日志过滤器
            udpIoConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));//设置字节处理过滤器
            //添加IoHandler
            udpIoConnector.setHandler(new ClientSessionHandler());

            //通过ConnectFuture来连接服务器
            for (; ; ) {
                try {
                    ConnectFuture future = udpIoConnector.connect(new InetSocketAddress(HOSTNAME, PORT + 1));
                    future.awaitUninterruptibly();
                    udpUoSession = future.getSession();
                    break;//连接成功跳出死循环
                } catch (Exception e) {
                    System.err.println("Failed to connect");
                    e.printStackTrace();
                    Thread.sleep(5000);//如果连接失败,5秒后继续连接直到连接成功
                }
            }
        }
        udpUoSession.write("udp-ceshi");
    }

由代码我们可以发现,API的调用方式基本一致;
注:Android使用过程中,Socket连接不应该在主线程中实现,否则会报错

至此我们的MINA的ANDROID客户端开发大体的流程已经完成,后续会继续MINA各个单独模块进行使用以及源码分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容