NIO学习_Service

参考儒猿小文件系统的案例,按照自己的理解写了一下

对接NIO_CLIENT,客户端考虑是高性能,最大效率的传输文件,而服务端考虑的是高并发,服务端和客户端在数量上的不对等,造成了服务端要尽可能的处理多个连接,尽快的响应客户端请求。

客户端设计成Netty类似的架构,Rector模型,一个Selector负责接收Accept请求,然后分发给各个Processor进行请求解析,解析好的请求存入请求队列中,多个IO线程进行竞争拉取

设计成该架构是考虑到IO解析和磁盘写入性能不对等,在普通服务器上,磁盘IO的随机读写能力是一个瓶颈,性能较差,那么我们可以考虑在请求解析和磁盘的随机读写分离开,中间使用请求队列进行通知,那么我们可以动态的加大磁盘IO线程数量来提高磁盘的IO性能,请求解析是基于内存来进行的,性能较高,Processor线程数量可以较小。为什么将Accpet请求之后就分发给Processor线程呢,考虑到NIO的多路复用机制,它会轮询各个通道是否有相关事件发生,如果注册的连接过多,那么单个连接的请求处理速度肯定受到影响,如果分发给不同的线程进行请求解析,那么单个线程的连接数可以得到控制,在尽可能多的维持连接的情况下提高性能。

整个架构如下:


NIO_SERVER (2).png

现在存在的问题:

  1. 大文件,文件大于5M,并发上传,内存吃紧,容易内存溢出,现在采取的措施是异常捕捉,等待内存释放再次读取,但是这个没办法限制住NIO网络请求内存,这个是个问题
  2. 现在没办法保证一个READ事件能读取完整请求,是将未解析完的请求缓存起来,等待下次READ事件进行解析,但是考虑一个问题,TCP的丢包和重试会造成什么影响呢,没模拟出来,如果真出现这个问题,那么从丢包的那一刻起后续的请求解析都会出问题,这个问题就很严重
  3. 整个架构还是很脆弱,没有设计容错机制,且对资源过长时间占用没进行处理。

NioServer

package org.zymf.nio.example3.server;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: IO多路复用分发器
 * @date 2021-07-18 0:39
 */
public class NioServer extends Thread{
    /**
     * 选择器
     */
    private Selector selector;

    /**
     * 监听通道
     */
    private ServerSocketChannel socketChannel;

    /**
     * processor处理线程数量
     */
    private int NIO_PROCESSOR_NUM = 3;

    /**
     * 负责解析请求的Process线程集合
     */
    private List<NioProcessor> nioProcessors;

    /**
     * 磁盘读写线程数量
     */
    private int DISK_IO_THREAD_NUM  = 5;

    /**
     * 负责磁盘读写的线程集合
     */
    private List<NioIoThread> nioIoThreads;

    /**
     * 记录当前分配请求的Processors索引
     */
    private AtomicInteger nextProcessor;

    public NioServer() throws Exception {
        //初始化监听请求监听通道
        this.selector = Selector.open();
        this.socketChannel = ServerSocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.socket()
                .bind(new InetSocketAddress(7899),100);
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //初始化请求解析processor
        nioProcessors = new ArrayList<>(NIO_PROCESSOR_NUM);
        for (int i = 0; i < NIO_PROCESSOR_NUM; i++) {
            NioProcessor nioProcessor = new NioProcessor(i);
            nioProcessors.add(nioProcessor);
            NetWorkRespondQueue.get().initRespondQueue(i);
            nioProcessor.start();
        }

        //初始化磁盘读写线程
        nioIoThreads = new ArrayList<>(DISK_IO_THREAD_NUM);
        for (int i = 0; i < DISK_IO_THREAD_NUM; i++) {
            NioIoThread nioIoThread = new NioIoThread();
            nioIoThreads.add(nioIoThread);
            nioIoThread.start();
        }

        //初始化轮询计数器
        nextProcessor = new AtomicInteger(0);
    }

