Java NIO

本来想写关于netty类的时序图,学习下设计模式并学习如何扩展Java nio的,毕竟对于我这种拧螺丝钉的给我一个任务如何写出高内聚低耦合的代码才是重要的,但是找不到合适相关联Java NIO和netty相关的代码,所以我花费了一点时间整理了下相关代码。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class NioSocketServer {

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    private static final int BUFFERSIZE = 1024;

    private final ServerSocketChannel serverSocketChannel;

    //简易的Reactor模型,一个boss线程,2倍核数的工作线程
    private final Selector bossselector;
    private final Work[] works;
    private AtomicInteger index = new AtomicInteger();

    //用于缓存每个客户端粘包拆包等数据.
    private Map<SocketAddress,Read> cacheChannelBuffer = new ConcurrentHashMap<>();

    public NioSocketServer()throws IOException{
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8888));
        bossselector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(bossselector, SelectionKey.OP_ACCEPT);
        new Thread(new Boss()).start();
        works = new Work[NCPU * 2];
        for(int i = 0;i < works.length;i++){
            new Thread(works[i] = new Work(Selector.open(),i)).start();
        }
    }

    public void accept(SelectionKey key) {
        System.out.println("accept事件");
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            int i = index.getAndIncrement() & works.length - 1;
            works[i].register(socketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //包协议:包=包头(4byte)+包体,包头内容为包体的数据长度
    public void read(SelectionKey selectionKey) {
        System.out.println("read事件");
        try {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            SocketAddress address = channel.getRemoteAddress();
            Read read = cacheChannelBuffer.get(address);
            int bodyLen = -1;
            ByteBuffer byteBuffer;
            if(read == null){
                byteBuffer = ByteBuffer.allocate(BUFFERSIZE);
            }else{
                if(read.headerLength == -1){
                    byteBuffer = ByteBuffer.allocate(BUFFERSIZE);
                }else{
                    bodyLen = read.getHeaderLength();
                    byteBuffer = ByteBuffer.allocate(read.getHeaderLength());
                }
                ByteBuffer readByteBuffer = read.getByteBuffer();
                if(readByteBuffer != null && readByteBuffer.hasRemaining()){
                    readByteBuffer.flip();
                    byteBuffer.put(readByteBuffer);
                }
                read.setByteBuffer(null);
                read.setHeaderLength(-1);
            }
            channel.read(byteBuffer);
            byteBuffer.flip();
            while (byteBuffer.remaining() > 0) {
                if (bodyLen == -1) {// 还没有读出包头,先读出包头
                    if (byteBuffer.remaining() >= 4) {// 读出包头,否则缓存
                        byteBuffer.mark();
                        bodyLen = byteBuffer.getInt();
                    } else {
                        remaining(read, byteBuffer, address, bodyLen);
                        break;
                    }
                } else {// 已经读出包头
                    if (byteBuffer.remaining() >= bodyLen) {// 大于等于一个包,否则缓存
                        byte[] bodyByte = new byte[bodyLen];
                        byteBuffer.get(bodyByte, 0, bodyLen);
                        bodyLen = -1;
                        System.out.println("receive from clien content is:" + new String(bodyByte));
                    } else {
                        remaining(read, byteBuffer, address, bodyLen);
                        break;
                    }
                }
            }
//            String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "<html><body>Hello World!</body></html>";
//            ByteBuffer buffer = ByteBuffer.wrap(httpResponse.getBytes());
//            int len = channel.write(buffer);
//            if (len < 0){
//                throw new IllegalArgumentException();
//            }
//            if (len == 0) {
//                selectionKey.interestOps(SelectionKey.OP_WRITE);
//            }

            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            try {
                selectionKey.cancel();
                serverSocketChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }
    }
    private void remaining(Read read,ByteBuffer byteBuffer,SocketAddress address,int bodyLen){
        if(!byteBuffer.hasRemaining()){
            return;
        }
        if(read == null){
            read = new Read();
            cacheChannelBuffer.put(address,read);
        }
        read.setHeaderLength(bodyLen);
        int remaining = byteBuffer.remaining();
        byte[] remainingByte = new byte[remaining];
        byteBuffer.get(remainingByte, 0, remaining);
        read.setByteBuffer(ByteBuffer.allocate(remaining).put(remainingByte));
    }
    public void write(SelectionKey selectionKey) {
        System.out.println("write事件");
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "<html><body>Hello World!</body></html>";
        System.out.println("response from server to client");
        try {
            ByteBuffer byteBuffer = ByteBuffer.wrap(httpResponse.getBytes());
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            try {
                selectionKey.cancel();
                serverSocketChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }
    }
    class Boss implements Runnable{

        private volatile boolean isExit;

        public boolean isExit() {
            return isExit;
        }

        public void setExit(boolean exit) {
            isExit = exit;
        }
        @Override
        public void run(){
            try {
                while (!isExit) {
                    int selectKey = bossselector.select();
                    if (selectKey > 0) {
                        Set<SelectionKey> keySet = bossselector.selectedKeys();
                        Iterator<SelectionKey> iter = keySet.iterator();
                        while (iter.hasNext()) {
                            SelectionKey selectionKey = iter.next();
                            iter.remove();
                            if (selectionKey.isAcceptable()) {
                                accept(selectionKey);
                            } else {
                                System.out.println("boss线程不可能出现work线程的事件,请检查代码。");
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {

            }
        }
    }

    class Work implements Runnable{

        private final Selector workSelector;
        private final int i;
        private volatile boolean isExit ;

        public Work(Selector workSelector,int i){
            this.workSelector = workSelector;
            this.i = i;
        }
        public SelectionKey register(SocketChannel socketChannel) throws ClosedChannelException {
            return socketChannel.register(workSelector, SelectionKey.OP_READ);
        }

        public boolean isExit() {
            return isExit;
        }

        public void setExit(boolean exit) {
            isExit = exit;
        }

        @Override
        public void run(){
            try {
                while (!isExit) {
                    int selectKey = workSelector.select(10);
                    if (selectKey > 0) {
                        Set<SelectionKey> keySet = workSelector.selectedKeys();
                        Iterator<SelectionKey> iter = keySet.iterator();
                        while (iter.hasNext()) {
                            SelectionKey selectionKey = iter.next();
                            iter.remove();
                            if (selectionKey.isAcceptable()) {
                                System.out.println("work线程不可能出现boss线程的事件,请检查代码。");
                            }else if (selectionKey.isReadable()) {
                                read(selectionKey);
                            }else if (selectionKey.isWritable()) {
                                write(selectionKey);
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {

            }
        }
    }

    class Read{
        private ByteBuffer byteBuffer;
        private int headerLength = -1;

        public ByteBuffer getByteBuffer() {
            return byteBuffer;
        }

        public void setByteBuffer(ByteBuffer byteBuffer) {
            this.byteBuffer = byteBuffer;
        }

        public int getHeaderLength() {
            return headerLength;
        }

        public void setHeaderLength(int headerLength) {
            this.headerLength = headerLength;
        }
    }

    public static void main(String args[]) throws IOException {
        NioSocketServer server = new NioSocketServer();
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioSocketClient  {
    private SocketChannel socketChannel;
    private Selector selector = null;

    public NioSocketClient() throws IOException{
        InetSocketAddress inetSocketAddress = new InetSocketAddress(8888);
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(inetSocketAddress);
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        new Thread(new Work()).start();
    }

    public void finishConnect(SelectionKey key) {
        System.out.println("client finish connect!");
        SocketChannel socketChannel = (SocketChannel) key.channel();
        try {
            socketChannel.finishConnect();
            key.interestOps(SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int len = channel.read(byteBuffer);
        if (len > 0) {
            byteBuffer.flip();
            byte[] byteArray = new byte[byteBuffer.limit()];
            byteBuffer.get(byteArray);
            System.out.println("client receive from server,content:"+new String(byteArray));
            len = channel.read(byteBuffer);
            byteBuffer.clear();
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    public void send(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        for (int i = 0; i < 10; i++) {
            String ss = i + "Server ,how are you ? this is package message from NioSocketClient!";
            int headSize  = (ss).getBytes().length;
            ByteBuffer byteBuffer = ByteBuffer.allocate(4 + headSize);
            byteBuffer.putInt(headSize);
            byteBuffer.put(ss.getBytes());
            byteBuffer.flip();
            System.out.println("client send:" + i + ",context:"  + ss);
            while (byteBuffer.hasRemaining()) {
                try {
                    channel.write(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    class Work implements Runnable{
        @Override
        public void run(){
            while (true) {
                try {
                    int key = selector.select();
                    if (key > 0) {
                        Set<SelectionKey> keySet = selector.selectedKeys();
                        Iterator<SelectionKey> iter = keySet.iterator();
                        while (iter.hasNext()) {
                            SelectionKey selectionKey = null;
                            synchronized (iter) {
                                selectionKey = iter.next();
                                iter.remove();
                            }

                            if (selectionKey.isConnectable()) {
                                finishConnect(selectionKey);
                            }
                            if (selectionKey.isWritable()) {
                                send(selectionKey);
                            }
                            if (selectionKey.isReadable()) {
                                read(selectionKey);
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String args[]) throws IOException {
        NioSocketClient client = new NioSocketClient();
    }
}

以上服务端实现了简易的Reactor模型,并自定义了通信协议(协议格式:长度+内容),并实现了粘包和拆包的逻辑。netty不管怎么封装,最终都是要封装NIO那几行代码。
以下是我遇到的问题和我觉得需要注意细节。
1.我是在win10 jdk1.8实现的代码,服务端worker线程获取数据采用select()时,当有新的客户端连接时,获取不到数据,采用select(long timeout)可以获取到数据,这个问题没找到原因。
2.Java底层无法得知channel获取了多少数据,所以需要自定义ByteBuffer的大小,在发生拆包粘包时需要注意。netty实现了自动实现计算ByteBuffer的大小,不一定准确。
3.自定义的nio代码中,很少看到OP_WRITE的处理,经常看到的代码就是在请求处理完成后,直接通过下面的代码将结果返回给客户端。什么时候采用OP_WRITE,引用别人的一段话:
如果客户端的网络或者是中间交换机的问题,使得网络传输的效率很低,这时候会出现服务器已经准备好的返回结果无法通过TCP/IP层传输到客户端。这时候在执行上面这段程序的时候就会出现以下情况。
(1) bb.hasRemaining()一直为“true”,因为服务器的返回结果已经准备好了。
(2) socketChannel.write(bb)的结果一直为0,因为由于网络原因数据一直传不过去。
(3) 因为是异步非阻塞的方式,socketChannel.write(bb)不会被阻塞,立刻被返回。
(4) 在一段时间内,这段代码会被无休止地快速执行着,消耗着大量的CPU的资源。事实上什么具体的任务也没有做,一直到网络允许当前的数据传送出去为止。
因此,要对OP_WRITE加以处理,常用用法为:

            String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "<html><body>Hello World!</body></html>";
            ByteBuffer buffer = ByteBuffer.wrap(httpResponse.getBytes());
            int len = channel.write(buffer);
            if (len < 0){
                throw new IllegalArgumentException();
            }
            if (len == 0) {
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }

以上这段话在我实现的服务端中屏蔽了,后续会讲解netty时序图,学习netty优秀的源码。
最后,目前在找工作,现在在家cha

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容