NIO客户端
写这个程序的初衷,是学习儒猿的分布式小文件系统时受到的启发。当时老师的设计和我的想法有些出入,于是我按照自己的思路实现了一套逻辑。分布式小文件系统客户端的设计要点是高效地传输文件,这就要求我们必须维持与DataNode的长连接,以减少反复建立连接带来的损耗。因此,需要在第一个请求建立连接之后,把连接缓存在客户端,方便后续请求直接复用。
先说说主要的一些集合容器的作用:
- waitingConnectHosts 等待连接Host的缓存
- connections 所有的连接集合,这里主要是方便使用Host获取到SelectionKey进行感知状态变换
- connectState 所有连接状态缓存,当一个请求尝试连接的时候,如果在该集合中发现连接状态是SUCCESS,那么就会直接获取连接进行文件传输
- waitingRequests 等待发送的请求队列,当客户端进行请求提交的时候,请求首先会进入该队列进行缓存
- toSendRequests 时机恰当时,会把waitingRequests中的请求拉取到toSendRequests中缓存;这是请求在客户端经过的最后一个缓存队列,之后请求就会被真正发送出去
- unfinishedResponses 未完成的响应。由于TCP存在拆包问题,一次Read事件中可能无法把一个响应完整地解析出来,这就要求我们把尚未读完的响应缓存起来,等待下次Read事件继续追加读取,直到完成整个响应的解析。
- finishedResponses 已完成响应的缓存,其中存储的是已经完成解析但尚未被客户端取走的响应
- callBackMap 回调缓存,当传入回调函数的时候会进入其中
核心类是NetworkManager,这个是管理所有连接的管理器,在这个类里面负责请求的解析,缓存,响应解析,回调函数调用
主要的完整流程如下:
image.png
这里主要说下要点:
在这里处理了粘包和拆包的问题,做法是事先约定好每次请求的数据长度。例如文件上传请求的格式为:请求ID + 请求类型 + 文件名长度 + 文件长度 + 文件名 + 文件内容。如果请求ID为去掉连字符的UUID(32字节),请求类型用int表示,文件名长度和文件长度也都用int表示,那么该次请求的总大小为 32 + 4 + 4 + 4 + fileNameLength + fileSize 字节。
响应的解析也是按照服务端定义的响应类型处理的
存在一个问题,所有的请求最后实例化为ByteBuffer,在高并发情况下,内存占用是个问题
NetworkManager
package org.zymf.nio.example3.client;
import org.zymf.nio.example3.constant.Constant;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Client-side network manager that multiplexes long-lived connections to several
 * hosts over a single NIO {@link Selector}.
 *
 * <p>Client threads call the public methods to request a connection, submit requests and
 * wait for responses. One background thread ({@link ClientHandleThread}) owns the selector
 * and performs all channel I/O. Shared state is exchanged through concurrent collections.
 *
 * <p>Response framing as parsed by this class: request id, status, content length, body.
 * The header fields are read into the fixed buffers supplied by
 * {@code NetWorkClientRespond}; the body buffer is allocated from the decoded length.
 *
 * <p>Known limitation: when a connection breaks, requests that were already written (or
 * partially written) get no response, so {@link #waitResponseSync} on them never returns.
 *
 * @author zhuyuemufeng
 * @version 1.0
 * @date 2021-07-20 21:29
 */
public class NetworkManager {

    private final Selector selector;

    private final ClientHandleThread moniter;

    // Hosts whose connection was requested but not yet initiated by the selector thread.
    private final ConcurrentLinkedQueue<Host> waitingConnectHosts = new ConcurrentLinkedQueue<>();

    // Selection key of the current connection of every connected host.
    private final Map<Host, SelectionKey> connections = new ConcurrentHashMap<>();

    // Connection state per host (one of the Constant.*_CONNECT values).
    private final Map<Host, Integer> connectState = new ConcurrentHashMap<>();

    // Requests submitted by client threads, not yet picked up by the selector thread.
    private final Map<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> waitingRequests =
            new ConcurrentHashMap<>();

    // Requests that will be written on the next OP_WRITE event.
    private final Map<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> toSendRequests =
            new ConcurrentHashMap<>();

    // Response currently being parsed per host (a frame may span several read events).
    private final Map<Host, NetWorkClientRespond> unfinishedResponses = new ConcurrentHashMap<>();

    // Fully parsed responses, by host and request id, not yet fetched by the client.
    private final Map<Host, Map<String, NetWorkClientRespond>> finishedResponses =
            new ConcurrentHashMap<>();

    // Callbacks by request id; an entry is removed when its response is dispatched.
    private final Map<String, NetWorkRespondCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * Opens the selector and starts the background I/O thread.
     *
     * @throws IllegalStateException if the selector cannot be opened
     */
    public NetworkManager() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // Without a selector the manager cannot work at all; fail loudly instead of
            // leaving a half-initialised instance behind.
            throw new IllegalStateException("failed to open NIO selector", e);
        }
        this.moniter = new ClientHandleThread();
        this.moniter.start();
    }

    /**
     * Requests a connection to {@code host} unless one is already pending or established.
     * A host whose previous attempt failed is queued again.
     *
     * @param host the remote host to connect to
     */
    public void tryConnect(Host host) throws Exception {
        // Publish WAITING_CONNECT *before* queueing the host, otherwise the selector thread
        // could finish the connection first and have its result overwritten.
        Integer previous = connectState.putIfAbsent(host, Constant.WAITING_CONNECT);
        boolean claimed = previous == null
                || (previous.intValue() == Constant.FAIL_CONNECT
                        && connectState.replace(host, previous, Constant.WAITING_CONNECT));
        if (claimed) {
            waitingConnectHosts.offer(host);
            // Let the selector thread pick the host up without waiting for its poll timeout.
            selector.wakeup();
        }
    }

    /**
     * Returns the connection state of {@code host}.
     *
     * @param host the host passed to {@link #tryConnect(Host)}
     * @param sync when {@code true}, blocks (polling every 200 ms) while the state is
     *             {@code Constant.WAITING_CONNECT}
     * @return the current connection state
     * @throws RuntimeException if no connection was ever requested for {@code host}
     */
    public int finishConnect(Host host, boolean sync) throws Exception {
        Integer status = connectState.get(host);
        if (status == null) {
            throw new RuntimeException("该连接不存在");
        }
        while (sync && status.intValue() == Constant.WAITING_CONNECT) {
            Thread.sleep(200);
            status = connectState.get(host);
        }
        return status;
    }

    /**
     * Queues a request for sending; the response is fetched with
     * {@link #waitResponseSync(NetWorkClientRequest)}.
     *
     * @param request the request to send
     * @throws IllegalStateException if the request's host has never been connected
     */
    public void sendRequest(NetWorkClientRequest request) {
        enqueue(request);
    }

    /**
     * Queues a request whose response is delivered to the request's callback. The callback
     * is invoked on the selector thread.
     *
     * @param request the request to send; its callback must not be {@code null}
     * @throws IllegalStateException if the request's host has never been connected
     */
    public void sendCallBackRequest(NetWorkClientRequest request) {
        // Register the callback first so a fast response cannot arrive before it is known.
        callBackMap.put(request.getRequestId(), request.getNetWorkRespondCallBack());
        try {
            enqueue(request);
        } catch (RuntimeException e) {
            callBackMap.remove(request.getRequestId());
            throw e;
        }
    }

    /**
     * Blocks (polling every 200 ms) until the response to {@code request} has been fully
     * parsed, then returns it. The response is removed from the internal cache, so it can
     * be fetched only once per request.
     *
     * @param request a request previously passed to {@link #sendRequest}
     * @return the response carrying the same request id
     * @throws IllegalStateException if the request's host has never been connected
     */
    public NetWorkClientRespond waitResponseSync(NetWorkClientRequest request) throws Exception {
        Host host = request.getHost();
        Map<String, NetWorkClientRespond> respondMap = finishedResponses.get(host);
        if (respondMap == null) {
            throw new IllegalStateException("no established connection for host " + host);
        }
        String requestId = request.getRequestId();
        NetWorkClientRespond respond;
        // remove() instead of get(): otherwise every response body stays cached forever.
        while ((respond = respondMap.remove(requestId)) == null) {
            Thread.sleep(200);
        }
        return respond;
    }

    /** Adds the request to its host's waiting queue and wakes the selector thread. */
    private void enqueue(NetWorkClientRequest request) {
        Host host = request.getHost();
        ConcurrentLinkedQueue<NetWorkClientRequest> queue = waitingRequests.get(host);
        if (queue == null) {
            throw new IllegalStateException("no established connection for host " + host
                    + "; call tryConnect/finishConnect first");
        }
        queue.offer(request);
        selector.wakeup();
    }

    /**
     * The single thread that owns the selector: it initiates connections, moves queued
     * requests to the send queues and handles connect/write/read readiness.
     */
    class ClientHandleThread extends Thread {

        // Buffer of the request that was only partially written, per host. Touched by this
        // thread only.
        private final Map<Host, ByteBuffer> pendingWrites = new ConcurrentHashMap<>();

        @Override
        public void run() {
            while (!isInterrupted()) {
                try {
                    // Initiate the requested connections.
                    registerConnect();
                    // Stage queued requests and enable write interest.
                    prepareSendRequest();
                    // Wait for and handle I/O readiness.
                    poll();
                } catch (RuntimeException e) {
                    // One bad iteration must not terminate the only I/O thread.
                    e.printStackTrace();
                }
            }
        }

        /**
         * Starts a non-blocking connect for every queued host. The host is attached to the
         * selection key so later events map back to exactly the caller's {@code Host}.
         */
        private void registerConnect() {
            Host host;
            while ((host = waitingConnectHosts.poll()) != null) {
                SocketChannel channel = null;
                try {
                    channel = SocketChannel.open();
                    channel.configureBlocking(false);
                    boolean connected =
                            channel.connect(new InetSocketAddress(host.getIp(), host.getPort()));
                    SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, host);
                    if (connected) {
                        // connect() may complete immediately (e.g. local connections).
                        markConnected(key, host);
                    }
                } catch (Exception e) {
                    // Includes unchecked failures such as an unresolved address.
                    e.printStackTrace();
                    if (channel != null) {
                        try {
                            channel.close();
                        } catch (IOException closeError) {
                            closeError.printStackTrace();
                        }
                    }
                    // Release threads blocked in finishConnect(host, true).
                    connectState.put(host, Constant.FAIL_CONNECT);
                }
            }
        }

        /**
         * Moves up to {@code Constant.MAX_SEND_REQUEST_SIZE} waiting requests per connected
         * host into its send queue and enables write interest on its key.
         */
        private void prepareSendRequest() {
            for (Map.Entry<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> node
                    : waitingRequests.entrySet()) {
                Host host = node.getKey();
                ConcurrentLinkedQueue<NetWorkClientRequest> waiting = node.getValue();
                Integer state = connectState.get(host);
                if (waiting.isEmpty() || state == null
                        || state.intValue() != Constant.SUCCESS_CONNECT) {
                    continue;
                }
                SelectionKey key = connections.get(host);
                ConcurrentLinkedQueue<NetWorkClientRequest> toSend = toSendRequests.get(host);
                if (key == null || !key.isValid() || toSend == null) {
                    continue;
                }
                System.out.println(">>>>>>>>>>>>准备请求");
                int count = 0;
                NetWorkClientRequest request;
                while (count < Constant.MAX_SEND_REQUEST_SIZE
                        && (request = waiting.poll()) != null) {
                    count++;
                    System.out.println(">>>>>>>>>>>>加入toSendRequests请求池");
                    toSend.offer(request);
                }
                if (count != 0) {
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            }
        }

        /**
         * Waits up to {@code Constant.POLL_BLOCK_MAX_TIME} for readiness and handles every
         * selected key. A failure on one key does not prevent the others from being handled.
         */
        private void poll() {
            try {
                selector.select(Constant.POLL_BLOCK_MAX_TIME);
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // A handler may cancel the key, so validity is re-checked each time.
                    if (key.isValid() && key.isConnectable()) {
                        System.out.println(">>>>>>>>>>>>触发Connect操作");
                        finishConnect(key, channel);
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println(">>>>>>>>>>>>触发Write操作");
                        sendRequest(key, channel);
                    }
                    if (key.isValid() && key.isReadable()) {
                        System.out.println(">>>>>>>>>>>>触发Read操作");
                        readResponse(key, channel);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Completes a pending non-blocking connect; on failure the connection is closed and
         * the host is marked {@code Constant.FAIL_CONNECT}.
         */
        private void finishConnect(SelectionKey key, SocketChannel channel) {
            Host host = hostOf(key);
            try {
                if (channel.finishConnect()) {
                    markConnected(key, host);
                }
            } catch (Exception e) {
                e.printStackTrace();
                closeConnection(key, host);
            }
        }

        /**
         * Sets up the per-host state of a freshly connected channel. SUCCESS_CONNECT is
         * published last, because client threads submit requests as soon as they see it.
         * Existing queues are kept so that requests queued before a reconnect survive.
         */
        private void markConnected(SelectionKey key, Host host) {
            System.out.println(host + ">>>>>>>>>>>>完成连接操作");
            waitingRequests.putIfAbsent(host, new ConcurrentLinkedQueue<>());
            System.out.println(host + ">>>>>>>>>>>>完成初始化请求队列");
            toSendRequests.putIfAbsent(host, new ConcurrentLinkedQueue<>());
            System.out.println(host + ">>>>>>>>>>>>完成初始化发送请求队列");
            finishedResponses.putIfAbsent(host, new ConcurrentHashMap<>());
            System.out.println(host + ">>>>>>>>>>>>完成初始化响应集合");
            connections.put(host, key);
            System.out.println(host + ">>>>>>>>>>>>完成加入连接集合");
            // Replace OP_CONNECT: it must not stay in the interest set once connected.
            key.interestOps(SelectionKey.OP_READ);
            connectState.put(host, Constant.SUCCESS_CONNECT);
            System.out.println(host + ">>>>>>>>>>>>完成连接状态重置");
        }

        /**
         * Writes staged requests without blocking. If the socket buffer fills up, the rest
         * of the current buffer is kept and write interest stays enabled; write interest is
         * dropped only once everything staged has been written.
         */
        private void sendRequest(SelectionKey key, SocketChannel channel) {
            Host host = hostOf(key);
            try {
                ConcurrentLinkedQueue<NetWorkClientRequest> queue = toSendRequests.get(host);
                ByteBuffer buffer = pendingWrites.get(host);
                while (true) {
                    if (buffer == null) {
                        NetWorkClientRequest request = queue == null ? null : queue.poll();
                        if (request == null) {
                            break;
                        }
                        buffer = request.getBuffer();
                        // Request buffers arrive in write mode; flip exactly once.
                        buffer.flip();
                    }
                    channel.write(buffer);
                    if (buffer.hasRemaining()) {
                        // Socket buffer is full: resume on the next OP_WRITE event instead
                        // of spinning on the selector thread.
                        pendingWrites.put(host, buffer);
                        return;
                    }
                    pendingWrites.remove(host);
                    buffer = null;
                }
                // Nothing left to write: stop watching for write readiness.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            } catch (IOException e) {
                e.printStackTrace();
                closeConnection(key, host);
            }
        }

        /**
         * Reads as much of the current response as is available. A completed response goes
         * to its callback when one is registered, otherwise into {@code finishedResponses}.
         */
        private void readResponse(SelectionKey key, SocketChannel channel) {
            Host host = hostOf(key);
            NetWorkClientRespond respond;
            try {
                respond = readFrame(host, channel);
            } catch (Exception e) {
                // Peer closed, I/O error or invalid length: the stream is unusable.
                e.printStackTrace();
                closeConnection(key, host);
                return;
            }
            if (respond == null) {
                return;
            }
            try {
                NetWorkRespondCallBack callBack = callBackMap.remove(respond.getRequestId());
                if (callBack != null) {
                    callBack.process(respond);
                } else {
                    finishedResponses.get(host).put(respond.getRequestId(), respond);
                }
            } catch (Exception e) {
                // A failing callback must not tear the connection down.
                e.printStackTrace();
            }
        }

        /**
         * Continues parsing the in-progress response of {@code host}.
         *
         * <p>Every header field is guarded by "not decoded yet" rather than by
         * {@code hasRemaining()}: once a header buffer has been flipped it reports remaining
         * bytes again, and re-reading into it would swallow body bytes.
         *
         * @return the completed response, or {@code null} if more data is needed
         */
        private NetWorkClientRespond readFrame(Host host, SocketChannel channel) throws IOException {
            NetWorkClientRespond respond = unfinishedResponses.get(host);
            if (respond == null) {
                respond = new NetWorkClientRespond();
                unfinishedResponses.put(host, respond);
            }
            // Request id.
            if (respond.getRequestId() == null) {
                ByteBuffer idBuffer = respond.getRequestBuffer();
                if (!fill(channel, idBuffer)) {
                    return null;
                }
                idBuffer.flip();
                respond.setRequestId(new String(idBuffer.array(), StandardCharsets.UTF_8));
            }
            // Status.
            if (respond.getStatus() == null) {
                ByteBuffer statusBuffer = respond.getStatusBuffer();
                if (!fill(channel, statusBuffer)) {
                    return null;
                }
                statusBuffer.flip();
                respond.setStatus(statusBuffer.getInt(0));
            }
            // Content length.
            if (respond.getContentLength() == null) {
                ByteBuffer lengthBuffer = respond.getContentLengthBuffer();
                if (!fill(channel, lengthBuffer)) {
                    return null;
                }
                lengthBuffer.flip();
                respond.setContentLength(lengthBuffer.getInt(0));
            }
            // Body.
            if (respond.getByteBuffer() == null) {
                respond.setByteBuffer(ByteBuffer.allocate(respond.getContentLength()));
            }
            ByteBuffer body = respond.getByteBuffer();
            if (!fill(channel, body)) {
                return null;
            }
            body.flip();
            respond.setFinished(true);
            unfinishedResponses.remove(host);
            return respond;
        }

        /** Reads into {@code buffer} and reports whether it is now full. */
        private boolean fill(SocketChannel channel, ByteBuffer buffer) throws IOException {
            if (buffer.hasRemaining()) {
                tryBestRead(channel, buffer);
            }
            return !buffer.hasRemaining();
        }

        /**
         * Reads until {@code buffer} is full or no more bytes are available right now.
         *
         * @throws IOException if the peer has closed the connection (read returned -1)
         */
        private void tryBestRead(SocketChannel channel, ByteBuffer buffer) throws IOException {
            while (buffer.hasRemaining()) {
                int count = channel.read(buffer);
                if (count < 0) {
                    throw new IOException("connection closed by peer");
                }
                if (count == 0) {
                    return;
                }
            }
        }

        /**
         * Cancels the key, closes its channel, drops the connection's in-progress state and
         * marks the host {@code Constant.FAIL_CONNECT} so that it can be reconnected.
         */
        private void closeConnection(SelectionKey key, Host host) {
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (host == null) {
                return;
            }
            connections.remove(host, key);
            unfinishedResponses.remove(host);
            pendingWrites.remove(host);
            connectState.put(host, Constant.FAIL_CONNECT);
        }

        /** Returns the host attached to the key when its channel was registered. */
        private Host hostOf(SelectionKey key) {
            return (Host) key.attachment();
        }
    }
}
NetWorkClientRequest
package org.zymf.nio.example3.client;
import java.nio.ByteBuffer;
/**
 * A request submitted by the client: the target host, the request id used to match the
 * response, and the serialized payload.
 *
 * <p>This is a plain mutable bean; no validation or copying is done in the accessors.
 *
 * @author zhuyuemufeng
 * @version 1.0
 * @date 2021-07-20 20:52
 */
public class NetWorkClientRequest {

    /** Identifier that the matching response carries. */
    private String requestId;

    /** Host the request is sent to. */
    private Host host;

    /** Caller-supplied send timestamp. */
    private long sendTime;

    /** Serialized request bytes. */
    private ByteBuffer buffer;

    /** Caller-supplied flag; NOTE(review): exact meaning is not visible here, confirm with callers. */
    private boolean sync;

    /** Callback for the response, if the request is sent in callback mode. */
    private NetWorkRespondCallBack netWorkRespondCallBack;

    // ---------------------------------------------------------------- getters

    /** @return the request id */
    public String getRequestId() {
        return this.requestId;
    }

    /** @return the target host */
    public Host getHost() {
        return this.host;
    }

    /** @return the send timestamp */
    public long getSendTime() {
        return this.sendTime;
    }

    /** @return the serialized request bytes */
    public ByteBuffer getBuffer() {
        return this.buffer;
    }

    /** @return the sync flag */
    public boolean isSync() {
        return this.sync;
    }

    /** @return the response callback, possibly {@code null} */
    public NetWorkRespondCallBack getNetWorkRespondCallBack() {
        return this.netWorkRespondCallBack;
    }

    // ---------------------------------------------------------------- setters

    /** @param requestId the request id */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @param host the target host */
    public void setHost(Host host) {
        this.host = host;
    }

    /** @param sendTime the send timestamp */
    public void setSendTime(long sendTime) {
        this.sendTime = sendTime;
    }

    /** @param buffer the serialized request bytes */
    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** @param sync the sync flag */
    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /** @param netWorkRespondCallBack the response callback */
    public void setNetWorkRespondCallBack(NetWorkRespondCallBack netWorkRespondCallBack) {
        this.netWorkRespondCallBack = netWorkRespondCallBack;
    }
}
testLoadFileDemo
// Demo: upload 300 files over one cached connection and report the elapsed time.
long start = System.currentTimeMillis();
NetworkManager manager = new NetworkManager();

// All requests share one connection, so connect once and verify the outcome up front.
Host host = new Host("localhost", 7899);
manager.tryConnect(host);
int connectStatus = manager.finishConnect(host, true);
if (connectStatus != org.zymf.nio.example3.constant.Constant.SUCCESS_CONNECT) {
    throw new IllegalStateException("failed to connect to " + host + ", status=" + connectStatus);
}

List<NetWorkClientRequest> sendRequest = new ArrayList<>();
for (int i = 1; i < 301; i++) {
    String fileName = i + ".jpg";
    // Dash-less UUID used as the request id.
    String requestId = UUID.randomUUID().toString().replace("-", "");
    ByteBuffer fileUpload = RequestBufferBuilder.createFileUpload(
            new File("E:\\oss\\netty\\img\\" + fileName), fileName, requestId);

    NetWorkClientRequest netWorkClientRequest = new NetWorkClientRequest();
    netWorkClientRequest.setBuffer(fileUpload);
    netWorkClientRequest.setHost(host);
    netWorkClientRequest.setRequestId(requestId);
    netWorkClientRequest.setSendTime(System.currentTimeMillis());

    // Queue the request; the manager's I/O thread performs the actual write.
    manager.sendRequest(netWorkClientRequest);
    sendRequest.add(netWorkClientRequest);
}

for (NetWorkClientRequest request : sendRequest) {
    // Block until the response for this request has been fully parsed.
    NetWorkClientRespond respond = manager.waitResponseSync(request);
    System.out.println(respond.getStatus());
}

long l = System.currentTimeMillis() - start;
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>: " + l);