基于Netty实现服务端与客户端通信

个人博客

http://www.milovetingting.cn

基于Netty实现服务端与客户端通信

前言

本文介绍基于Netty实现的服务端与客户端通信的简单使用方法,并在此基础上实现一个简单的服务端-客户端指令通信的Demo。

Netty是什么

Netty是一个NIO客户端-服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化了网络编程,例如TCP和UDP套接字服务器的开发。提供一个异步事件驱动的网络应用程序框架和工具,以快速开发可维护的高性能和高可扩展性协议服务器和客户端。

以上内容摘选自https://netty.io/wiki/user-guide-for-4.x.html

Netty具有以下特点:

  • 适用于各种传输类型的统一API-阻塞和非阻塞套接字
  • 更高的吞吐量,更低的延迟
  • 减少资源消耗
  • 减少不必要的内存复制
  • 完整的SSL / TLS和StartTLS支持

以上内容摘选自https://netty.io/

使用入门

Netty的使用,可以参照Netty的官方文档,这里以4.x为例来演示Netty在服务端和客户端上使用。文档地址:https://netty.io/wiki/user-guide-for-4.x.html

这里用Eclipse来进行开发,服务端和客户端都放在一个工程里。

新建Java工程

服务端

首先需要导入netty的jar包。这里使用netty-all-4.1.48.Final.jar。

NettyServer

新建NettyServer类

public class NettyServer {

    private int mPort;

    public NettyServer(int port) {
        this.mPort = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    // 指定连接队列大小
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //KeepAlive
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //Handler
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(mPort).sync();
            if (f.isSuccess()) {
                LogUtil.log("Server,启动Netty服务端成功,端口号:" + mPort);
            }
            // f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // workerGroup.shutdownGracefully();
            // bossGroup.shutdownGracefully();
        }
    }

}

NettyServerHandler

在初始化时,需要指定Handle,用来处理Channel相关业务。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log("Server,接收到客户端发来的消息:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LogUtil.log("Server,exceptionCaught");
        cause.printStackTrace();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelInactive");
    }

}

经过上面这些步骤后,服务端最基本的设置就完成了。

客户端

客户端和服务端在初始化时大体是类似的,不过相比服务端要简单一些。

NettyClient

public class NettyClient {

    private String mHost;

    private int mPort;

    private NettyClientHandler mClientHandler;

    private ChannelFuture mChannelFuture;

    public NettyClient(String host, int port) {
        this.mHost = host;
        this.mPort = port;
    }

    public void connect() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            mClientHandler = new NettyClientHandler();
            b.group(workerGroup).channel(NioSocketChannel.class)
                    // KeepAlive
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    // Handler
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(mClientHandler);
                        }
                    });
            mChannelFuture = b.connect(mHost, mPort).sync();
            if (mChannelFuture.isSuccess()) {
                LogUtil.log("Client,连接服务端成功");
            }
            mChannelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Client,channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log("Client,接收到服务端发来的消息:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LogUtil.log("Client,exceptionCaught");
        cause.printStackTrace();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Client,channelInactive");
    }

}

到这里,客户端最基本设置就完成了。

连接服务端

新建一个Main类,用于测试服务端和客户端是否能正常连接。

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            NettyClient client = new NettyClient(host, port);
            client.connect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

运行main方法,输出日志如下:

2020-4-13 0:11:02--Server,启动Netty服务端成功,端口号:12345
2020-4-13 0:11:03--Client,channelActive
2020-4-13 0:11:03--Client,连接服务端成功
2020-4-13 0:11:03--Server,channelActive

可以看到,客户端成功连接上了服务端,服务端和客户端里设置的Handler的channelActive方法都会回调。

服务端与客户端通信

在服务端与客户端连接成功后,我们往往需要在双方间进行通信。这里假定,在连接成功后,服务端给客户端发送一个欢迎信息"你好,客户端",而客户端在收到服务端的消息后,也给服务端回复一个消息"你好,服务端"。下面来实现具体的功能。

修改服务端NettyServerHandler中的channelActive方法和channelRead方法,在channelActive方法中给客户端发送消息,在channelRead方法中解析客户端发来的消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端", Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message = new String(buffer, "utf-8");
        LogUtil.log("Server,接收到客户端发来的消息:" + message);
    }

}

修改客户端NettyClientHandler中的channelRead方法,当收到服务端的消息时,回复服务端

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message = new String(buffer,"utf-8");
        LogUtil.log("Client,接收到服务端发来的消息:" + message);
        
        ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服务端", Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
    }

}

运行后,输出日志如下:

