基于Netty实现webSocket搭建消息推送


1 添加netty依赖

 implementation rootProject.ext.dependencies["netty"]    
"netty"   : "io.netty:netty-all:4.1.9.Final"

2 创建AIDL文件用于服务器推送消息回调

interface IPCInteract {
            /** 发起连接 传入service端推送数据时的接收类 */
           void connect(IPCServicePush iPCServicePush);

           /** 请求数据  */
           void reqData(String sub);

           /** 重连 */
           void doReConnect();

           /** 强制重连 */
           void forReConnect();
}




interface IPCServicePush {
          /** 服务端推送数据 */
           void servicePushData(String data);

           /** 通知客户端设置服务连接状态 */
           void serviceConnectStatus(boolean status);

           void sendPendingData();
}

3 创建处理WebStock请求的 ChannelHandler

** channelRead0方法中的msg即为服务器推送数据 **

  private static final String TAG = "WebSocketClientHandler";
    /**
     * 用于 WebSocket 的握手
     */
    private WebSocketClientHandshaker handShaker;
    private ChannelPromise channelPromise;

    public ChannelHandle() {
        try {
            Log.e(TAG, Config.scheme.get() + "://" + Config.address.get() + ":" + Config.port.get() + Config.path.get() + Config.token.get());
            final URI uri = new URI(Config.scheme.get() + "://" + Config.address.get() + ":" + Config.port.get() + Config.path.get() + "/" + Config.token.get());
            this.handShaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();

        if (!handShaker.isHandshakeComplete()) {
            try {
                handShaker.finishHandshake(ch, (FullHttpResponse) msg);
                channelPromise.setSuccess();
                Log.i(TAG, "WebSocket Client handShaker!" + Thread.currentThread());
            } catch (WebSocketHandshakeException e) {
                channelPromise.setFailure(e);
                Log.i(TAG, "WebSocket Client failed to handShaker");
            }
        }

        handleTextWebSocketFrame((TextWebSocketFrame) msg);
    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channelPromise = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        //进行http握手
        handShaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Log.i(TAG, "channelInactive");
    }

    private void handleTextWebSocketFrame(TextWebSocketFrame frame) {
        NettyClient.getInstance().pushDataToClient(frame.text());
    }

4 心跳处理Handler 一段时间内未进行读写操作 触发 userEventTriggered

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    private static final String TAG = "HeartBeatHandler";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent ev = (IdleStateEvent) evt;
            Logger.d(TAG, "userEventTriggered  ----> " + ev.state() + " address:" + ctx.channel().remoteAddress() + " / " + ctx.channel().isActive());

            NettyClient.getInstance().setClientConnectStatus(ctx.channel().isActive());
            switch (ev.state()) {
                case ALL_IDLE:
                    break;
                case READER_IDLE:
                    break;
                case WRITER_IDLE:
                    break;
            }
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Log.d(TAG, "channelInactive " + ctx.channel().remoteAddress());
        if (NettyClient.getInstance().isConnectService()) {
            NettyClient.getInstance().notifyChannelInactive();
        }
        //设置服务器连接状态为false
        NettyClient.getInstance().setConnectService(false);
        NettyClient.getInstance().setClientConnectStatus(false);
        //当与服务器断开连接时 1s后重新连接
        ctx.channel().eventLoop().schedule(() -> {
            Logger.d("HeartBeatHandler  -----   与服务器断开连接,准备重新连接服务器");
            NettyClient.getInstance().doReConnect();
        }, 1000, TimeUnit.MILLISECONDS);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Log.d(TAG, "channelActive " + ctx.channel().remoteAddress());
        //设置服务器连接状态为false
        NettyClient.getInstance().setConnectService(true);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Log.e(TAG, "exceptionCaught ---->" + cause.getMessage());
    }
}

5 单例NettyClient

public class NettyClient {

    private static final String TAG = "NettyClient";


    /**
     * 是否链接到服务器
     */
    private boolean connectService;
    private Bootstrap bootstrap;
    private ChannelFuture channelFuture;
    private ChannelFutureListener listener;
    private Channel channel;
    private final NioEventLoopGroup nioEventLoopGroup;


