通过socket编程掌握IO流 —— NIO

一、本次目标

  1. 改造server,采用NIO读取client信息;
  2. 改造client,亦采用NIO发送消息,与之前不同的BIO形成对比;

二、编码

1、新建byte数组拼接公共类

主要用作在channel读取数据时,因为缓存大小的原因,很可能出现中文半包(一个汉字的byte处在两次读取的buffer缓冲区中)而导致乱码;

package com.cjt.io;

public class CommonUtil {

  /**
   * byte数组拼接
   */
  public static byte[] concat(byte[] first, byte[] second) {
    byte[] result = new byte[first.length + second.length];
    System.arraycopy(first, 0, result, 0, first.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
  }
}

2、提取channel缓存读取byte数组

便于server和client公共调用相同的代码实现;

package com.cjt.io.nio;

import com.cjt.io.CommonUtil;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

class ChannelHandler {

  static byte[] readFromChannel(SocketChannel channel, int buffSize) throws IOException {
    // 创建缓存区
    ByteBuffer buffer = ByteBuffer.allocate(buffSize);
    byte[] result = {};
    while ((channel.read(buffer)) != -1) {
      // 切换为写模式,buffer的position置为0
      buffer.flip();
      // 读取buffer有效数据
      byte[] bytes = new byte[buffer.limit()];
      buffer.get(bytes, 0, bytes.length);
      // 数组拼接
      result = CommonUtil.concat(result, bytes);
      // 清空缓存区
      buffer.clear();
    }
    return result;
  }
}

3、修改server,采用NIO机制读取client消息

需要添加selector进行管理channel,无限轮询就绪的channel主要分为两部分,首先需要将自己的serverChannel注册到selector中,同时获取对应clientChannel,设置相关参数也要注册到selector中;

package com.cjt.io.nio;

import com.cjt.io.Config;
import com.cjt.io.DateUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

public class ServerNIO implements Config {

  private Selector selector;

