1 添加netty依赖
implementation rootProject.ext.dependencies["netty"]
"netty" : "io.netty:netty-all:4.1.9.Final"
2 创建AIDL文件用于服务器推送消息回调
// Commands a bound client can issue to the service over Binder.
interface IPCInteract {
/** Starts the connection; the argument is the callback that receives data pushed by the service. */
void connect(IPCServicePush iPCServicePush);
/** Requests data. */
void reqData(String sub);
/** Reconnects. */
void doReConnect();
/** Forces a reconnect. */
void forReConnect();
}
// Callbacks the service invokes on the client.
interface IPCServicePush {
/** Delivers data pushed by the service. */
void servicePushData(String data);
/** Tells the client to update its view of the service connection status. */
void serviceConnectStatus(boolean status);
// NOTE(review): undocumented in the original; the intended behavior is not visible here — confirm.
void sendPendingData();
}
3 创建处理WebSocket请求的 ChannelHandler
**channelRead0方法中的msg即为服务器推送数据**
// Log tag. NOTE(review): the value differs from the constructor name (ChannelHandle) — confirm this is intentional.
private static final String TAG = "WebSocketClientHandler";
/**
* Performs the WebSocket handshake.
*/
private WebSocketClientHandshaker handShaker;
// Created in handlerAdded; completed or failed in channelRead0 once the handshake response is processed.
private ChannelPromise channelPromise;
/**
 * Builds the WebSocket handshaker for the endpoint described by {@code Config}
 * ({@code scheme://address:port + path + "/" + token}).
 *
 * <p>If the assembled URI is malformed the error is logged and {@code handShaker}
 * stays {@code null}; later handshake calls on this handler will then fail.
 */
public ChannelHandle() {
    // The token is appended as the last path segment, so it is deliberately kept out of the log.
    final String endpoint = Config.scheme.get() + "://" + Config.address.get() + ":" + Config.port.get() + Config.path.get();
    Log.i(TAG, "WebSocket endpoint: " + endpoint + "/<token>");
    try {
        final URI uri = new URI(endpoint + "/" + Config.token.get());
        this.handShaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    } catch (URISyntaxException e) {
        // Keep the stack trace in logcat instead of System.err.
        Log.e(TAG, "Invalid WebSocket URI: " + endpoint + "/<token>", e);
    }
}
/**
 * Handles every inbound message of the channel.
 *
 * <p>Until the handshake is complete the message is treated as the HTTP upgrade
 * response and is used to finish the handshake; {@code channelPromise} is completed
 * or failed accordingly. Afterwards only text frames are forwarded to
 * {@link #handleTextWebSocketFrame(TextWebSocketFrame)}.
 *
 * @param ctx the handler context
 * @param msg the HTTP upgrade response during the handshake, a WebSocket frame afterwards
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!handShaker.isHandshakeComplete()) {
        try {
            handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
            channelPromise.setSuccess();
            Log.i(TAG, "WebSocket Client handShaker!" + Thread.currentThread());
        } catch (WebSocketHandshakeException e) {
            channelPromise.setFailure(e);
            Log.i(TAG, "WebSocket Client failed to handShaker");
        }
        // The handshake response is an HTTP message, not a frame: stop here instead of
        // falling through to the frame cast below.
        return;
    }
    if (msg instanceof TextWebSocketFrame) {
        handleTextWebSocketFrame((TextWebSocketFrame) msg);
    } else {
        // Other frame types are not consumed by this handler.
        Log.d(TAG, "Ignoring non-text message: " + msg.getClass().getSimpleName());
    }
}
/**
 * Creates the promise that tracks the handshake result as soon as this handler
 * is added to a pipeline.
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    this.channelPromise = ctx.newPromise();
}
/**
 * Runs the superclass handling first, then starts the HTTP handshake on the
 * now-active channel.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    final Channel activeChannel = ctx.channel();
    // Start the HTTP upgrade handshake.
    handShaker.handshake(activeChannel);
}
/**
 * Runs the superclass handling and logs that the channel went inactive.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    final String event = "channelInactive";
    Log.i(TAG, event);
}
/**
 * Hands the text payload of a frame to {@code NettyClient} for delivery to the client.
 *
 * @param frame the received text frame
 */