    private              IPCServicePush        ipcServicePush;


    private IPCInteract.Stub  mBinder =  new IPCInteract.Stub() {
        @Override
        public void connect(IPCServicePush iPCServicePush) throws RemoteException {
            ipcServicePush = iPCServicePush;
        }

        @Override
        public void reqData(String sub) throws RemoteException {
            sendDataToServer(sub);
        }

        @Override
        public void doReConnect() throws RemoteException {
            NettyClient.getInstance().doReConnect();
        }

        @Override
        public void forReConnect() throws RemoteException {

        }
    };


    public NettyClient() {

        //进行初始化 初始化线程组
        nioEventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class).group(nioEventLoopGroup);
        bootstrap.option(ChannelOption.TCP_NODELAY, true); //无阻塞
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true); //保持长连接
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        bootstrap.handler(new LoggingHandler());
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 10))
                        .addLast(new HttpClientCodec())  //HttpClientCodec将请求和应答消息编码或解码为HTTP消息
                        .addLast(new HttpObjectAggregator(8192))//通常接收到的http是一个片段,如果想要完整接受一次请求所有数据,我们需要绑定HttpObjectAggregator
                        .addLast(new WebSocketFrameAggregator(10 * 1048576))
                        .addLast(new ChannelHandle())//处理数据接收
                        .addLast(new HeartBeatHandler()); //检测心跳

            }
        });


        listener = future -> {
            if (future.isSuccess()) {
                Logger.d(TAG, "ChannelFutureListener:operationComplete status is success");
                future.channel().flush();
            } else {
                Logger.e(TAG, "ChannelFutureListener:operationComplete status is failed");
                //与服务器连接失败 1s后重现连接一次
                future.channel().eventLoop().schedule(() -> {
                    Logger.d("与服务器连接失败,准备重新连接服务器(listener)");
                    doReConnect();
                }, 1000, TimeUnit.MILLISECONDS);
            }
        };

    }


    /**
     * 发送数据
     */
    private void sendDataToServer(String sub) {
        if (getChannel() != null && getChannel().isActive()) {
            getChannel().writeAndFlush(new TextWebSocketFrame(sub));
        }
    }



    /**
     * 连接服务
     */
    public void connect(final String intentHost, final int intentPort) {

        new Thread(() -> {
            try {
                Logger.w(TAG, "开始连接服务器,服务器地址:" + Config.address.get() + ":" + Config.port.get());
                channelFuture = bootstrap.connect(intentHost, intentPort).sync();//sync 通过同步方法阻塞直到连接服务器完成
                channelFuture.addListener(listener);
                channel = channelFuture.channel();
            } catch (Exception e) {
                Logger.d(TAG, "connect Exception -----> \n" + e);

                ToastUtil.toast(e.toString());

                e.printStackTrace();
            } finally {
            }
        }).start();
    }


    /**
     * 重新链接
     */
    public synchronized void doReConnect() {
        //判断是否断开连接,如果服务还连接,就跳过重连操作
        if (connectService) {
            return;
        }

        new Thread(() -> {
            try {
                if (channel != null) {
                    channel.close();
                }
                Logger.w(TAG, "开始重新连接服务器,服务器地址:" + Config.address.get() + ":" + Config.port.get());
                channelFuture = bootstrap.connect(Config.address.get(), Config.port.get()).sync();//sync 通过同步方法阻塞直到连接服务器完成
                channelFuture.addListener(listener);
                channel = channelFuture.channel();
            } catch (Exception e) {
                Log.d(TAG, "connect Exception -----> \n" + e);
                e.printStackTrace();

            } finally {


            }
        }).start();
    }


    /** 推送数据到client */
    public void pushDataToClient(String data) {
        try {
            ipcServicePush.servicePushData(data);
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }



    /**
     * 提示前台服务断开链接
     */
    public void notifyChannelInactive() {

    }


    /** 通知客户端设置连接状态 */
    public void setClientConnectStatus(boolean status) {
        //        Log.e(TAG,"setClientConnectStatus "+ ipcServicePush);
        if (ipcServicePush != null) {
            try {
                ipcServicePush.serviceConnectStatus(status);
            } catch (RemoteException e) {
                e.printStackTrace();
            }
        }
    }



    /** 关闭服务 */
    public void close() {
        if (channel != null) {
            channel.close();
        }
    }


    /**
     * 是否链接到服务器
     */
    public boolean isConnectService() {
        return connectService;
    }


    public void setConnectService(boolean connectService) {
        this.connectService = connectService;
    }


    public Channel getChannel() {
        return channel;
    }


    public IPCInteract.Stub getMBinder() {
        return mBinder;
    }

    public static NettyClient getInstance() {
        return NettyInstance.instance;
    }

    private static class NettyInstance {
        private static NettyClient instance = new NettyClient();
    }




}

