安卓通信之NIO(Socket长连接)

本文中我们将讲解一下App的长连接实现。一般而言长连接已经是App的标配了,推送功能的实现基础就是长连接,当然了我们也可以通过轮训操作实现推送功能,但是轮训一般及时性比较差,而且网络消耗与电量销毁比较多,因此一般推送功能都是通过长连接实现的。

那么如何实现长连接呢?现在一般有这么几种实现方式:

  • 使用第三方的长连接服务;

  • 通过NIO等方案实现长连接服务;

  • 通过MINA等第三方框架实现长连接;

几种长连接服务的具体实现,以及各自的优缺点。

1. 使用第三方的长连接服务

介绍:这是最简单的方式,我们可以通过接入极光推送,百度推送,友盟等第三方服务实现长连接,通过接入第三方的API我们可以很方便的接入第三方的长连接,推送服务,但是这种方式定制化程度不太好,如果对长连接服务不是要求特别高,对定制化要求不是很高的话基本可以考虑这种方式(目前主流的App都是使用第三方的长连接服务)
优势:简单,方便
劣势:定制化程度不高

2. 使用NIO等方案实现长连接服务

介绍:通过NIO的方式实现长连接,这种方式对技术要求程度比较高,基本都是通过java API实现长连接,实现心跳包,实现异常情况的容错等操作,可以说通过NIO实现长连接对技术要求很高,一般如果没有成行的技术方案比建议这么做,就算实现了长连接,后期连接的维护,对电量,流量的损耗等都需要持续的优化。
优势:定制化比较高
劣势:技术要求高,需要持续的维护

3. 使用MINA等第三方框架实现长连接

介绍:MINA是一个第三方的NIO框架,该框架实现了一整套的长连接机制,包括长连接的建立,心跳包的实现,异常机制的容错等。使用MINA实现长连接可以定制化的实现一些特有的功能,并且比NIO方案较为简单,因为其已经封装了一些长连接的特有机制,比如心跳包,容错等。
优势:可定制,较NIO方法简单
劣势:也需要一定的技术储备

长连接具体实现

在我们的Android客户端中长连接的实现机制采用–MINA方式。这里多说一句,一开始的长连接采用的是NIO方案,但是采用这种方案之后踩了很多坑,包括心跳,容错等机制都是自己写的,所以耗费了大量的时间,而且对手机电量的消耗很大,最后决定使用MINA NIO框架重新实现一遍长连接,后来经过实测,长连接的稳定性还有耗电量,流量的消耗等指标方面有了很大的提高。

下面我将简单的介绍一下通过NIO实现长连接的具体流程:

  • 引入MINA jar包,在App启动页面,登录页面启动长连接;

  • 创建后台服务,在服务中创建MINA长连接;

  • 实现心跳包,重写一些容错机制;

  • 实现长连接断了之后的重连机制,并且重连次数有限制不能一直重连;

  • 长连接断了之后实现轮训操作,这里的轮训服务只有在长连接断了之后才启动,在长连接恢复之后关闭;

以下就是在长连接中实现的具体代码:

  • 在Application的onCreate方法中检测App是否登录,若登录的话启动长连接
/**
 * 在Application的onCreate方法中执行启动长连接的操作
 **/
@Override
    public void onCreate() {
        ...
        // 登录后开启长连接
        if (UserConfig.isPassLogined()) {
            L.i("用户已登录,开启长连接...");
            startLongConn();
        }
        ...
    }
  • 通过闹钟服务实现具体的启动长连接service的操作,即每隔60秒钟判断长连接是否启动,若未启动则实现启动操作
    /**
     * 开始执行启动长连接服务
     */
    public void startLongConn() {
        quitLongConn();
        L.i("长连接服务已开启");
        AlarmManager manager = (AlarmManager) getSystemService(Context.ALARM_SERVICE);
        Intent intent = new Intent(this, LongConnService.class);
        intent.setAction(LongConnService.ACTION);
        PendingIntent pendingIntent = PendingIntent.getService(this, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT);
        long triggerAtTime = SystemClock.elapsedRealtime();
        manager.setRepeating(AlarmManager.RTC_WAKEUP, triggerAtTime, 60 * 1000, pendingIntent);
    }
/**
 * 后台长连接服务
 **/
public class LongConnService extends Service {
    public static String ACTION = "com.youyou.uuelectric.renter.Service.LongConnService";
    private static MinaLongConnectManager minaLongConnectManager;
    public String tag = "LongConnService";
    private Context context;

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        context = getApplicationContext();