private void handleTextWebSocketFrame(TextWebSocketFrame frame) {
    final NettyClient client = NettyClient.getInstance();
    client.pushDataToClient(frame.text());
}
4 心跳处理Handler 一段时间内未进行读写操作 触发 userEventTriggered
/**
 * Inbound handler that mirrors the channel's lifecycle into {@code NettyClient}:
 * it reports the connection status on idle events, marks the client connected or
 * disconnected, and schedules a reconnect when the channel goes inactive.
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    private static final String TAG = "HeartBeatHandler";

    /** Passes every inbound message on to the next handler untouched. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /**
     * Forwards the event, then, for idle events, logs the idle state and pushes the
     * channel's current active flag to the client. No per-state action is taken.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        final IdleStateEvent idle = (IdleStateEvent) evt;
        Logger.d(TAG, "userEventTriggered ----> " + idle.state() + " address:" + ctx.channel().remoteAddress() + " / " + ctx.channel().isActive());
        NettyClient.getInstance().setClientConnectStatus(ctx.channel().isActive());
    }

    /**
     * Forwards the event, notifies the client side if it was previously connected,
     * marks the connection as down and schedules a reconnect one second later.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
        Log.d(TAG, "channelInactive " + ctx.channel().remoteAddress());

        final NettyClient client = NettyClient.getInstance();
        final boolean wasConnected = client.isConnectService();
        if (wasConnected) {
            client.notifyChannelInactive();
        }
        // Mark the server connection as down, both locally and for the client.
        client.setConnectService(false);
        client.setClientConnectStatus(false);

        // Retry the connection one second after the disconnect.
        ctx.channel().eventLoop().schedule(() -> {
            Logger.d("HeartBeatHandler ----- 与服务器断开连接,准备重新连接服务器");
            client.doReConnect();
        }, 1, TimeUnit.SECONDS);
    }

    /** Forwards the event and marks the server connection as up. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        Log.d(TAG, "channelActive " + ctx.channel().remoteAddress());
        // Mark the server connection as up.
        NettyClient.getInstance().setConnectService(true);
    }

    /** Logs the exception message; the exception is neither propagated nor is the channel closed here. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final String message = cause.getMessage();
        Log.e(TAG, "exceptionCaught ---->" + message);
    }
}
5 单例NettyClient
/**
 * Singleton Netty WebSocket client. It owns the connection to the server, retries
 * failed or dropped connections, and exposes the connection to bound clients
 * through the {@link IPCInteract} binder.
 *
 * <p>State flags are {@code volatile} because they are touched from the Netty event
 * loop, from the short-lived connect threads and from binder calls.
 */
public class NettyClient {
    private static final String TAG = "NettyClient";
    /** Delay before a failed connection attempt is retried. */
    private static final long RECONNECT_DELAY_MILLIS = 1000;

    /**
     * Whether the channel to the server is currently active.
     */
    private volatile boolean connectService;
    /**
     * Set by {@link #close()} so that the automatic reconnect does not reopen a
     * connection the owner asked to shut down. Cleared by {@link #connect} and by an
     * explicit reconnect request coming through the binder.
     */
    private volatile boolean closedOnPurpose;
    private final Bootstrap bootstrap;
    private volatile ChannelFuture channelFuture;
    private final ChannelFutureListener listener;
    private volatile Channel channel;
    private final NioEventLoopGroup nioEventLoopGroup;
    private volatile IPCServicePush ipcServicePush;

    private final IPCInteract.Stub mBinder = new IPCInteract.Stub() {
        @Override
        public void connect(IPCServicePush iPCServicePush) throws RemoteException {
            ipcServicePush = iPCServicePush;
        }

        @Override
        public void reqData(String sub) throws RemoteException {
            sendDataToServer(sub);
        }

        @Override
        public void doReConnect() throws RemoteException {
            // An explicit request from the client overrides an earlier close().
            closedOnPurpose = false;
            NettyClient.this.doReConnect();
        }

        @Override
        public void forReConnect() throws RemoteException {
            // Intentionally left empty, as in the original implementation.
        }
    };