2020-4-13 0:29:16--Server,启动Netty服务端成功,端口号:12345
2020-4-13 0:29:17--Client,channelActive
2020-4-13 0:29:17--Client,连接服务端成功
2020-4-13 0:29:17--Server,channelActive
2020-4-13 0:29:17--Client,接收到服务端发来的消息:你好,客户端
2020-4-13 0:29:17--Server,接收到客户端发来的消息:你好,服务端

可以看到,服务端与客户端已经可以正常通信。

粘包与拆包

在实际的使用场景中,可能会存在短时间内大量数据发送的问题。我们模拟这个场景。在客户端连接上服务端后,服务端给客户端发送100个消息,而为便于分析,客户端在收到服务端消息后,不作回复。

修改服务端中NettyServerHandler的channelActive方法

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        for (int i = 0; i < 100; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端", Charset.forName("utf-8"));
            ctx.writeAndFlush(byteBuf);
        }
    }

修改客户端中NettyClientHandler的channelRead方法

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message = new String(buffer, "utf-8");
        LogUtil.log("Client,接收到服务端发来的消息:" + message);

        //ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服务端", Charset.forName("utf-8"));
        //ctx.writeAndFlush(byteBuf);
    }

运行后,输出的部分结果如下:

2020-4-13 0:35:28--Server,启动Netty服务端成功,端口号:12345
2020-4-13 0:35:29--Client,channelActive
2020-4-13 0:35:29--Client,连接服务端成功
2020-4-13 0:35:29--Server,channelActive
2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端
2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端
2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端

可以看到,出现了多条消息"粘"在一起的情况。

什么是粘包与拆包

TCP是个"流"协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

以上内容摘选自TCP粘包/拆包与Netty解决方案

解决方案

在没有 Netty 的情况下,用户如果自己需要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

以上内容摘选自彻底理解Netty,这一篇文章就够了

而使用Netty,则解决这个问题的方法就简单多了。Netty已经提供了四个拆包器:

  • FixedLengthFrameDecoder:固定长度的拆包器,Netty会把固定长度的数据包发送给下一个channelHandler
  • LineBasedFrameDecoder:行拆包器,每个数据包以换行符分隔发送
  • DelimiterBasedFrameDecoder:分隔符拆包器,可以自定义分隔符,行拆包器是分隔符拆包器的一种特例
  • LengthFieldBasedFrameDecoder:基于长度域的拆包器,如果自定义协议中包含长度域的字段,就可以使用这个拆包器

在这里,我们选用分隔符拆包器

首先定义分隔符

public class Config {
    public static final String DATA_PACK_SEPARATOR = "#$&*";
}

在服务端的channelHandler配置中,需要增加

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //这个配置需要在添加Handler前设置
    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
    channel.pipeline().addLast(new NettyServerHandler());
    }

在客户端的channelHandler的配置中,同样也需要增加

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //这个配置需要在添加Handler前设置
    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
    channel.pipeline().addLast(new NettyServerHandler());
    }

发送数据时,在数据的末尾增加分隔符:

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        for (int i = 0; i < 100; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端"+Config.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
            ctx.writeAndFlush(byteBuf);
        }
    }

运行后,可以发现,已经解决"粘包"与"拆包"的问题。

心跳

在网络应用中,为了判断连接是否还存在,一般会通过发送心跳包来检测。在Netty中,配置心跳包的步骤如下

在客户端的channelHandler的配置中,需要增加

@Override
protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
            //...
                        }

在NettyClientHandler中,重写userEventTriggered方法

@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        LogUtil.log("Client,Idle:" + event.state());
        switch (event.state()) {
        case READER_IDLE:

            break;
        case WRITER_IDLE:
            ByteBuf byteBuf = Unpooled.copiedBuffer("心跳^v^v", Charset.forName("utf-8"));
            break;
        case ALL_IDLE:
            break;
        default:
            super.userEventTriggered(ctx, evt);
            break;
        }
    }

当写空闲达到配置的时间时,往服务端发送一个心跳消息

运行后,日志输出如下:

2020-4-13 1:22:50--Server,启动Netty服务端成功,端口号:12345
2020-4-13 1:22:51--Client,channelActive
2020-4-13 1:22:51--Client,连接服务端成功
2020-4-13 1:22:51--Server,channelActive
2020-4-13 1:22:51--Client,接收到服务端发来的消息:你好,客户端
2020-4-13 1:22:56--Client,Idle:WRITER_IDLE
2020-4-13 1:22:56--Server,接收到客户端发来的消息:心跳^v^
2020-4-13 1:22:56--Client,Idle:READER_IDLE
2020-4-13 1:23:01--Client,Idle:WRITER_IDLE
2020-4-13 1:23:01--Server,接收到客户端发来的消息:心跳^v^
2020-4-13 1:23:01--Client,Idle:READER_IDLE