        startLongConnect();
        ObserverManager.addObserver("LongConnService", stopListener);
        return START_STICKY;
    }

    public ObserverListener stopListener = new ObserverListener() {
        @Override
        public void observer(String from, Object obj) {
            closeConnect();
        }
    };

    @Override
    public void onDestroy() {
        super.onDestroy();
        closeConnect();
    }

    /**
     * 开始执行启动长连接的操作
     */
    private void startLongConnect() {
        if (Config.isNetworkConnected(context)) {
            if (minaLongConnectManager != null && minaLongConnectManager.checkConnectStatus()) {
                L.i("长连接状态正常...");
                return;
            }
            if (minaLongConnectManager == null) {
                startThreadCreateConnect();
            } else {
                if (minaLongConnectManager.connectIsNull() && minaLongConnectManager.isNeedRestart()) {
                    L.i("session已关闭,需要重新创建一个session");
                    minaLongConnectManager.startConnect();
                } else {
                    L.i("长连接已关闭,需要重开一个线程来重新创建长连接");
                    startThreadCreateConnect();
                }
            }
        }

    }

    private final AtomicInteger mCount = new AtomicInteger(1);

    private void startThreadCreateConnect() {
        if (UserConfig.getUserInfo().getB3Key() != null && UserConfig.getUserInfo().getSessionKey() != null) {
            System.gc();

            new Thread(new Runnable() {
                @Override
                public void run() {

                    minaLongConnectManager = MinaLongConnectManager.getInstance(context);
                    minaLongConnectManager.crateLongConnect();
                }
            }, "longConnectThread" + mCount.getAndIncrement()).start();
        }
    }

    private void closeConnect() {

        if (minaLongConnectManager != null) {
            minaLongConnectManager.closeConnect();
        }
        minaLongConnectManager = null;

        stopSelf();
    }

    @Override
    public IBinder onBind(Intent intent) {
        throw new UnsupportedOperationException("Not yet implemented");
    }
}
  • 而下面的代码就是长连接的具体实现操作,具体的代码有相关注释说明
/**
 * 具体实现长连接的管理对象
 **/
public class MinaLongConnectManager {

    private static final String TAG = MinaLongConnectManager.class.getSimpleName();
    /**
     * 服务器端口号
     */
    public static final int DEFAULT_PORT = 18156;
    /**
     * 连接超时时间,30 seconds
     */
    public static final long SOCKET_CONNECT_TIMEOUT = 30 * 1000L;

    /**
     * 长连接心跳包发送频率,60s
     */
    public static final int KEEP_ALIVE_TIME_INTERVAL = 60;
    private static Context context;
    private static MinaLongConnectManager minaLongConnectManager;

    private static NioSocketConnector connector;
    private static ConnectFuture connectFuture;
    public static IoSession session;
    private static ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * 长连接是否正在连接中...
     */
    private static boolean isConnecting = false;

    private MinaLongConnectManager() {
        EventBus.getDefault().register(this);
    }

    public static synchronized MinaLongConnectManager getInstance(Context ctx) {

        if (minaLongConnectManager == null) {
            context = ctx;
            minaLongConnectManager = new MinaLongConnectManager();
        }
        return minaLongConnectManager;
    }

    /**
     * 检查长连接的各种对象状态是否正常,正常情况下无需再创建
     *
     * @return
     */
    public boolean checkConnectStatus() {
        if (connector != null && connector.isActive() && connectFuture != null && connectFuture.isConnected() && session != null && session.isConnected()) {
            return true;
        } else {
            return false;
        }
    }

    public boolean connectIsNull() {
        return connector != null;
    }

    /**
     * 创建长连接,配置过滤器链和心跳工厂
     */
    public synchronized void crateLongConnect() {

        if (isConnecting) {
            L.i("长连接正在创建中...");
            return;
        }
        if (!Config.isNetworkConnected(context)) {
            L.i("检测到网络未打开,无法正常启动长连接,直接return...");
            return;
        }

        if (checkConnectStatus()) {
            return;
        }
        isConnecting = true;
        try {
            connector = new NioSocketConnector();
            connector.setConnectTimeoutMillis(SOCKET_CONNECT_TIMEOUT);

            if (L.isDebug) {
                if (!connector.getFilterChain().contains("logger")) {

                    connector.getFilterChain().addLast("logger", new LoggingFilter());
                }
            }
            if (!connector.getFilterChain().contains("codec")) {

                connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new LongConnectProtocolFactory()));
            }