    /**
     * Creates the event loop group, configures the bootstrap and its pipeline, and
     * prepares the listener that reacts to the outcome of every connect attempt.
     */
    public NettyClient() {
        nioEventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class).group(nioEventLoopGroup);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        // Bootstrap keeps a single handler; an earlier handler(new LoggingHandler()) call was
        // replaced by this one and never took effect, so it is not registered any more.
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 10))
                        // Encodes requests / decodes responses as HTTP messages.
                        .addLast(new HttpClientCodec())
                        // Aggregates HTTP fragments into one full message.
                        .addLast(new HttpObjectAggregator(8192))
                        .addLast(new WebSocketFrameAggregator(10 * 1048576))
                        // Handshake and data reception.
                        .addLast(new ChannelHandle())
                        // Idle / lifecycle tracking.
                        .addLast(new HeartBeatHandler());
            }
        });
        listener = future -> {
            if (future.isSuccess()) {
                Logger.d(TAG, "ChannelFutureListener:operationComplete status is success");
                if (closedOnPurpose) {
                    // close() was called while this attempt was in flight.
                    future.channel().close();
                    return;
                }
                channel = future.channel();
                future.channel().flush();
            } else {
                Logger.e(TAG, "ChannelFutureListener:operationComplete status is failed");
                // Retry once after the delay; scheduled on the group because the failed
                // channel is already closed.
                nioEventLoopGroup.schedule(() -> {
                    Logger.d("与服务器连接失败,准备重新连接服务器(listener)");
                    doReConnect();
                }, RECONNECT_DELAY_MILLIS, TimeUnit.MILLISECONDS);
            }
        };
    }

    /**
     * Sends a text frame to the server; silently does nothing while the channel is
     * missing or inactive.
     */
    private void sendDataToServer(String sub) {
        final Channel current = getChannel();
        if (current != null && current.isActive()) {
            current.writeAndFlush(new TextWebSocketFrame(sub));
        }
    }

    /**
     * Starts a connection attempt to the given server on a background thread.
     *
     * <p>The attempt is not awaited: its outcome is delivered to the shared listener,
     * which stores the channel on success and schedules a retry on failure. Waiting
     * with {@code sync()} would throw on failure before the listener is attached and
     * the retry would never run.
     *
     * @param intentHost server host
     * @param intentPort server port
     */
    public void connect(final String intentHost, final int intentPort) {
        closedOnPurpose = false;
        new Thread(() -> {
            try {
                Logger.w(TAG, "开始连接服务器,服务器地址:" + Config.address.get() + ":" + Config.port.get());
                final ChannelFuture attempt = bootstrap.connect(intentHost, intentPort);
                channelFuture = attempt;
                // Surface the failure of the initial attempt to the user.
                attempt.addListener((ChannelFutureListener) f -> {
                    if (!f.isSuccess()) {
                        ToastUtil.toast(String.valueOf(f.cause()));
                    }
                });
                attempt.addListener(listener);
            } catch (RuntimeException e) {
                // Synchronous failures, e.g. an invalid host argument.
                Log.e(TAG, "connect Exception -----> \n" + e, e);
                ToastUtil.toast(e.toString());
            }
        }).start();
    }

    /**
     * Reconnects to the server configured in {@code Config} unless the connection is
     * still up or {@link #close()} was requested. The previous channel, if any, is
     * closed first; the outcome is handled by the shared listener.
     */
    public synchronized void doReConnect() {
        if (connectService || closedOnPurpose) {
            return;
        }
        new Thread(() -> {
            try {
                final Channel previous = channel;
                if (previous != null) {
                    previous.close();
                }
                Logger.w(TAG, "开始重新连接服务器,服务器地址:" + Config.address.get() + ":" + Config.port.get());
                final ChannelFuture attempt = bootstrap.connect(Config.address.get(), Config.port.get());
                channelFuture = attempt;
                attempt.addListener(listener);
            } catch (RuntimeException e) {
                Log.e(TAG, "connect Exception -----> \n" + e, e);
            }
        }).start();
    }

    /**
     * Pushes data received from the server to the registered client callback.
     * Data is dropped (with a warning) while no callback is registered.
     */
    public void pushDataToClient(String data) {
        final IPCServicePush callback = ipcServicePush;
        if (callback == null) {
            Log.w(TAG, "pushDataToClient: no client callback registered, data dropped");
            return;
        }
        try {
            callback.servicePushData(data);
        } catch (RemoteException e) {
            Log.e(TAG, "pushDataToClient failed", e);
        }
    }

    /**
     * Hook invoked when a previously connected channel goes inactive; currently a no-op.
     */
    public void notifyChannelInactive() {
    }

    /** Tells the registered client callback, if any, the current connection status. */
    public void setClientConnectStatus(boolean status) {
        final IPCServicePush callback = ipcServicePush;
        if (callback == null) {
            return;
        }
        try {
            callback.serviceConnectStatus(status);
        } catch (RemoteException e) {
            Log.e(TAG, "setClientConnectStatus failed", e);
        }
    }

    /**
     * Closes the current channel and suppresses the automatic reconnect until
     * {@link #connect} is called again or a client explicitly requests a reconnect.
     */
    public void close() {
        closedOnPurpose = true;
        final Channel current = channel;
        if (current != null) {
            current.close();
        }
    }

    /**
     * Whether the channel to the server is currently active.
     */
    public boolean isConnectService() {
        return connectService;
    }

    public void setConnectService(boolean connectService) {
        this.connectService = connectService;
    }

    public Channel getChannel() {
        return channel;
    }

    public IPCInteract.Stub getMBinder() {
        return mBinder;
    }

    public static NettyClient getInstance() {
        return NettyInstance.instance;
    }

    /** Lazy holder for the singleton instance. */
    private static class NettyInstance {
        private static final NettyClient instance = new NettyClient();
    }
}
6 封装ipc服务的方法
public class IPCUtil {
private static final String TAG = "IPCUtil";
private Gson gson = new Gson();
/**
* 是否已经绑定服务
*/
private boolean isBound = false;
/**
* 服务是否连接
*/
private boolean connect;
/**
* 处理服务端推送过来的数据的处理类
*/
private IPCHandler ipcHandler;
/**
* 与远程服务交互的ipc对象
*/
private IPCInteract ipcInteract;
/**
* 绑定服务时需要用到的回调类
*/
private ServiceConnection connection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
Log.d(TAG, "ServiceConnection / " + Thread.currentThread() + "/" + System.currentTimeMillis());
isBound = true;
ipcInteract = IPCInteract.Stub.asInterface(service);
initBinder();
}
@Override
public void onServiceDisconnected(ComponentName name) {
isBound = false;
}
};
/**
* 绑定服务端后初始化
*/
private void initBinder() {
try {
ipcInteract.connect(ipcServicePush);
} catch (RemoteException e) {
e.printStackTrace();
}
}
/**
* 调度线程用 ipc传递过来的数据并非在ui线程中
*/
@SuppressLint("HandlerLeak")
private Handler mHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
handlerRecData((String) msg.obj);
}
};
/**
* service推送数据时 client接收类
*/
private IPCServicePush ipcServicePush = new IPCServicePush.Stub() {
@Override
public void servicePushData(String data) throws RemoteException {
//注意此时处理的线程是Binder线程,并不是ui线程,所以要用Handler进行线程切换
Message message = Message.obtain();
message.obj = data;
mHandler.sendMessage(message);
}
@Override
public void serviceConnectStatus(boolean status) throws RemoteException {
connect = status;
}
@Override
public void sendPendingData() throws RemoteException {
}
};
private void handlerRecData(String data) {
if (ipcHandler == null || TextUtils.isEmpty(data)) {
Logger.e(TAG, "IPCUtil ------ ipcHandler or serviceRecData cannot be empty");
return;
}
ResponseBodyRec<BasePointRec> listDataRec = gson.fromJson(data, new TypeToken<ResponseBodyRec<BasePointRec>>() {
}.getType());
ipcHandler.onPointStatusReceive(listDataRec);
}
//******************************* 对外开放的方法 **************************************/
/**
* 服务已解绑
*/
public void unBound() {
isBound = false;
}
/**
* 向服务端请求数据
*
* @param sub 请求参数类
*/
public void reqData(Object sub) {
try {
if (ipcInteract != null || checkConnectAndDoReConnect()) {
ipcInteract.reqData((String) sub);
}
} catch (RemoteException e) {
e.printStackTrace();
}
}
/**
* 断网重连
*/
public void doReConnect() {
if (ipcInteract == null) {
return;
}
try {
ipcInteract.doReConnect();
} catch (RemoteException e) {
e.printStackTrace();
}
}
/**
* 判断是否连接服务器 false就重新连接
*/
private boolean checkConnectAndDoReConnect() {
if (!connect) {
Log.e(TAG, "connect is broken");
doReConnect();
return false;
}
return true;
}
public void switchIPCHandler(IPCHandler ipcHandler) {
this.ipcHandler = ipcHandler;
}
//********************************** get方法 ****************************************/
public boolean isBound() {
return isBound;
}
public ServiceConnection getConnection() {
return connection;
}
//*********************************** 单例化 ****************************************/
public static IPCUtil getInstance() {
return IPCUtilInstance.instance;
}
private static class IPCUtilInstance {
private static IPCUtil instance = new IPCUtil();
}
}
7 创建服务
/**
 * Service hosting the Netty client: a start command configures the endpoint and
 * connects, binding hands out the client's binder, and unbind/destroy close the
 * connection.
 */