6 封装ipc服务的方法

public class IPCUtil {


   private static final String TAG = "IPCUtil";

   private Gson gson = new Gson();


   /**
    * 是否已经绑定服务
    */
   private boolean isBound = false;

   /**
    * 服务是否连接
    */
   private boolean connect;

   /**
    * 处理服务端推送过来的数据的处理类
    */
   private IPCHandler ipcHandler;

   /**
    * 与远程服务交互的ipc对象
    */
   private IPCInteract ipcInteract;


   /**
    * 绑定服务时需要用到的回调类
    */
   private ServiceConnection connection = new ServiceConnection() {
       @Override
       public void onServiceConnected(ComponentName name, IBinder service) {
           Log.d(TAG, "ServiceConnection  / " + Thread.currentThread() + "/" + System.currentTimeMillis());
           isBound = true;
           ipcInteract = IPCInteract.Stub.asInterface(service);
           initBinder();
       }

       @Override
       public void onServiceDisconnected(ComponentName name) {
           isBound = false;
       }
   };


   /**
    * 绑定服务端后初始化
    */
   private void initBinder() {
       try {
           ipcInteract.connect(ipcServicePush);
       } catch (RemoteException e) {
           e.printStackTrace();
       }
   }


   /**
    * 调度线程用 ipc传递过来的数据并非在ui线程中
    */
   @SuppressLint("HandlerLeak")
   private Handler mHandler = new Handler() {
       @Override
       public void handleMessage(Message msg) {
           handlerRecData((String) msg.obj);
       }
   };


   /**
    * service推送数据时 client接收类
    */
   private IPCServicePush ipcServicePush = new IPCServicePush.Stub() {
       @Override
       public void servicePushData(String data) throws RemoteException {
           //注意此时处理的线程是Binder线程,并不是ui线程,所以要用Handler进行线程切换
           Message message = Message.obtain();
           message.obj = data;
           mHandler.sendMessage(message);
       }

       @Override
       public void serviceConnectStatus(boolean status) throws RemoteException {
           connect = status;
       }

       @Override
       public void sendPendingData() throws RemoteException {

       }
   };


   private void handlerRecData(String data) {

       if (ipcHandler == null || TextUtils.isEmpty(data)) {
           Logger.e(TAG, "IPCUtil  ------ ipcHandler or serviceRecData cannot be empty");
           return;
       }

       ResponseBodyRec<BasePointRec> listDataRec = gson.fromJson(data, new TypeToken<ResponseBodyRec<BasePointRec>>() {
       }.getType());


       ipcHandler.onPointStatusReceive(listDataRec);
   }


   //******************************* 对外开放的方法 **************************************/

   /**
    * 服务已解绑
    */
   public void unBound() {
       isBound = false;
   }


   /**
    * 向服务端请求数据
    *
    * @param sub 请求参数类
    */
   public void reqData(Object sub) {
       try {
           if (ipcInteract != null || checkConnectAndDoReConnect()) {
               ipcInteract.reqData((String) sub);
           }
       } catch (RemoteException e) {
           e.printStackTrace();
       }
   }

   /**
    * 断网重连
    */
   public void doReConnect() {
       if (ipcInteract == null) {
           return;
       }
       try {
           ipcInteract.doReConnect();
       } catch (RemoteException e) {
           e.printStackTrace();
       }
   }


   /**
    * 判断是否连接服务器 false就重新连接
    */
   private boolean checkConnectAndDoReConnect() {
       if (!connect) {
           Log.e(TAG, "connect is broken");
           doReConnect();
           return false;
       }
       return true;
   }