    @Override
    public void run() {
        while (true){
            try {
                //同步非阻塞获取响应事件
                int select = selector.select();
                if (select > 0){
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()){
                            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                            SocketChannel channel = serverSocketChannel.accept();
                            if (channel != null){
                                System.out.println(">>>>>>>>>>>>接收Accept请求");
                                channel.configureBlocking(false);
                                int index = incrementAndGetProcessor();
                                NioProcessor nioProcessor = nioProcessors.get(index);
                                nioProcessor.addChannel(channel);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @description: 轮询Processor
     * @param: * @param:
     * @return: int
     * @author zhuyuemufeng
     * @date: 2021-07-18 1:08
     */
    private int incrementAndGetProcessor() {
        for (;;) {
            int current = nextProcessor.get();
            int next = (current + 1) % NIO_PROCESSOR_NUM;
            if (nextProcessor.compareAndSet(current, next))
                return next;
        }
    }
}

NioProcessor

package org.zymf.nio.example3.server;

import org.zymf.nio.example3.constant.Constant;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: 请求监听解析器
 * @date 2021-07-18 0:40
 */
public class NioProcessor extends Thread {

    /**
     * 当前Process的ID
     */
    private int processId;

    /**
     * 多路复用选择器
     */
    private Selector selector;

    /**
     * 等待注册监听的Channel
     */
    private ConcurrentLinkedQueue<SocketChannel> channelQueue;

    /**
     * 未读完的请求
     */
    private ConcurrentHashMap<String, NetWorkRequest> cachedRequests;

    /**
     * 暂存的请求响应
     */
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<NetWorkRespond>> cachedResponds;

    /**
     * address对应SelectKeys映射表
     */
    private ConcurrentHashMap<String, SelectionKey> cacheKeys;

    public NioProcessor(int processId) throws Exception {
        System.out.println(">>>>>>>>>>>>NioProcessor 解析注册Processor已初始化");
        this.processId = processId;
        this.channelQueue = new ConcurrentLinkedQueue<>();
        this.cachedRequests = new ConcurrentHashMap<>();
        this.cachedResponds = new ConcurrentHashMap<>();
        this.cacheKeys = new ConcurrentHashMap<>();
        this.selector = Selector.open();
    }

    /**
     * @description: 加入需要注册监听的通道
     * @param: * @param: channel
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-18 1:32
     */
    public void addChannel(SocketChannel channel) {
        channelQueue.offer(channel);
        System.out.println(">>>>>>>>>>>>Processsor 加入Channel");
        selector.wakeup();
    }

    @Override
    public void run() {
        //这边我们就需要将加入的监听Channnel加入到Selector中
        System.out.println(">>>>>>>>>>>>NioProcessor 已启动");
        while (true) {
            try {
                //注册等待处理的连接
                registerQueuedClients();
                //处理返回响应
                cacheQueuedResponse();
                //处理连接的请求解析
                process();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @description: 事件监听及相关请求解析
     * @param: * @param:
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-18 12:27
     */
    private void process() throws Exception {
        int select = selector.select(Constant.POLL_BLOCK_MAX_TIME);
        if (select > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //防止事件重复处理
                iterator.remove();
                if (key.isReadable()) {
                    System.out.println("****************触发Read事件");
                    //这里的channel都是读写事件,Accept在NioServer中已经处理
                    SocketChannel channel = (SocketChannel) key.channel();
                    //获取当前请求的远端地址标识,作为请求缓存的Key
                    String client = channel.getRemoteAddress().toString();
                    NetWorkRequest request = null;
                    if (cachedRequests.contains(client)) {
                        //说明还要一个请求没有完整读完
                        request = cachedRequests.get(client);
                    } else {
                        request = new NetWorkRequest();
                    }
                    request.setChannel(channel);
                    request.setClient(client);
                    request.setProcessorId(processId);

                    request.read();

                    if (request.complete()) {
                        //如果读取完整,就要放入请求队列中,让IOThread解析
                        NetworkRequestQueue.get().offer(request);
                        //从缓存中移除该请求
                        cachedRequests.remove(client);
                        //未读完的数据等待下次再次读
                        cacheKeys.put(client, key);
                    } else {
                        //请求未解析完整,继续关注Read请求
                        cachedRequests.put(client, request);
                    }
                }

                if (key.isWritable()) {
                    System.out.println("****************触发Write事件");
                    SocketChannel channel = (SocketChannel) key.channel();
                    String client = channel.getRemoteAddress().toString();
                    ConcurrentLinkedQueue<NetWorkRespond> responds = cachedResponds.get(client);
                    NetWorkRespond respond = null;
                    while (responds != null && (respond = responds.poll()) != null) {
                        ByteBuffer content = respond.getContent();
                        while (content.hasRemaining()) {
                            channel.write(content);
                        }
                    }
                    //最好手动触发下取消Write,防止死循环打死CPU
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
            }
        }
    }

    /**
     * @description: 注册等待监听的连接
     * @param: * @param:
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-18 12:20
     */
    private void registerQueuedClients() throws Exception {
        SocketChannel socketChannel = null;
        while ((socketChannel = channelQueue.poll()) != null) {
            //注册读取事件
            System.out.println(">>>>>>>>>>>>Processsor 注册注册读写事件");
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
    }

    /**
     * @description: 暂存返回的响应,这里是一个请求对应一个响应
     * @param: * @param:
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-18 16:14
     */
    private void cacheQueuedResponse() throws Exception {
        NetWorkRespond respond = null;
        while ((respond = NetWorkRespondQueue.get().poll(processId)) != null) {
            //放入缓存
            ConcurrentLinkedQueue<NetWorkRespond> respondQueue = cachedResponds.get(respond.getClient());
            if (respondQueue == null) {
                respondQueue = new ConcurrentLinkedQueue<>();
                cachedResponds.put(respond.getClient(), respondQueue);
            }
            respondQueue.offer(respond);
            cacheKeys.get(respond.getClient()).interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }
}

NioIoThread

package org.zymf.nio.example3.server;

import org.zymf.nio.example3.constant.Constant;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: TODO
 * @date 2021-07-18 0:42
 */
public class NioIoThread extends Thread {

    private NetworkRequestQueue requestQueue;

    public NioIoThread() {
        System.out.println(">>>>>>>>>>>>NioIoThread 本地磁盘线程已初始化");
        this.requestQueue = NetworkRequestQueue.get();
    }

    @Override
    public void run() {
        System.out.println(">>>>>>>>>>>>NioIoThread 已启动");
        while (true) {
            try {
                NetWorkRequest request = requestQueue.poll();
                if (request == null) {
                    Thread.sleep(200);
                    continue;
                }
                int type = request.getType();

                if (type == Constant.REQUEST_SEND_FILE) {
                    writeFileToLocalDisk(request);
                }
                if (type == Constant.REQUEST_READ_FILE) {
                    readFileFromLocalDisk(request);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @description: 将本地缓冲写入本地磁盘
     * @param: * @param: request
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-18 15:08
     */
    private void writeFileToLocalDisk(NetWorkRequest request) {

        FileOutputStream outputStream = null;
        FileChannel fileChannel = null;
        NetWorkRespond respond = new NetWorkRespond();
        respond.setClient(request.getClient());
        try {
             outputStream = new FileOutputStream(Constant.BASE_DISK_DIR + request.getFileName());
             fileChannel = outputStream.getChannel();
            ByteBuffer buffer = request.getCachedFileBuffer();
            buffer.flip();
            //文件写入
            int writeCount = 1;
            while (writeCount > 0){
                writeCount = fileChannel.write(buffer);
            }
            
            //封装响应
            byte[] bytes = "success".getBytes();
            //32位请求ID + 4位响应状态 + 4位响应长度 + 响应长度
            ByteBuffer content = ByteBuffer.allocate(32 + 4 + 4 + bytes.length);
            content.put(request.getRequestId().getBytes());
            content.putInt(Constant.READ_STATUS_SUCCESS);
            content.putInt(bytes.length);
            content.put(bytes);
            content.flip();
            respond.setContent(content);
        }catch (Exception e){
            e.printStackTrace();
            byte[] bytes = "fail".getBytes();
            ByteBuffer content = ByteBuffer.allocate(4 + bytes.length);
            content.putInt(Constant.READ_STATUS_FAIL);
            content.put(bytes);
            content.flip();
            respond.setContent(content);
        }finally {
            try {
                outputStream.close();
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        NetWorkRespondQueue.get().offer(request.getProcessorId(),respond);
    }

    /** 
     * @description: 读取本地文件请求
     * @param: * @param: request 
     * @return: void 
     * @author zhuyuemufeng
     * @date: 2021-07-18 15:38
     */
    private void readFileFromLocalDisk(NetWorkRequest request) {
        String fileName = request.getFileName();
        String realFilePath = Constant.BASE_DISK_DIR + fileName;
        FileInputStream inputStream = null;
        FileChannel fileChannel = null;
        NetWorkRespond respond = new NetWorkRespond();
        respond.setClient(request.getClient());
        try {
            inputStream = new FileInputStream(realFilePath);
            fileChannel = inputStream.getChannel();
            int length = (int) new File(realFilePath).length();
            ByteBuffer buffer = ByteBuffer.allocate(32 + 8 + length);
            buffer.put(request.getRequestId().getBytes());
            buffer.putInt(Constant.READ_STATUS_SUCCESS);
            buffer.putInt(length);
            int readCount = 1;
            while (readCount > 0){
                readCount = fileChannel.read(buffer);
            }

            buffer.flip();
            respond.setContent(buffer);
        }catch (Exception e){
            ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(Constant.READ_STATUS_FAIL);
            buffer.flip();
            respond.setContent(buffer);
        }
        NetWorkRespondQueue.get().offer(request.getProcessorId(),respond);
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容