本来想画 Netty 相关类的时序图,借此学习其中的设计模式,以及它是如何扩展 Java NIO 的——毕竟对我这种“拧螺丝钉”的开发者来说,拿到一个任务后如何写出高内聚、低耦合的代码才是最重要的。但是一直找不到能把 Java NIO 和 Netty 关联起来的合适示例代码,所以我花了一点时间自己整理了下面的代码。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Minimal Reactor-style NIO server: one boss thread accepts connections on port 8888 and
 * hands each of them to one of {@code 2 * NCPU} worker threads, each owning its own
 * {@link Selector}.
 *
 * <p>Wire protocol: every frame is a 4-byte length header (read with
 * {@link ByteBuffer#getInt()}) followed by that many body bytes. Frames may arrive split
 * across reads or glued together; the per-connection {@link Read} state attached to the
 * {@link SelectionKey} carries partial data from one read event to the next.
 */
public class NioSocketServer {
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    private static final int BUFFERSIZE = 1024;
    /** Number of bytes in the length header that precedes every frame body. */
    private static final int HEADER_SIZE = 4;
    /** Largest body length accepted from a peer; anything bigger is treated as a protocol error. */
    private static final int MAX_BODY_LENGTH = 16 * 1024 * 1024;
    /** Charset used for every byte/String conversion, so behavior does not depend on the platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;
    /** Canned reply sent by {@link #write(SelectionKey)}; the empty line separates headers from the 38-byte body. */
    private static final String HTTP_RESPONSE = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n"
            + "Content-Type: text/html\r\n" + "\r\n" + "<html><body>Hello World!</body></html>";

    private final ServerSocketChannel serverSocketChannel;
    // Simple Reactor model: one boss thread plus (2 * number of cores) worker threads.
    private final Selector bossselector;
    private final Work[] works;
    private final AtomicInteger index = new AtomicInteger();

    /**
     * Binds the listening socket to port 8888 and starts the worker threads and the boss thread.
     *
     * @throws IOException if a channel or selector cannot be opened, or the port cannot be bound
     */
    public NioSocketServer() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8888));
        bossselector = Selector.open();
        serverSocketChannel.register(bossselector, SelectionKey.OP_ACCEPT);
        works = new Work[NCPU * 2];
        for (int i = 0; i < works.length; i++) {
            works[i] = new Work(Selector.open(), i);
        }
        for (Work work : works) {
            new Thread(work, "nio-work-" + work.i).start();
        }
        // The boss is started last: accept() dereferences works[], so every worker must
        // exist before the first connection can be accepted.
        new Thread(new Boss(), "nio-boss").start();
    }

    /**
     * Accepts one pending connection and registers it with a worker, chosen round-robin.
     *
     * @param key the selected key of the listening channel
     */
    public void accept(SelectionKey key) {
        System.out.println("accept事件");
        SocketChannel socketChannel = null;
        try {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            socketChannel = server.accept();
            if (socketChannel == null) {
                // A non-blocking accept() returns null when no connection is pending.
                return;
            }
            socketChannel.configureBlocking(false);
            // floorMod spreads connections evenly for any worker count (not only powers of
            // two) and stays non-negative after the counter overflows.
            int i = Math.floorMod(index.getAndIncrement(), works.length);
            works[i].register(socketChannel);
        } catch (IOException e) {
            e.printStackTrace();
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * Handles a read-ready key: reads available bytes, prints every complete frame and keeps
     * any incomplete trailing data in the key's {@link Read} state for the next read event.
     * On end-of-stream or any error only this client's channel is closed.
     *
     * <p>Packet protocol: packet = header (4 bytes) + body, where the header holds the body length.
     *
     * @param selectionKey the selected key of a client channel
     */
    public void read(SelectionKey selectionKey) {
        System.out.println("read事件");
        try {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            Read state = stateOf(selectionKey);
            // The stored buffer is kept in "write mode": leftovers sit at [0, position).
            ByteBuffer buffer = state.getByteBuffer();
            if (buffer == null) {
                buffer = ByteBuffer.allocate(BUFFERSIZE);
                state.setByteBuffer(buffer);
            }
            int count = channel.read(buffer);
            if (count < 0) {
                // Peer closed the connection; without this the key stays readable forever.
                closeClient(selectionKey);
                return;
            }
            buffer.flip();
            java.util.List<byte[]> frames = new java.util.ArrayList<>();
            int pending = decodeFrames(buffer, state.getHeaderLength(), frames);
            for (byte[] bodyByte : frames) {
                System.out.println("receive from clien content is:" + new String(bodyByte, CHARSET));
            }
            state.setHeaderLength(pending);
            int needed = pending < 0 ? HEADER_SIZE : pending;
            if (needed > buffer.capacity()) {
                // The announced body does not fit: move the leftovers into a buffer that does.
                ByteBuffer larger = ByteBuffer.allocate(needed);
                larger.put(buffer);
                state.setByteBuffer(larger);
            } else if (pending < 0 && !buffer.hasRemaining() && buffer.capacity() > BUFFERSIZE) {
                // Nothing left over: release an oversized buffer instead of keeping it per connection.
                state.setByteBuffer(null);
            } else {
                // Move unread bytes to the front and switch back to write mode.
                buffer.compact();
            }
            // Replying is intentionally disabled in this demo; the original snippet is kept for reference:
            // String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "<html><body>Hello World!</body></html>";
            // ByteBuffer buffer = ByteBuffer.wrap(httpResponse.getBytes());
            // int len = channel.write(buffer);
            // if (len < 0){
            // throw new IllegalArgumentException();
            // }
            // if (len == 0) {
            // selectionKey.interestOps(SelectionKey.OP_WRITE);
            // }
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            // Boundary catch: a misbehaving client must only cost its own connection,
            // never the listening socket or the worker thread.
            closeClient(selectionKey);
            e.printStackTrace();
        }
    }

    /**
     * Extracts every complete frame from {@code buffer} and appends each body to {@code frames}.
     * Bytes belonging to an incomplete frame are left unread in {@code buffer}.
     *
     * @param buffer         data in read mode (position..limit holds the unread bytes)
     * @param pendingBodyLen body length of a frame whose header was consumed by an earlier call,
     *                       or -1 if the next bytes start with a header
     * @param frames         receives the decoded frame bodies in arrival order
     * @return the body length of a frame whose header has been consumed but whose body is still
     *         incomplete, or -1 if the next expected bytes are a header
     * @throws IllegalStateException if a header announces a negative length or one above
     *                               {@code MAX_BODY_LENGTH}
     */
    static int decodeFrames(ByteBuffer buffer, int pendingBodyLen, java.util.List<byte[]> frames) {
        int bodyLen = pendingBodyLen;
        while (true) {
            if (bodyLen < 0) {
                if (buffer.remaining() < HEADER_SIZE) {
                    return -1;
                }
                bodyLen = buffer.getInt();
                if (bodyLen < 0 || bodyLen > MAX_BODY_LENGTH) {
                    throw new IllegalStateException("invalid frame length: " + bodyLen);
                }
            }
            if (buffer.remaining() < bodyLen) {
                // Header already consumed: the caller must remember bodyLen for the next call.
                return bodyLen;
            }
            byte[] body = new byte[bodyLen];
            buffer.get(body);
            frames.add(body);
            bodyLen = -1;
        }
    }

    /**
     * Handles a write-ready key: sends the canned HTTP response without spinning. If the socket
     * accepts only part of it, the rest is kept in the key's {@link Read} state and sent on the
     * next write-ready event; once everything is sent the key goes back to {@code OP_READ}.
     *
     * @param selectionKey the selected key of a client channel
     */
    public void write(SelectionKey selectionKey) {
        System.out.println("write事件");
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        System.out.println("response from server to client");
        try {
            Read state = stateOf(selectionKey);
            ByteBuffer out = state.getPendingWrite();
            if (out == null) {
                out = ByteBuffer.wrap(HTTP_RESPONSE.getBytes(CHARSET));
            }
            socketChannel.write(out);
            if (out.hasRemaining()) {
                // Socket send buffer is full: keep the remainder and wait for the next
                // write-ready event instead of busy-looping on write().
                state.setPendingWrite(out);
            } else {
                state.setPendingWrite(null);
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            closeClient(selectionKey);
            e.printStackTrace();
        }
    }

    /** Returns the per-connection state attached to the key, creating and attaching it on first use. */
    private Read stateOf(SelectionKey selectionKey) {
        Read state = (Read) selectionKey.attachment();
        if (state == null) {
            state = new Read();
            selectionKey.attach(state);
        }
        return state;
    }

    /** Cancels the key and closes the client channel it belongs to (never the listening socket). */
    private void closeClient(SelectionKey selectionKey) {
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Accept loop: waits on the boss selector and dispatches accept-ready keys to {@link #accept}. */
    class Boss implements Runnable {
        private volatile boolean isExit;

        public boolean isExit() {
            return isExit;
        }

        /** Sets the exit flag; when set to {@code true} the selector is woken so the loop can observe it. */
        public void setExit(boolean exit) {
            isExit = exit;
            if (exit) {
                bossselector.wakeup();
            }
        }

        @Override
        public void run() {
            try {
                while (!isExit) {
                    bossselector.select();
                    Iterator<SelectionKey> iter = bossselector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey selectionKey = iter.next();
                        iter.remove();
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isAcceptable()) {
                            accept(selectionKey);
                        } else {
                            System.out.println("boss线程不可能出现work线程的事件,请检查代码。");
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Worker loop: owns one selector and serves read/write events of the channels registered with it. */
    class Work implements Runnable {
        private final Selector workSelector;
        private final int i;
        private volatile boolean isExit;

        public Work(Selector workSelector, int i) {
            this.workSelector = workSelector;
            this.i = i;
        }

        /**
         * Registers a client channel for {@code OP_READ} with this worker's selector and wakes the
         * selector so the new key takes part in selection promptly.
         *
         * @param socketChannel a channel already switched to non-blocking mode
         * @return the key representing the registration
         * @throws ClosedChannelException if the channel is closed
         */
        public SelectionKey register(SocketChannel socketChannel) throws ClosedChannelException {
            SelectionKey key = socketChannel.register(workSelector, SelectionKey.OP_READ);
            workSelector.wakeup();
            return key;
        }

        public boolean isExit() {
            return isExit;
        }

        public void setExit(boolean exit) {
            isExit = exit;
        }

        @Override
        public void run() {
            try {
                while (!isExit) {
                    // NOTE(review): the 10 ms timeout is deliberate. register() is called from the
                    // boss thread; the author observed on JDK 8 that with a plain select() new
                    // connections never delivered data, presumably because registration blocks
                    // while this thread sits inside select() — confirm before removing the timeout.
                    workSelector.select(10);
                    Iterator<SelectionKey> iter = workSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey selectionKey = iter.next();
                        iter.remove();
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isAcceptable()) {
                            System.out.println("work线程不可能出现boss线程的事件,请检查代码。");
                            continue;
                        }
                        if (selectionKey.isReadable()) {
                            read(selectionKey);
                        }
                        // read() may have closed the channel, so re-check validity before writing.
                        if (selectionKey.isValid() && selectionKey.isWritable()) {
                            write(selectionKey);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Per-connection state, stored as the attachment of the client's {@link SelectionKey}. */
    class Read {
        /** Accumulated, not yet decoded bytes; kept in write mode between read events. */
        private ByteBuffer byteBuffer;
        /** Body length of the frame whose header has already been consumed, or -1 if none. */
        private int headerLength = -1;
        /** Unsent remainder of a response, or null when nothing is pending. */
        private ByteBuffer pendingWrite;

        public ByteBuffer getByteBuffer() {
            return byteBuffer;
        }

        public void setByteBuffer(ByteBuffer byteBuffer) {
            this.byteBuffer = byteBuffer;
        }

        public int getHeaderLength() {
            return headerLength;
        }

        public void setHeaderLength(int headerLength) {
            this.headerLength = headerLength;
        }

        public ByteBuffer getPendingWrite() {
            return pendingWrite;
        }

        public void setPendingWrite(ByteBuffer pendingWrite) {
            this.pendingWrite = pendingWrite;
        }
    }

    /** Starts the server; the threads started by the constructor keep the JVM alive. */
    public static void main(String[] args) throws IOException {
        new NioSocketServer();
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Demo NIO client: connects to port 8888 on localhost, sends ten length-prefixed messages
 * (4-byte length header + body) and prints whatever the server sends back.
 * All channel I/O happens on the single selector thread started by the constructor.
 */
public class NioSocketClient {
    /** Charset used for every byte/String conversion, so behavior does not depend on the platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;
    /** Number of bytes in the length header that precedes every frame body. */
    private static final int HEADER_SIZE = 4;

    private final SocketChannel socketChannel;
    private final Selector selector;
    /** Unsent remainder of the outgoing batch; only touched by the selector thread. */
    private ByteBuffer pendingWrite;

    /**
     * Opens a non-blocking connection to localhost:8888 and starts the selector thread.
     *
     * @throws IOException if the selector or channel cannot be opened or the connect cannot be initiated
     */
    public NioSocketClient() throws IOException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8888);
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        // A non-blocking connect() may complete immediately (typical for local connections);
        // in that case skip OP_CONNECT and go straight to writing.
        boolean connected = socketChannel.connect(inetSocketAddress);
        if (connected) {
            System.out.println("client finish connect!");
        }
        socketChannel.register(selector, connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);
        new Thread(new Work(), "nio-client").start();
    }

    /**
     * Completes a pending non-blocking connect and switches the key to {@code OP_WRITE}.
     * If the connect fails, the channel is closed.
     *
     * @param key the connect-ready key
     */
    public void finishConnect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (channel.finishConnect()) {
                System.out.println("client finish connect!");
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(key);
        }
    }

    /**
     * Reads up to 1024 bytes from the server and prints them; closes the channel when the
     * server has closed the connection.
     *
     * @param key the read-ready key
     * @throws IOException if reading from the channel fails
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int len = channel.read(byteBuffer);
        if (len < 0) {
            // End of stream: without closing, the key would stay readable forever.
            closeQuietly(key);
            return;
        }
        if (len > 0) {
            byteBuffer.flip();
            byte[] byteArray = new byte[byteBuffer.remaining()];
            byteBuffer.get(byteArray);
            System.out.println("client receive from server,content:" + new String(byteArray, CHARSET));
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Sends the ten demo messages. Whatever the socket does not accept right away is kept and
     * sent on the next write-ready event; once everything is out the key switches to {@code OP_READ}.
     * A write failure closes the channel.
     *
     * @param key the write-ready key
     */
    public void send(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (pendingWrite == null) {
                pendingWrite = buildBatch();
            }
            channel.write(pendingWrite);
            if (!pendingWrite.hasRemaining()) {
                pendingWrite = null;
                key.interestOps(SelectionKey.OP_READ);
            }
            // else: OP_WRITE interest stays set, the rest goes out on the next event.
        } catch (IOException e) {
            e.printStackTrace();
            pendingWrite = null;
            closeQuietly(key);
        }
    }

    /**
     * Encodes one message as a frame: 4-byte body length followed by the body bytes.
     *
     * @param message the text to send
     * @return a buffer in read mode positioned at the start of the frame
     */
    static ByteBuffer encodeFrame(String message) {
        byte[] body = message.getBytes(CHARSET);
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + body.length);
        frame.putInt(body.length);
        frame.put(body);
        frame.flip();
        return frame;
    }

    /** Builds one buffer holding the ten demo frames back to back, logging each message. */
    private ByteBuffer buildBatch() {
        ByteBuffer[] frames = new ByteBuffer[10];
        int total = 0;
        for (int i = 0; i < frames.length; i++) {
            String ss = i + "Server ,how are you ? this is package message from NioSocketClient!";
            frames[i] = encodeFrame(ss);
            total += frames[i].remaining();
            System.out.println("client send:" + i + ",context:" + ss);
        }
        ByteBuffer batch = ByteBuffer.allocate(total);
        for (ByteBuffer frame : frames) {
            batch.put(frame);
        }
        batch.flip();
        return batch;
    }

    /** Cancels the key and closes its channel, reporting (not propagating) a close failure. */
    private void closeQuietly(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Selector loop: dispatches connect/write/read events until the channel is closed. */
    class Work implements Runnable {
        @Override
        public void run() {
            try {
                while (socketChannel.isOpen()) {
                    selector.select();
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey selectionKey = iter.next();
                        iter.remove();
                        try {
                            // Each handler may close the channel, so validity is re-checked every time.
                            if (selectionKey.isValid() && selectionKey.isConnectable()) {
                                finishConnect(selectionKey);
                            }
                            if (selectionKey.isValid() && selectionKey.isWritable()) {
                                send(selectionKey);
                            }
                            if (selectionKey.isValid() && selectionKey.isReadable()) {
                                read(selectionKey);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            closeQuietly(selectionKey);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Starts the client; the selector thread keeps the JVM alive until the connection closes. */
    public static void main(String[] args) throws IOException {
        new NioSocketClient();
    }
}
以上服务端实现了一个简易的 Reactor 模型,自定义了通信协议(协议格式:长度 + 内容),并实现了处理粘包和拆包的逻辑。Netty 不管怎么封装,最终封装的还是 NIO 的这几行核心代码。
以下是我遇到的问题和我觉得需要注意细节。
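1.我是在 win10 + jdk1.8 上实现的代码。服务端 worker 线程用 select() 获取事件时,新的客户端连接上来后读不到数据;改用 select(long timeout) 就可以读到,当时没有找到这个问题的原因。(补充:原因很可能是 worker 线程阻塞在 select() 时,boss 线程调用 channel.register() 向同一个 Selector 注册会被阻塞,直到该 select() 返回;不带超时的 select() 没有事件可以唤醒它,新连接就一直注册不上。select(long timeout) 会周期性返回,注册才有机会完成。常见做法是注册时调用 selector.wakeup(),或者像 Netty 那样把注册操作交给 worker 线程自己执行。)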
2.Java 底层无法预先得知 channel 中有多少数据可读,所以需要自己决定 ByteBuffer 的大小,在发生拆包、粘包时尤其要注意。Netty 实现了自动估算 ByteBuffer 大小的功能,但估算结果不一定准确。
3.在自己写的 NIO 代码中,很少看到对 OP_WRITE 的处理,经常看到的写法是在请求处理完成后,直接用 while (bb.hasRemaining()) { socketChannel.write(bb); } 这样的循环把结果返回给客户端。什么时候应该使用 OP_WRITE,引用别人的一段话:
如果客户端的网络或者是中间交换机的问题,使得网络传输的效率很低,这时候会出现服务器已经准备好的返回结果无法通过TCP/IP层传输到客户端。这时候在执行上面这段程序的时候就会出现以下情况。
(1) bb.hasRemaining()一直为“true”,因为服务器的返回结果已经准备好了。
(2) socketChannel.write(bb)的结果一直为0,因为由于网络原因数据一直传不过去。
(3) 因为是异步非阻塞的方式,socketChannel.write(bb)不会被阻塞,立刻被返回。
(4) 在一段时间内,这段代码会被无休止地快速执行着,消耗着大量的CPU的资源。事实上什么具体的任务也没有做,一直到网络允许当前的数据传送出去为止。
因此,要对OP_WRITE加以处理,常用用法为:
// Try to send the response right away; fall back to OP_WRITE only if the socket could not take all of it.
// The empty line ("\r\n") after the headers is required to separate them from the 38-byte body.
String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "\r\n" + "<html><body>Hello World!</body></html>";
ByteBuffer buffer = ByteBuffer.wrap(httpResponse.getBytes());
channel.write(buffer);
if (buffer.hasRemaining()) {
    // Zero-byte or partial write: the send buffer is full. Keep `buffer` reachable for the
    // OP_WRITE handler and add OP_WRITE to the existing interest set rather than replacing
    // it, so read events keep arriving while the rest of the response is pending.
    selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
}
以上这段代码在我实现的服务端中被注释掉了。后续会讲解 Netty 的时序图,学习 Netty 优秀的源码。
最后,目前在找工作,现在在家cha