   public void switchIPCHandler(IPCHandler ipcHandler) {
       this.ipcHandler = ipcHandler;
   }


   //********************************** get方法 ****************************************/


   public boolean isBound() {
       return isBound;
   }


   public ServiceConnection getConnection() {
       return connection;
   }


   //*********************************** 单例化 ****************************************/

   public static IPCUtil getInstance() {
       return IPCUtilInstance.instance;
   }

   private static class IPCUtilInstance {
       private static IPCUtil instance = new IPCUtil();
   }


}

7 创建服务

public class NettyService extends Service {
   private static final String TAG = "NettyService";
   //service端推送数据时的 client接收类

   @Override
   public int onStartCommand(Intent intent, int flags, int startId) {
       Log.d(TAG, "onStartCommand  / " + Thread.currentThread() + "/" + System.currentTimeMillis() + "/flags is " + flags);
       if (intent != null) {

           String token = (String) SharedInfo.getInstance().getValue(Constant.TOKEN, "");

           Config.address.set(intent.getStringExtra(BundleKeys.WS_HOST));
           Config.port.set(intent.getIntExtra(BundleKeys.WS_PORT, BaseParams.WS_PORT));
           Config.scheme.set(intent.getStringExtra(BundleKeys.WS_SCHEME));
           Config.path.set(intent.getStringExtra(BundleKeys.WS_PATH));
           Config.token.set(token);

           NettyClient.getInstance().connect(Config.address.get(), Config.port.get());
       }
       return START_STICKY;
   }

   @Nullable
   @Override
   public IBinder onBind(Intent intent) {
       return NettyClient.getInstance().getMBinder();
   }

   @Override
   public boolean onUnbind(Intent intent) {
       Log.d(TAG, "unbindService");
       NettyClient.getInstance().close();
       return super.onUnbind(intent);
   }

   @Override
   public void onDestroy() {
       Log.d(TAG, "onDestroy");
       NettyClient.getInstance().close();
       super.onDestroy();
   }
}

8 Activity中绑定服务 以及指定IpcHandler对象

在Activity销毁时需要注意注销服务以及断开Stocket连接 否则会导致oom


@Route(path = RouterUrl.MAIN, extras = RouterExtras.EXTRA_COMMON)
public class MainAct extends BaseActivity {

    private ActMainBinding mainBinding;
    private MainCtrl mainCtrl;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        mainBinding = DataBindingUtil.setContentView(this, R.layout.act_main);
        mainCtrl = new MainCtrl(mainBinding);
        mainBinding.setViewCtrl(mainCtrl);

        startService();

    }

    @Override
    protected void onStart() {
        super.onStart();
        bindService();
    }


    @Override
    protected void onResume() {
        super.onResume();
        IPCUtil.getInstance().switchIPCHandler(mainCtrl.getIpcHandler());
    }



    @Override
    protected void onDestroy() {
        if (IPCUtil.getInstance().isBound()) {
            unbindService(IPCUtil.getInstance().getConnection());
            stopService(new Intent(this, NettyService.class));
            IPCUtil.getInstance().unBound();
        }
        super.onDestroy();
    }




    /**
     * 启动服务
     */
    private void startService() {
        Intent intent = new Intent(this, NettyService.class);
        intent.putExtra(BundleKeys.WS_SCHEME, BaseParams.WS_SCHEME);
        intent.putExtra(BundleKeys.WS_HOST, BaseParams.WS_HOST);
        intent.putExtra(BundleKeys.WS_PORT,BaseParams.WS_PORT);
        intent.putExtra(BundleKeys.WS_PATH, BaseParams.WS_PATH);
        intent.putExtra(BundleKeys.WS_TOKEN, BaseParams.WS_TOKEN);
        startService(intent);
    }

    /**
     * 绑定服务
     */
    private void bindService() {
        Intent intent = new Intent(this, NettyService.class);
        bindService(intent, IPCUtil.getInstance().getConnection(), BIND_AUTO_CREATE);
    }


    @Override
    public void onBackPressed() {
        ActivityManage.onExit();
    }

}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容