            ClientKeepAliveMessageFactory heartBeatFactory = new ClientKeepAliveMessageFactory();

            KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.READER_IDLE);

            heartBeat.setForwardEvent(true);

            heartBeat.setRequestTimeoutHandler(KeepAliveRequestTimeoutHandler.LOG);

            heartBeat.setRequestInterval(KEEP_ALIVE_TIME_INTERVAL);
            if (!connector.getFilterChain().contains("keepAlive")) {
                connector.getFilterChain().addLast("keepAlive", heartBeat);
            }
            if (!connector.getFilterChain().contains("reconnect")) {

                connector.getFilterChain().addLast("reconnect", new LongConnectReconnectionFilter());
            }

            connector.getSessionConfig().setReceiveBufferSize(1024);
            connector.getSessionConfig().setSendBufferSize(1024);

            connector.getSessionConfig().setReaderIdleTime(60);

            LongConnectHandler longConnectHandler = new LongConnectHandler(this, context);
            connector.setHandler(longConnectHandler);

        } catch (Exception e) {
            e.printStackTrace();
            closeConnect();
        }

        startConnect();
    }

    /**
     * 开始或重连长连接
     */
    public synchronized void startConnect() {
        if (connector != null) {
            L.i("开始创建长连接...");
            boolean isSuccess = beginConnect();

            if (isSuccess) {
                isNeedRestart = false;
                if (context != null) {

                    LoopRequest.getInstance(context).sendLoopRequest();
                }
            } else {

                startLoopService();
            }
            isConnecting = false;

        } else {
            L.i("connector已为null,不能执行创建连接动作...");
        }
    }

    /**
     * 检测MINA中线程池的活动状态
     */
    private void printProcessorExecutor() {
        Class connectorClass = connector.getClass().getSuperclass();
        try {
            L.i("connectorClass:" + connectorClass.getCanonicalName());
            Field field = connectorClass.getDeclaredField("processor");
            field.setAccessible(true);
            Object connectorObject = field.get(connector);
            if (connectorObject != null) {
                SimpleIoProcessorPool processorPool = (SimpleIoProcessorPool) connectorObject;
                Class processPoolClass = processorPool.getClass();
                Field executorField = processPoolClass.getDeclaredField("executor");
                executorField.setAccessible(true);
                Object executorObject = executorField.get(processorPool);
                if (executorObject != null) {
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorObject;
                    L.i("线程池中当前线程数:" + threadPoolExecutor.getPoolSize() + "\t 核心线程数:" + threadPoolExecutor.getCorePoolSize() + "\t 最大线程数:" + threadPoolExecutor.getMaximumPoolSize());
                }

            } else {
                L.i("connectorObject = null");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 开始创建Session
     *
     * @return
     */
    public boolean beginConnect() {

        if (session != null) {
            session.close(false);
            session = null;
        }
        if (connectFuture != null && connectFuture.isConnected()) {
            connectFuture.cancel();
            connectFuture = null;
        }
        FutureTask futureTask = new FutureTask<>(new Callable() {
            @Override
            public Boolean call() {
                try {
                    InetSocketAddress address = new InetSocketAddress(NetworkTask.getBASEURL(), DEFAULT_PORT);
                    connectFuture = connector.connect(address);
                    connectFuture.awaitUninterruptibly(3000L);
                    session = connectFuture.getSession();
                    if (session == null) {
                        L.i(TAG + "连接创建失败...当前环境:" + NetworkTask.getBASEURL());
                        return false;
                    } else {
                        L.i(TAG + "长连接已启动,连接已成功...当前环境:" + NetworkTask.getBASEURL());
                        return true;
                    }
                } catch (Exception e) {
                    return false;
                }
            }
        });

        executorService.submit(futureTask);
        try {
            return futureTask.get();
        } catch (Exception e) {
            return false;
        }

    }

    /**
     * 关闭连接,根据传入的参数设置session是否需要重新连接
     */
    public synchronized void closeConnect() {
        if (session != null) {
            session.close(false);
            session = null;
        }
        if (connectFuture != null && connectFuture.isConnected()) {
            connectFuture.cancel();
            connectFuture = null;
        }
        if (connector != null && !connector.isDisposed()) {

            connector.getFilterChain().clear();
            connector.dispose();
            connector = null;
        }
        isConnecting = false;
        L.i("长连接已关闭...");
    }

    private volatile boolean isNeedRestart = false;

    public boolean isNeedRestart() {
        return isNeedRestart;
    }

    public void onEventMainThread(BaseEvent event) {
        if (event == null || TextUtils.isEmpty(event.getType()))
            return;
        if (EventBusConstant.EVENT_TYPE_NETWORK_STATUS.equals(event.getType())) {
            String status = (String) event.getExtraData();

            if (status != null && status.equals("open")) {
                if (isNeedRestart && UserConfig.getUserInfo().getB3Key() != null && UserConfig.getUserInfo().getSessionKey() != null) {
                    L.i("检测到网络已打开且长连接处于关闭状态,需要启动长连接...");
                    Intent intent = new Intent(context, LongConnService.class);
                    intent.setAction(LongConnService.ACTION);
                    context.startService(intent);
                }
            }
        }
    }

    /**
     * 出现异常、session关闭后,接收事件进行长连接重连操作
     */
    public void onEventMainThread(LongConnectMessageEvent event) {

        if (event.getType() == LongConnectMessageEvent.TYPE_RESTART) {

            long currentTime = System.currentTimeMillis();

            if (UserConfig.getUserInfo().getB3Key() != null && UserConfig.getUserInfo().getSessionKey() != null
                    && ((currentTime / 1000) < UserConfig.getUserInfo().getUnvalidSecs())) {

                SystemClock.sleep(1000);
                if (Config.isNetworkConnected(context)) {
                    L.i("出现异常情况,需要自动重连长连接...");
                    startConnect();
                } else {
                    isNeedRestart = true;
                    L.i("长连接出现异常,需要重新创建session会话...");
                }
            }
        } else if (event.getType() == LongConnectMessageEvent.TYPE_CLOSE) {
            L.i("收到session多次close的消息,此时需要关闭长连接,等待下次闹钟服务来启动...");
            closeConnect();
        }
    }

    /**
     * 启动轮询服务
     */
    public void startLoopService() {

        if (!LoopService.isServiceRuning) {

            if (UserConfig.isPassLogined()) {

                if (MinaLongConnectManager.session != null && MinaLongConnectManager.session.isConnected()) {
                    LoopService.quitLoopService(context);
                    return;
                }
                LoopService.startLoopService(context);
            } else {
                LoopService.quitLoopService(context);
            }
        }
    }

}

以上是通过NIO实现App长连接的部分核心代码,NIO中其实已经实现了长连接的核心流程,我们需要做的就是按照其流程实现长连接,需要注意的是要处理好异常情况,重连机制等。

当长连接创建成功之后需要重新拉取一次服务器端的长连接消息,并且这里的长连接做了容错处理,当长连接断了之后需要有重连机制,一直启动轮训服务,当长连接修复之后轮训服务退出。以上只是通过MINA框架实现的长连接操作的核心流程,还有一些长连接实现的操作细节这里就不做过多的说明。

总结:
基本上对于App来说长连接已经是标配了,产品开发人员可以根据具体的产品需求选择不同的实现方式,一般而言使用第三方的推送服务已经可以满足大部分的需求了,当然了若是相对技术有所追求的话也可以选择自己实现一套长连接服务,不过其中可能存在一些坑需要填,希望这里的长连接实现能够对大家对长连接实现上有所帮助。

  • 可以通过使用第三方长连接服务或者是自己实现连接的方式;

  • 自定义实现长连接可以通过使用NIO或者是第三方NIO框架,比如MINA实现;

  • 长连接实现中通过心跳包的机制实现App与服务器的长时间连接;

  • 可以通过闹钟的机制定时检测长连接服务是否可靠,长连接是否出现异常等;

  • 为了消息的及时性,在长连接出现异常情况时可通过创建轮训服务的机制实现对消息的获取,待长连接恢复之后关闭轮训服务;

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,451评论 25 707
  • 我做了一个梦,梦里的我站在路边瑟瑟发抖,好像在等谁,红着眼睛。 我做了第二个梦,车里的人好像很生气,冲我喊着什么,...
    岛咿阅读 237评论 0 0
  • 前几天,看到假装勤奋这个说法,大意是讲你看起来很勤奋,起早贪黑、通宵达旦,但并非真的勤奋,从未认真思考过为什么做得...
    两岸猿声阅读 160评论 0 2
  • 讲真,这篇文其实是带着科普和些许小情绪的成分完成的。 从去年开始,我的微博上频繁地有小伙伴留言:你用的什么画具...
    BrownBella阅读 1,915评论 13 48