BIO、NIO、AIO实现聊天室功能

源码地址:https://github.com/shawntime/shawn-architect-notes/tree/master/test-netty/src/main/java/com/shawntime/architect/notes/netty

BIO(同步阻塞I/O模式)

数据的读取比较阻塞在一个线程中进行,内核调用read()、write()、accept()函数均阻塞

image-20210413145738609
缺点
  • IO代码里read操作是阻塞操作,如果连接不做数据读写操作会导致线程阻塞,浪费资源
  • 线程很多,会导致服务器线程太多,压力太大
BIO模式实现一个聊天室程序
// 聊天室程序服务端
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

public class Server {

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(10);

    private List<Socket> sockets;

    private int port;

    private ServerSocket serverSocket;

    private boolean isClosed = false;

    public Server(int port) {
        this.port = port;
        sockets = new ArrayList<>();
    }

    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("服务启动成功,监听端口:" + port);
            while (!isClosed) {
                Socket socket = serverSocket.accept();
                System.out.println(socket.getRemoteSocketAddress() + "连接上线...");
                sockets.add(socket);
                EXECUTOR_SERVICE.submit(new SocketThread(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            for (Socket socket : sockets) {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }

    private class SocketThread implements Runnable {

        private Socket socket;

        public SocketThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String msg = "欢迎【"
                    + socket.getRemoteSocketAddress() + "】进入聊天室!当前聊天室有【"
                    + sockets.size() + "】人";
            sendMsg(msg);
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    msg = "【" + socket.getRemoteSocketAddress() + "】:" + line;
                    sendMsg(msg);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        private void sendMsg(String msg) {
            for (Socket socket : sockets) {
                PrintWriter pw = null;
                try {
                    pw = new PrintWriter(socket.getOutputStream(), true);
                    pw.println(msg);
                    pw.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public boolean isClosed() {
        return isClosed;
    }

    public void setClosed(boolean closed) {
        isClosed = closed;
    }
}
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;

// 聊天室程序客户端
public class Client {

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);

    private String serverIp;

    private int serverPort;

    private Socket socket;

    private boolean isClosed;

    public Client(String serverIp, int serverPort) {
        this.serverIp = serverIp;
        this.serverPort = serverPort;
    }

    public void start() {
        try {
            socket = new Socket(serverIp, serverPort);
            EXECUTOR_SERVICE.submit(new ClientThread(socket));
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private class ClientThread implements Runnable {

        private Socket socket;

        public ClientThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            Scanner scanner = new Scanner(System.in);
            System.out.println("请留言:");
            while (!isClosed) {
                String line = scanner.nextLine();
                PrintWriter pw = null;
                try {
                    pw = new PrintWriter(socket.getOutputStream(), true);
                    pw.println(line);
                    pw.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public boolean isClosed() {
        return isClosed;
    }

    public void setClosed(boolean closed) {
        isClosed = closed;
    }
}
NIO(同步非阻塞IO模式)

服务器实现模式为一个线程可以处理多个请求(连接),客户端发送的连接请求都会注册到多路复用器selector上,多路复用器轮询到连接有IO请求就进行处理。

image-20210413145427704
NIO实现一个聊天室
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class NIOServer {

    private int port;

    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    private volatile boolean isClosed;

    private List<SocketChannel> socketChannels = new ArrayList<>();

    public NIOServer(int port) {
        this.port = port;
    }

    public void start() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            // 绑定端口
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            // 设置非阻塞
            serverSocketChannel.configureBlocking(false);
            selector = Selector.open();
            // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (!isClosed) {
                System.out.println("等待客户端链接...");
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    handler(selectionKey);
                    keyIterator.remove();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void handler(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isAcceptable()) {
            acceptHandler(selectionKey);
        }
        if (selectionKey.isReadable()) {
            readHandler(selectionKey);
        }
    }

    private void acceptHandler(SelectionKey selectionKey) throws IOException {
        System.out.println("有新客户端链接...");
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
        socketChannels.add(socketChannel);
        String msg = "欢迎【"
                + socketChannel.getRemoteAddress() + "】进入聊天室!当前聊天室有【"
                + socketChannels.size() + "】人";
        print(msg);
    }

    private void readHandler(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int len = socketChannel.read(byteBuffer);
        if (len != -1) {
            String line = "【" + socketChannel.getRemoteAddress() + "】:" + new String(byteBuffer.array(), 0, len);
            System.out.println(line);
            print(line);
        }
        selectionKey.interestOps(SelectionKey.OP_READ);
    }

    private void print(String line) throws IOException {
        for (SocketChannel channel : socketChannels) {
            ByteBuffer buffer = ByteBuffer.wrap(line.getBytes());
            channel.write(buffer);
        }
    }

    public static void main(String[] args) {
        NIOServer server = new NIOServer(9023);
        server.start();
    }

}
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ExecutorService;

public class NIOClient {

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);

    private String ip;

    private int port;

    private volatile boolean isClosed;

    private SocketChannel socketChannel;

    public NIOClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void start() {
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress(ip, port));
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            EXECUTOR_SERVICE.submit(new ChatThread());

            while (!isClosed) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    handler(selectionKey);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void handler(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isConnectable()) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            if (socketChannel.isConnectionPending()) {
                socketChannel.finishConnect();
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
        }
        if (selectionKey.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int len = socketChannel.read(byteBuffer);
            if (len != -1) {
                String line = new String(byteBuffer.array(), 0, len);
                System.out.println(line);
            }
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    private class ChatThread implements Runnable {

        @Override
        public void run() {
            Scanner scanner = new Scanner(System.in);
            System.out.println("请留言:");
            while (!isClosed) {
                String line = scanner.nextLine();
                ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                try {
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String[] args) {
        NIOClient client = new NIOClient("127.0.0.1", 9023);
        client.start();
    }
}

AIO(异步非阻塞IO)

异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AIOServer {

    private Charset charset = Charset.forName("utf-8");

    private int port;

    private AsynchronousChannelGroup channelGroup;

    private AsynchronousServerSocketChannel serverSocketChannel;

    private List<AsynchronousSocketChannel> socketChannels;

    private boolean isClosed;

    public AIOServer(int port) {
        this.port = port;
        socketChannels = new ArrayList<>();
    }

    private void start() throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
        serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
        serverSocketChannel.bind(new InetSocketAddress(port));
        System.out.println("启动服务器,监听端口:"+port);
        serverSocketChannel.accept(null, new AcceptHandler());
        //阻塞式调用,防止占用系统资源
        System.in.read();
    }

    private void print(String line) throws IOException {
        for (AsynchronousSocketChannel channel : socketChannels) {
            ByteBuffer buffer = send(line);
            channel.write(buffer);
        }
    }

    private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {

        private AsynchronousSocketChannel socketChannel;

        public ClientHandler(AsynchronousSocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            buffer.flip();
            try {
                String receive = receive(buffer);
                SocketAddress remoteAddress = socketChannel.getRemoteAddress();
                if ("quit".equalsIgnoreCase(receive)) {
                    System.out.println(remoteAddress + "已下线...");
                    socketChannels.remove(socketChannel);
                    String msg = "【"
                            + remoteAddress + "】退出聊天室!当前聊天室有【"
                            + socketChannels.size() + "】人";
                    print(msg);
                    socketChannel.close();
                    return;
                }
                String line = "【" + remoteAddress + "】:" + receive;
                System.out.println(line);
                print(line);
                buffer.clear();
                socketChannel.read(buffer, buffer,this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {

        }
    }

    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {

        @Override
        public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
            if (serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null,this);
            }
            if (socketChannel.isOpen()) {
                socketChannels.add(socketChannel);
                try {
                    String msg = "欢迎【"
                            + socketChannel.getRemoteAddress() + "】进入聊天室!当前聊天室有【"
                            + socketChannels.size() + "】人";
                    print(msg);
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    ClientHandler clientHandler = new ClientHandler(socketChannel);
                    socketChannel.read(buffer, buffer, clientHandler);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }


        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("error");
        }
    }

    private ByteBuffer send(String msg) {
        return charset.encode(msg);
    }

    private String receive(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);
        return String.valueOf(charBuffer);
    }


    public static void main(String[] args) {
        AIOServer server = new AIOServer(9090);
        try {
            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

public class AIOClient {

    private Charset charset = Charset.forName("utf-8");

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);

    private String ip;

    private int port;

    private volatile boolean isClosed;

    private AsynchronousSocketChannel socketChannel;

    public AIOClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void start() {
        try {
            socketChannel = AsynchronousSocketChannel.open();
            socketChannel.connect(new InetSocketAddress(ip, port)).get();
            EXECUTOR_SERVICE.submit(new ChatThread(Thread.currentThread()));
            while (!isClosed) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int length = socketChannel.read(buffer).get();
                if (length > 0) {
                    buffer.flip();
                    System.out.println(receive(buffer));
                    buffer.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("线程中断退出...");
            try {
                socketChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            EXECUTOR_SERVICE.shutdown();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    private class ChatThread implements Runnable {

        private Thread mainThread;

        public ChatThread(Thread mainThread) {
            this.mainThread = mainThread;
        }

        @Override
        public void run() {
            Scanner scanner = new Scanner(System.in);
            System.out.println("请留言:");
            while (!isClosed) {
                String line = scanner.nextLine();
                socketChannel.write(send(line));
                if ("quit".equalsIgnoreCase(line)) {
                    isClosed = true;
                    mainThread.interrupt();
                }
            }
        }
    }

    private ByteBuffer send(String msg) {
        return charset.encode(msg);
    }

    private String receive(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);
        return String.valueOf(charBuffer);
    }

    public static void main(String[] args) {
        AIOClient client = new AIOClient("127.0.0.1", 9090);
        client.start();

    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,559评论 0 11
  • 彩排完,天已黑
    刘凯书法阅读 4,199评论 1 3
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 124,515评论 2 7