可以看到,心跳包按我们配置的时间正常输出了。

配置编码器与解码器

我们上面在发送数据时,需要通过ByteBuf来转换String,而通过配置编码,解码器,我们就可以直接发送字符串。配置如下:

在服务端与客户端的channelHandler分别增加以下配置:

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //...
    //这个配置需要在添加Handler前设置
    channel.pipeline().addLast("encoder", new StringEncoder());
    channel.pipeline().addLast("decoder", new StringDecoder());
    //...
}

在发送消息时,则可以直接通过ctx.writeAndFlush("心跳^v^" + Config.DATA_PACK_SEPARATOR)的形式来发送。

源码

到此,最简单的服务端与客户端通信的Demo已经完成。源码地址:https://github.com/milovetingting/Samples/tree/master/NettyDemo

使用进阶

在上面的基础上,我们来实现一个下面的需求:

  • 客户端需要登录到服务端

  • 客户端登录成功后,服务端可以给客户端发送指令消息,客户端在收到消息及处理完消息后,都需要上报给服务端

封装连接

为便于程序扩展,我们将客户端连接服务端的部分抽取出来。通过一个接口来定义连接的方法,而连接的具体实现由子类来实现。

定义接口

public interface IConnection {

    /**
     * 连接服务器
     * 
     * @param host     服务器地址
     * @param port     端口
     * @param callback 连接回调
     */
    public void connect(String host, int port, IConnectionCallback callback);

}

在这里还需要定义连接的回调接口

public interface IConnectionCallback {

    /**
     * 连接成功
     */
    public void onConnected();

}

具体的连接实现类

public class NettyConnection implements IConnection {

    private NettyClient mClient;

    @Override
    public void connect(String host, int port, IConnectionCallback callback) {
        if (mClient == null) {
            mClient = new NettyClient(host, port);
            mClient.setConnectionCallBack(callback);
            mClient.connect();
        }
    }

}

为便于管理连接,定义一个连接的管理类

public class ConnectionManager implements IConnection {

    private static IConnection mConnection;

    private ConnectionManager() {

    }

    static class ConnectionManagerInner {
        private static ConnectionManager INSTANCE = new ConnectionManager();
    }

    public static ConnectionManager getInstance() {
        return ConnectionManagerInner.INSTANCE;
    }

    public static void initConnection(IConnection connection) {
        mConnection = connection;
    }

    private void checkInit() {
        if (mConnection == null) {
            throw new IllegalAccessError("please invoke initConnection first!");
        }
    }

    @Override
    public void connect(String host, int port, IConnectionCallback callback) {
        checkInit();
        mConnection.connect(host, port, callback);
    }

}

调用连接:

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            ConnectionManager.initConnection(new NettyConnection());
            ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

                @Override
                public void onConnected() {
                    LogUtil.log("Main,onConnected"););
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

在调用connect方法前,需要先调用initConnection来指定具体的连接类

消息Bean的定义

在连接成功后,服务端会给客户端发送一个欢迎的消息。为便于管理,我们定义一个消息Bean

public class Msg {

    /**
     * 欢迎
     */
    public static final int TYPE_WELCOME = 0;

    public int type;

    public String msg;

}

服务端发送欢迎消息

服务端发送消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private ChannelHandlerContextWrapper mChannelHandlerContextWrapper;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        mChannelHandlerContextWrapper = new ChannelHandlerContextWrapper(ctx);
        MsgUtil.sendWelcomeMsg(mChannelHandlerContextWrapper);
    }
}

在这里,通过定义一个ChannelHandlerContextWrapper类来统一管理消息分隔符

public class ChannelHandlerContextWrapper {

    private ChannelHandlerContext mContext;

    public ChannelHandlerContextWrapper(ChannelHandlerContext context) {
        this.mContext = context;
    }

    /**
     * 包装writeAndFlush方法
     * 
     * @param object
     */
    public void writeAndFlush(Object object) {
        mContext.writeAndFlush(object + Config.DATA_PACK_SEPARATOR);
    }

}

再进一步,通过定义MsgUtil类来封装发送欢迎消息

public class MsgUtil {

    /**
     * 发送欢迎消息
     * 
     * @param wrapper
     */
    public static void sendWelcomeMsg(ChannelHandlerContextWrapper wrapper) {
        Msg msg = new Msg();
        msg.type = Msg.TYPE_WELCOME;
        msg.msg = "你好,客户端";
        wrapper.writeAndFlush(Global.sGson.toJson(msg));
    }

}