  private ServerNIO(int port) {
    try {
      // 创建选择器
      selector = Selector.open();
      // 打开监听通道
      ServerSocketChannel channel;
      channel = ServerSocketChannel.open();
      // 非阻塞模式
      channel.configureBlocking(false);
      // 绑定端口
      channel.socket().bind(new InetSocketAddress(port));
      // 将channel注册到selector中,并且“兴趣模式”为“accept”
      channel.register(selector, SelectionKey.OP_ACCEPT);
      System.out.println("[" + DateUtil.getCurTimeStr() + "]:服务端已经启动");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private void start() {
    while (true) {
      try {
        // select方法会阻塞当前线程
        System.out.println("[" + DateUtil.getCurTimeStr() + "]:开始轮询获取就绪的channel");
        int readyChannels = selector.select();
        if (readyChannels == 0) continue;
        System.out.println("[" + DateUtil.getCurTimeStr() + "]:有chanel就绪啦");

        Set<SelectionKey> keySet = selector.selectedKeys();
        for (SelectionKey key : keySet) {
          // 删除正在处理的key
          keySet.remove(key);
          if (key.isValid()) {
            // 可接受的(server注册的key)
            if (key.isAcceptable()) {
              // 获取注册的server服务端channel通道
              ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
              // 获取连接的client客户端channel通道
              SocketChannel socketChannel = serverSocketChannel.accept();
              System.out.println("[" + DateUtil.getCurTimeStr() + "]:有新的客户端连接啦");
              // 设置client的channel为非阻塞
              socketChannel.configureBlocking(false);
              // 注册当前clientChannel到selector中,并设置clientChannel为可读
              socketChannel.register(selector, SelectionKey.OP_READ);
              System.out.println();
            }
            // 可读的(client注册的key)
            if (key.isReadable()) {
              SocketChannel socketChannel = (SocketChannel) key.channel();
              byte[] result = ChannelHandler.readFromChannel(socketChannel, 1024);

              String resultStr = new String(result, DEFAULT_ENCODE);

              System.out.println("[" + DateUtil.getCurTimeStr() + "]:client传来消息");
              System.out.println(resultStr);
              System.out.println();

              socketChannel.shutdownInput();

              System.out.println("[" + DateUtil.getCurTimeStr() + "]:server写入消息");
              System.out.println(resultStr);
              System.out.println();

              ByteBuffer buffer = ByteBuffer.wrap(result);
              socketChannel.write(buffer);
              // 当前channel操作完毕后一定要执行清除操作
              key.cancel();
              socketChannel.close();
            }
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
        break;
      }
    }
  }


  public static void main(String[] args) {
    new ServerNIO(DEFAULT_PORT).start();
  }
}

其实现在我们可以用上一篇中的ClientBIO来测试,照例先启动server,然后运行client的main方法;

client控制台:

[19:53:50]:连接server成功
[19:53:50]:连接server成功
[19:53:51]:client写入消息
我是第二个客户端
[19:53:53]:client写入消息
我是第一个客户端
[19:53:53]:server传来消息
我是第一个客户端

[19:53:53]:server传来消息
我是第二个客户端

server控制台:

[19:53:15]:服务端已经启动
[19:53:15]:开始轮询获取就绪的channel
[19:53:50]:有chanel就绪啦
[19:53:50]:有新的客户端连接啦

[19:53:50]:开始轮询获取就绪的channel
[19:53:50]:有chanel就绪啦
[19:53:50]:有新的客户端连接啦

[19:53:50]:开始轮询获取就绪的channel
[19:53:50]:有chanel就绪啦
[19:53:53]:client传来消息
我是第一个客户端

[19:53:53]:server写入消息
我是第一个客户端

[19:53:53]:开始轮询获取就绪的channel
[19:53:53]:有chanel就绪啦
[19:53:53]:client传来消息
我是第二个客户端

[19:53:53]:server写入消息
我是第二个客户端

[19:53:53]:开始轮询获取就绪的channel

这里说个细节,仅仅server实现NIO,client相当于发起连接在server中就一直处于就绪状态,因而会造成阻塞,并且while循环读取数据时会无限循环直到client关闭socket流。如果操作ByteBuffer读取全部很容易造成OOM。

结合起来看很能发现问题:

  1. client中两个连接server是同时的,但是server中可以看到一个是50秒,一个是53秒;
  2. server中第一个就绪的channel连接在50秒,然后在53秒才打印出client的消息;
  3. server中接受client还是阻塞的;

所以仅仅是server实现NIO是不行的。

4、修改client实现NIO编程

这里简单处理,client收发消息都是单线程,就不需要采用selector进行管理;

package com.cjt.io.nio;

import com.cjt.io.Config;
import com.cjt.io.DateUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class ClientNIO implements Config, Runnable {

  private String serverIp;

  private int port;

  private String msg;

  private int second;

  private ClientNIO(String serverIp, int port) {
    this.serverIp = serverIp;
    this.port = port;
  }

  private ClientNIO second(int second) {
    this.second = second;
    return this;
  }

  private ClientNIO msg(String msg) {
    this.msg = msg;
    return this;
  }

  private void sleep(int second) {
    try {
      Thread.sleep(second * 1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void run() {
    try (SocketChannel channel = SocketChannel.open()) {
      if (channel.connect(new InetSocketAddress(serverIp, port))) {
        System.out.println("[" + DateUtil.getCurTimeStr() + "]:连接server成功");

        channel.configureBlocking(false);

        byte[] bytes = msg.getBytes(DEFAULT_ENCODE);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();

        sleep(second);

        System.out.println("[" + DateUtil.getCurTimeStr() + "]:client写入消息");
        System.out.println(msg);
        channel.write(buffer);

        channel.shutdownOutput();

        byte[] result = ChannelHandler.readFromChannel(channel, 1024 * 1024);

        String resultStr = new String(result, DEFAULT_ENCODE);
        System.out.println("[" + DateUtil.getCurTimeStr() + "]:server传来消息");
        System.out.println(resultStr);
        System.out.println();
      } else {
        System.out.println("[" + DateUtil.getCurTimeStr() + "]:连接server失败");
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    ClientNIO client_1 = new ClientNIO(DEFAULT_ADDR, DEFAULT_PORT).second(3).msg("我是第一个客户端");
    new Thread(client_1).start();
    ClientNIO client_2 = new ClientNIO(DEFAULT_ADDR, DEFAULT_PORT).second(1).msg("我是第二个客户端");
    new Thread(client_2).start();
  }
}

基本步骤和server大同小异,那么我们再次测试看看;

client控制台:

[20:08:38]:连接server成功
[20:08:38]:连接server成功
[20:08:39]:client写入消息
我是第二个客户端
[20:08:39]:server传来消息
我是第二个客户端

[20:08:41]:client写入消息
我是第一个客户端
[20:08:41]:server传来消息
我是第一个客户端

server控制台:

[20:08:30]:服务端已经启动
[20:08:30]:开始轮询获取就绪的channel
[20:08:38]:有chanel就绪啦
[20:08:38]:有新的客户端连接啦

[20:08:38]:开始轮询获取就绪的channel
[20:08:38]:有chanel就绪啦
[20:08:38]:有新的客户端连接啦

[20:08:38]:开始轮询获取就绪的channel
[20:08:39]:有chanel就绪啦
[20:08:39]:client传来消息
我是第二个客户端

[20:08:39]:server写入消息
我是第二个客户端

[20:08:39]:开始轮询获取就绪的channel
[20:08:41]:有chanel就绪啦
[20:08:41]:client传来消息
我是第一个客户端

[20:08:41]:server写入消息
我是第一个客户端

[20:08:41]:开始轮询获取就绪的channel

client输出的没有任何变化,但是server就不一样了。server是在39秒也就是client在第一个写入消息时并不是连接server的时间,提示有就绪的channel,然后打印的也是“我是第二个客户端”。然后在41秒时再次提示有个就绪的channel,打印“我是第一个客户端”。Perfect!!我们要达到的不就是这种效果么(~ ̄▽ ̄)~

三、总结

相对传统的BIO,NIO的确要复杂不少,但是在高并发上效率要高得多。简单总结下server实现NIO的步骤:

  1. 创建selector选择器;
  2. 打开监听通道,设置为非阻塞,绑定好端口;
  3. 注册监听通道到之前的selector中,设置模式为“ACCEPT”;
  4. 编写主无限循环体,执行selector.select()阻塞线程获取就绪的channel;
  5. 轮询就绪的SelectionKey,如果模式为“ACCEPT”说明是server的channel,获取连接server的client的channel,设置为非阻塞“READ”模式,并注册到selector中;如果模式为“READ”则说明是client的channel,直接创建ByteBuffer缓存区读取数据,一定要记得key.cancel()socketChannel.close()

client实现NIO与server相差无几,只不过我这里简单处理做成单线程,实际上大多数的client还是需要加入selector监听通道,因为单独的读取和写入应该分为不同的线程处理。

从channel读取数据时,一定要注意读取的是有效数据,所以要好好掌握ByteBuffer的使用方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • 概述 NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区),Selector。 传统IO基于...
    时之令阅读 3,687评论 0 8
  • Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java I...
    JackChen1024阅读 7,554评论 1 143
  • java nio Java的IO体系:旧IO新IO:nio,用ByteBuffer和FileChannel读写ni...
    则不达阅读 838评论 0 2
  • 前言: 之前的文章《Java文件IO常用归纳》主要写了Java 标准IO要注意的细节和技巧,由于网上各种学习途径,...
    androidjp阅读 2,900评论 0 22
  • 日目标:1800 当月目标:54000 当日销售:0 现在销售:80000 当日差比:-1800 当月差比:260...
    c6397a55fadf阅读 133评论 0 0