public class NettyService extends Service {
    private static final String TAG = "NettyService";

    /**
     * Copies the endpoint settings into {@code Config} and starts the connection.
     * A null intent skips configuration and connection.
     *
     * @return always {@code START_STICKY}
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Log.d(TAG, "onStartCommand / " + Thread.currentThread() + "/" + System.currentTimeMillis() + "/flags is " + flags);
        if (intent == null) {
            return START_STICKY;
        }
        applyConfig(intent);
        NettyClient.getInstance().connect(Config.address.get(), Config.port.get());
        return START_STICKY;
    }

    /** Fills {@code Config} from the intent extras and the stored token. */
    private void applyConfig(Intent intent) {
        final String storedToken = (String) SharedInfo.getInstance().getValue(Constant.TOKEN, "");
        Config.address.set(intent.getStringExtra(BundleKeys.WS_HOST));
        Config.port.set(intent.getIntExtra(BundleKeys.WS_PORT, BaseParams.WS_PORT));
        Config.scheme.set(intent.getStringExtra(BundleKeys.WS_SCHEME));
        Config.path.set(intent.getStringExtra(BundleKeys.WS_PATH));
        Config.token.set(storedToken);
    }

    /** Returns the binder exposed by the Netty client. */
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        final NettyClient client = NettyClient.getInstance();
        return client.getMBinder();
    }

    /** Closes the connection when the clients unbind. */
    @Override
    public boolean onUnbind(Intent intent) {
        Log.d(TAG, "unbindService");
        NettyClient.getInstance().close();
        final boolean rebind = super.onUnbind(intent);
        return rebind;
    }

    /** Closes the connection when the service is destroyed. */
    @Override
    public void onDestroy() {
        Log.d(TAG, "onDestroy");
        NettyClient.getInstance().close();
        super.onDestroy();
    }
}
8 Activity中绑定服务 以及指定IpcHandler对象
在Activity销毁时需要注意注销服务以及断开Socket连接,否则会导致OOM
/**
 * Main screen. Starts and binds {@code NettyService}, routes pushed data to the
 * controller's handler while resumed, and unbinds/stops the service on destroy.
 */