客户端消息接收

对于客户端而言,为方便处理消息,我们需要定义一个方法来接收消息。通过在IConnection接口中新增一个registerMsgCallback方法来实现

public interface IConnection {

    /**
     * 连接服务器
     * 
     * @param host     服务器地址
     * @param port     端口
     * @param callback 连接回调
     */
    public void connect(String host, int port, IConnectionCallback callback);

    /**
     * 注册消息回调
     * 
     * @param callback
     */
    public void registerMsgCallback(IMsgCallback callback);

}

在这里,还需要新增IMsgCallback接口

public interface IMsgCallback {

    /**
     * 接收到消息时的回调
     * 
     * @param msg
     */
    public void onMsgReceived(Msg msg);

}

对应到实现类

public class NettyConnection implements IConnection {

    private NettyClient mClient;

    @Override
    public void connect(String host, int port, IConnectionCallback callback) {
        if (mClient == null) {
            mClient = new NettyClient(host, port);
            mClient.setConnectionCallBack(callback);
            mClient.connect();
        }
    }

    @Override
    public void registerMsgCallback(IMsgCallback callback) {
        if (mClient == null) {
            throw new IllegalAccessError("please invoke connect first!");
        }
        mClient.registerMsgCallback(callback);
    }

}

消息的分发

在客户端,为便于处理消息,我们对消息类型进行划分

修改消息Bean

public class Msg {

    /**
     * 欢迎
     */
    public static final int TYPE_WELCOME = 0;

    /**
     * 心跳
     */
    public static final int TYPE_HEART_BEAT = 1;

    /**
     * 登录
     */
    public static final int TYPE_LOGIN = 2;

    public static final int TYPE_COMMAND_A = 3;

    public static final int TYPE_COMMAND_B = 4;

    public static final int TYPE_COMMAND_C = 5;

    public int type;

    public String msg;
}

假定消息是串行的,需要一个一个地处理。为便于管理消息,增加MsgQueue类

public class MsgQueue {

    private PriorityBlockingQueue<Msg> mQueue;

    private boolean using;

    private MsgQueue() {
        mQueue = new PriorityBlockingQueue<>(128, new Comparator<Msg>() {

            @Override
            public int compare(Msg msg1, Msg msg2) {
                int res = msg2.priority - msg1.priority;
                if (res == 0 && msg1.time != msg2.time) {
                    return (int) (msg2.time - msg1.time);
                }
                return res;
            }
        });
    }

    public static MsgQueue getInstance() {
        return MsgQueueInner.INSTANCE;
    }

    private static class MsgQueueInner {
        private static final MsgQueue INSTANCE = new MsgQueue();
    }

    /**
     * 将消息加入消息队列
     * 
     * @param msg
     */
    public void enqueueMsg(Msg msg) {
        mQueue.add(msg);
    }

    /**
     * 从消息队列获取消息
     * 
     * @return
     */
    public synchronized Msg next() {
        if (using) {
            return null;
        }
        Msg msg = mQueue.poll();
        if (msg != null) {
            makeUse(true);
        }
        return msg;
    }

    /**
     * 标记使用状态
     * 
     * @param use
     */
    public synchronized void makeUse(boolean use) {
        using = use;
    }

    /**
     * 是否能够使用
     * 
     * @return
     */
    public synchronized boolean canUse() {
        return !using;
    }

}

增加消息的分发类MsgDispatcher

public class MsgDispatcher {

    private static Map<Integer, Class<? extends IMsgHandler>> mHandlerMap;

    static {
        mHandlerMap = new HashMap<>();
        mHandlerMap.put(Msg.TYPE_WELCOME, WelcomeMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_HEART_BEAT, HeartBeatMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_LOGIN, HeartBeatMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_COMMAND_A, CommandAMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_COMMAND_B, CommandBMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_COMMAND_C, CommandCMsgHandler.class);
    }

    public static void dispatch() {
        if (MsgQueue.getInstance().canUse()) {
            Msg msg = MsgQueue.getInstance().next();
            if (msg == null) {
                return;
            }
            dispatch(msg);
        }
    }

