最后介绍一下Selector,选择器提供选择执行已经就绪的任务的能力,这使得多元I/O成为了可能,就绪执行和多元选择使得单线程能够有效地同时管理多个I/O通道。选择器的执行主要分为以下几个步骤:
1、创建一个或者多个可选择的通道(SelectableChannel)
2、将这些创建的通道注册到选择器对象中
3、选择器会记住开发者关心的通道,它们也会追踪对应的通道是否已经就绪
4、开发者调用一个选择器对象的select()方法,当方法从阻塞状态返回时,选择键会被更新
5、获取选择键的集合,找到当时已经就绪的通道,通过遍历这些键,开发者可以选择对已就绪的通道要做的操作
选择器
选择器的作用是管理了被注册的通道集合和它们的就绪状态,假设我们有四个Socket通道的选择器,可以通过下面方式创建:
Selector selector = Selector.open();
channel1.register(selector, SelectionKey.OP_READ);
channel2.register(selector, SelectionKey.OP_WRITE);
channel3.register(selector, SelectionKey.OP_READ | OP_WRITE);
channel4.register(selector, SelectionKey.OP_READ | OP_ACCEPT);
ready = selector.select(10000);
select()方法在将线程置于睡眠状态直到这些感兴趣的事件中的一个发生或者10秒钟过去,这就是所谓的事件驱动。
通道是调用register方法注册到选择器上的,从代码里面可以看到register()方法接受一个Selector对象作为参数,以及一个名为ops的整数型参数,第二个参数表示关心的通道操作。有四种被定义的可选择操作:读(read)、写(write)、连接(connect)和接受(accept)。
注意并非所有的操作都在所有的可选择通道上被支持,例如SocketChannel就不支持accept。
选择键
一个键表示一个特定的通道对象和一个特定的选择器对象之间的注册关系。
public abstract class SelectionKey
{
public static final int OP_READ;
public static final int OP_WRITE;
public static final int OP_CONNECT;
public static final int OP_ACCEPT;
public abstract SelectableChannel channel();
public abstract Selector selector();
public abstract void cancel();
public abstract boolean isValid();
public abstract int interestOps();
public abstract void iterestOps(int ops);
public abstract int readyOps();
public final boolean isReadable();
public final boolean isWritable();
public final boolean isConnectable();
public final boolean isAcceptable();
public final Object attach(Object ob);
public final Object attachment();
}
选择器维护着注册过的通道的集合,并且这些注册关系中的任意一个都是封装在SelectionKey对象中的。每一个Selector对象维护三种键的集合:
public abstract class Selector
{
...
public abstract Set keys();
public abstract Set selectedKeys();
public abstract int select() throws IOException;
public abstract int select(long timeout) throws IOException;
public abstract int selectNow() throws IOException;
public abstract void wakeup();
...
}
已注册的键的集合(Registered key set)
与选择器关联的已经注册的键的集合,并不是所有注册过的键都有效,这个集合通过keys()方法返回,并且可能是空的。这些键的集合是不可以直接修改的,试图这么做将引发java.lang.UnsupportedOperationException。
已选择的键的集合(Selected key set)
已注册的键的集合的子集,这个集合的每个成员都是相关的通道被选择器判断为已经准备好的并且包含于键的interest集合中的操作。这个集合通过selectedKeys()方法返回(有可能是空的)。键可以直接从这个集合中移除,但不能添加。试图向已选择的键的集合中添加元素将抛java.lang.UnsupportedOperationException。
已取消的键的集合(Cancelled key set)
已注册的键的集合的子集,这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。
下面结合之前的Channel和Buffer,看一下如何写和使用选择器实现服务端Socket数据接收的程序。
服务端
1 public class SelectorServer
2 {
3 private static int PORT = 1234;
4
5 public static void main(String[] args) throws Exception
6 {
7 // 先确定端口号
8 int port = PORT;
9 if (args != null && args.length > 0)
10 {
11 port = Integer.parseInt(args[0]);
12 }
13 // 打开一个ServerSocketChannel
14 ServerSocketChannel ssc = ServerSocketChannel.open();
15 // 获取ServerSocketChannel绑定的Socket
16 ServerSocket ss = ssc.socket();
17 // 设置ServerSocket监听的端口
18 ss.bind(new InetSocketAddress(port));
19 // 设置ServerSocketChannel为非阻塞模式
20 ssc.configureBlocking(false);
21 // 打开一个选择器
22 Selector selector = Selector.open();
23 // 将ServerSocketChannel注册到选择器上去并监听accept事件
24 ssc.register(selector, SelectionKey.OP_ACCEPT);
25 while (true)
26 {
27 // 这里会发生阻塞,等待就绪的通道
28 int n = selector.select();
29 // 没有就绪的通道则什么也不做
30 if (n == 0)
31 {
32 continue;
33 }
34 // 获取SelectionKeys上已经就绪的通道的集合
35 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
36 // 遍历每一个Key
37 while (iterator.hasNext())
38 {
39 SelectionKey sk = iterator.next();
40 // 通道上是否有可接受的连接
41 if (sk.isAcceptable())
42 {
43 ServerSocketChannel ssc1 = (ServerSocketChannel)sk.channel();
44 SocketChannel sc = ssc1.accept();
45 sc.configureBlocking(false);
46 sc.register(selector, SelectionKey.OP_READ);
47 }
48 // 通道上是否有数据可读
49 else if (sk.isReadable())
50 {
51 readDataFromSocket(sk);
52 }
53 iterator.remove();
54 }
55 }
56 }
57
58 private static ByteBuffer bb = ByteBuffer.allocate(1024);
59
60 // 从通道中读取数据
61 protected static void readDataFromSocket(SelectionKey sk) throws Exception
62 {
63 SocketChannel sc = (SocketChannel)sk.channel();
64 bb.clear();
65 while (sc.read(bb) > 0)
66 {
67 bb.flip();
68 while (bb.hasRemaining())
69 {
70 System.out.print((char)bb.get());
71 }
72 System.out.println();
73 bb.clear();
74 }
75 }
76 }
满足isAcceptable()则表示该通道上有数据到来了,此时我们做的事情不是获取该通道—>创建一个线程来读取该通道上的数据,这么做就和BIO没有区别了。我们做的事情只是简单地将对应的SocketChannel注册到选择器上,通过传入OP_READ标记,告诉选择器我们关心新的Socket通道什么时候可以准备好读数据。
满足isReadable()则表示新注册的Socket通道已经可以读取数据了,此时调用readDataFromSocket方法读取SocketChannel中的数据。
客户端
选择器客户端的代码,没什么要求,只要向服务器端发送数据就可以了。
1 public class SelectorClient
2 {
3 private static final String STR = "Hello World!";
4 private static final String REMOTE_IP = "127.0.0.1";
5 private static final int THREAD_COUNT = 5;
6
7 private static class NonBlockingSocketThread extends Thread
8 {
9 public void run()
10 {
11 try
12 {
13 int port = 1234;
14 SocketChannel sc = SocketChannel.open();
15 sc.configureBlocking(false);
16 sc.connect(new InetSocketAddress(REMOTE_IP, port));
17 while (!sc.finishConnect())
18 {
19 System.out.println("同" + REMOTE_IP + "的连接正在建立,请稍等!");
20 Thread.sleep(10);
21 }
22 System.out.println("连接已建立,待写入内容至指定ip+端口!时间为" + System.currentTimeMillis());
23 String writeStr = STR + this.getName();
24 ByteBuffer bb = ByteBuffer.allocate(writeStr.length());
25 bb.put(writeStr.getBytes());
26 bb.flip(); // 写缓冲区的数据之前一定要先反转(flip)
27 sc.write(bb);
28 bb.clear();
29 sc.close();
30 }
31 catch (IOException e)
32 {
33 e.printStackTrace();
34 }
35 catch (InterruptedException e)
36 {
37 e.printStackTrace();
38 }
39 }
40 }
41
42 public static void main(String[] args) throws Exception
43 {
44 NonBlockingSocketThread[] nbsts = new NonBlockingSocketThread[THREAD_COUNT];
45 for (int i = 0; i < THREAD_COUNT; i++)
46 nbsts[i] = new NonBlockingSocketThread();
47 for (int i = 0; i < THREAD_COUNT; i++)
48 nbsts[i].start();
49 // 一定要join保证线程代码先于sc.close()运行,否则会有AsynchronousCloseException
50 for (int i = 0; i < THREAD_COUNT; i++)
51 nbsts[i].join();
52 }
53 }