@Route(path = RouterUrl.MAIN, extras = RouterExtras.EXTRA_COMMON)
public class MainAct extends BaseActivity {
    private ActMainBinding mainBinding;
    private MainCtrl mainCtrl;

    /** Sets up data binding and the controller, then starts the service. */
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        mainBinding = DataBindingUtil.setContentView(this, R.layout.act_main);
        mainCtrl = new MainCtrl(mainBinding);
        mainBinding.setViewCtrl(mainCtrl);
        startService();
    }

    /** Binds to the service every time the activity becomes visible. */
    @Override
    protected void onStart() {
        super.onStart();
        bindService();
    }

    /** Makes this screen's handler the receiver for pushed data. */
    @Override
    protected void onResume() {
        super.onResume();
        final IPCUtil ipc = IPCUtil.getInstance();
        ipc.switchIPCHandler(mainCtrl.getIpcHandler());
    }

    /** Unbinds and stops the service, but only when the binding is reported as active. */
    @Override
    protected void onDestroy() {
        final IPCUtil ipc = IPCUtil.getInstance();
        if (ipc.isBound()) {
            unbindService(ipc.getConnection());
            stopService(new Intent(this, NettyService.class));
            ipc.unBound();
        }
        super.onDestroy();
    }

    /**
     * Starts the service, passing the WebSocket endpoint settings as extras.
     */
    private void startService() {
        final Intent launch = new Intent(this, NettyService.class)
                .putExtra(BundleKeys.WS_SCHEME, BaseParams.WS_SCHEME)
                .putExtra(BundleKeys.WS_HOST, BaseParams.WS_HOST)
                .putExtra(BundleKeys.WS_PORT, BaseParams.WS_PORT)
                .putExtra(BundleKeys.WS_PATH, BaseParams.WS_PATH)
                .putExtra(BundleKeys.WS_TOKEN, BaseParams.WS_TOKEN);
        startService(launch);
    }

    /**
     * Binds to the service using the shared connection held by {@code IPCUtil}.
     */
    private void bindService() {
        final ServiceConnection sharedConnection = IPCUtil.getInstance().getConnection();
        bindService(new Intent(this, NettyService.class), sharedConnection, BIND_AUTO_CREATE);
    }

    /** Delegates back navigation to the application-wide exit handling. */
    @Override
    public void onBackPressed() {
        ActivityManage.onExit();
    }
}