    public static void dispatch(Msg msg) {
        try {
            IMsgHandler handler = (IMsgHandler) Class.forName(mHandlerMap.get(msg.type).getName()).newInstance();
            handler.handle(msg);
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

}

消息的处理

定义IMsgHandler,在这里定义了处理的方法,具体实现由子类实现

public interface IMsgHandler {

    /**
     * 处理消息
     * 
     * @param msg
     */
    public void handle(Msg msg);

}

为统一管理,定义Base类BaseCommandHandler

public abstract class BaseCommandHandler implements IMsgHandler {

    @Override
    public void handle(Msg msg) {
        execute(msg);
    }

    public final void execute(Msg msg) {
        LogUtil.log("Client,received command:" + msg);
        doHandle(msg);
        MsgQueue.getInstance().makeUse(false);
        LogUtil.log("Client,report command:" + msg);
        MsgDispatcher.dispatch();
    }

    public abstract void doHandle(Msg msg);

}

在BaseCommandHandler中,定义execute方法,顺序调用:上报消息已接收成功、处理消息、上报消息已处理完成。这里的消息上报部分,都只是输出一个日志来代替,在实际的业务中,可以抽取出一个抽象方法,让子类来实现。

定义子类,继承自BaseCommandHandler

public class LoginMsgHandler extends BaseCommandHandler {

    @Override
    public void doHandle(Msg msg) {
        LogUtil.log("Client,handle msg:" + msg);
    }

}

对应的心跳类型消息、欢迎类型消息等,都可以新增对应的处理类来实现,这里不再展开。

接收到消息时的处理

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            ConnectionManager.initConnection(new NettyConnection());
            ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

                @Override
                public void onConnected() {
                    LogUtil.log("Main,onConnected");

                    ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

                        @Override
                        public void onMsgReceived(Msg msg) {
                            MsgQueue.getInstance().enqueueMsg(msg);
                            MsgDispatcher.dispatch();
                        }
                    });
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

客户端登录

修改消息Bean,增加登录的请求和响应

public class Msg {

    /**
     * 欢迎
     */
    public static final int TYPE_WELCOME = 0;

    /**
     * 心跳
     */
    public static final int TYPE_HEART_BEAT = 1;

    /**
     * 登录
     */
    public static final int TYPE_LOGIN = 2;

    public static final int TYPE_COMMAND_A = 3;

    public static final int TYPE_COMMAND_B = 4;

    public static final int TYPE_COMMAND_C = 5;

    public int type;

    public String msg;

    public int priority;

    public long time;

    /**
     * 登录请求信息
     * 
     * @author Administrator
     *
     */
    public static class LoginRuquestInfo {
        /**
         * 用户名
         */
        public String user;

        /**
         * 密码
         */
        public String pwd;

        @Override
        public String toString() {
            return "LoginRuquestInfo [user=" + user + ", pwd=" + pwd + "]";
        }
    }

    /**
     * 登录响应信息
     * 
     * @author Administrator
     *
     */
    public static class LoginResponseInfo {

        /**
         * 登录成功
         */
        public static final int CODE_SUCCESS = 0;

        /**
         * 登录失败
         */
        public static final int CODE_FAILED = 100;

        /**
         * 响应码
         */
        public int code;

        /**
         * 响应数据
         */
        public String data;

        public static class ResponseData {
            public String token;
        }

        @Override
        public String toString() {
            return "LoginResponseInfo [code=" + code + ", data=" + data + "]";
        }

    }
}

发送登录请求

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            ConnectionManager.initConnection(new NettyConnection());
            ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

                @Override
                public void onConnected() {
                    LogUtil.log("Main,onConnected");

                    ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

                        @Override
                        public void onMsgReceived(Msg msg) {
                            MsgQueue.getInstance().enqueueMsg(msg);
                            MsgDispatcher.dispatch();
                        }
                    });

                    Msg msg = new Msg();
                    msg.type = Msg.TYPE_LOGIN;

                    Msg.LoginRuquestInfo request = new LoginRuquestInfo();
                    request.user = "wangyz";
                    request.pwd = "wangyz";

                    Gson gson = new Gson();
                    msg.msg = gson.toJson(request);

                    ConnectionManager.getInstance().sendMsg(msg);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

这里,引入Gson,将消息Bean转成json字符串后发送。

对应到服务端,为便于解析出消息,也需要对应的修改消息的Bean。服务端对消息的具体分发与处理,和客户端类似,这里不再展开。

源码

由于篇幅限制,Demo中指令的优先级处理,模拟服务端指令下发等,这里没有再进一步详细介绍,具体可以参考源码:https://github.com/milovetingting/Samples/tree/master/Netty

后记

本文介绍了基于Netty实现服务端与客户端通信的基本用法,以及在此基础上,实现处理服务端指令并上报。Demo中通信的数据格式,用到了json,而优化的做法,可以用protobuf来实现,这里只展示通信的流程及简单的封装,因而未使用protobuf。Demo中只实现大体的流程,可能存在未测试到的Bug,权当一个参考的思路吧。

End~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容