网络模型
前后端都采用epoll/select模型
协议
测试阶段使用JSON作为消息体。每条消息以2字节(小端序)的长度字段开头,因此单条消息最大长度为65535字节;具体格式按下面的协议定义。
image.png
协议代码
msgbase
using Newtonsoft.Json;
using System;
/// <summary>
/// Base class of every protocol message. Provides the JSON body codec and the
/// length-prefixed protocol-name codec used by the wire format.
/// </summary>
public class MsgBase
{
    /// <summary>Protocol name; used on the receiving side to look up the message type.</summary>
    public string protoName = "null";

    /// <summary>
    /// Serializes a message to its JSON representation encoded as UTF-8.
    /// </summary>
    /// <param name="msgBase">Message to serialize.</param>
    /// <returns>UTF-8 bytes of the JSON text.</returns>
    public static byte[] Encode(MsgBase msgBase)
    {
        string s = JsonConvert.SerializeObject(msgBase);
        return System.Text.Encoding.UTF8.GetBytes(s);
    }

    /// <summary>
    /// Deserializes a message body from UTF-8 JSON.
    /// </summary>
    /// <param name="protoName">Type name of the message, as received from the wire.</param>
    /// <param name="bytes">Buffer that contains the JSON body.</param>
    /// <param name="offset">Index of the first body byte.</param>
    /// <param name="count">Number of body bytes.</param>
    /// <returns>
    /// The decoded message, or null when <paramref name="protoName"/> does not resolve
    /// to a type derived from <see cref="MsgBase"/>.
    /// </returns>
    public static MsgBase Decode(string protoName, byte[] bytes, int offset, int count)
    {
        if (string.IsNullOrEmpty(protoName))
        {
            return null;
        }
        // The name comes from the network: only accept types that really are messages,
        // otherwise the cast below fails (or an arbitrary type gets instantiated).
        Type type = Type.GetType(protoName);
        if (type == null || !typeof(MsgBase).IsAssignableFrom(type))
        {
            return null;
        }
        string s = System.Text.Encoding.UTF8.GetString(bytes, offset, count);
        return (MsgBase)JsonConvert.DeserializeObject(s, type);
    }

    /// <summary>
    /// Encodes the protocol name as a 2-byte little-endian length followed by the UTF-8 name.
    /// </summary>
    /// <param name="msgBase">Message whose <c>protoName</c> is encoded.</param>
    /// <returns>Length prefix plus name bytes.</returns>
    /// <exception cref="ArgumentException">The encoded name is longer than 65535 bytes.</exception>
    public static byte[] EncodeName(MsgBase msgBase)
    {
        byte[] nameBytes = System.Text.Encoding.UTF8.GetBytes(msgBase.protoName);
        // Keep the length as int: a cast to Int16 turns lengths >= 32768 negative.
        int len = nameBytes.Length;
        if (len > ushort.MaxValue)
        {
            throw new ArgumentException("protoName does not fit into a 2-byte length prefix");
        }
        byte[] bytes = new byte[2 + len];
        // Little-endian length: low byte first.
        bytes[0] = (byte)(len % 256);
        bytes[1] = (byte)(len / 256);
        Array.Copy(nameBytes, 0, bytes, 2, len);
        return bytes;
    }

    /// <summary>
    /// Decodes a protocol name written by <see cref="EncodeName"/>.
    /// </summary>
    /// <param name="bytes">Buffer that contains the encoded name.</param>
    /// <param name="offset">Index of the 2-byte length prefix.</param>
    /// <param name="count">Receives the number of bytes consumed (prefix + name), or 0 on failure.</param>
    /// <returns>The protocol name, or an empty string when the data is missing or incomplete.</returns>
    public static string DecodeName(byte[] bytes, int offset, out int count)
    {
        count = 0;
        // The 2-byte length prefix must be present.
        if (bytes == null || offset < 0 || offset + 2 > bytes.Length)
        {
            return "";
        }
        // Read the prefix as an unsigned value so lengths up to 65535 stay positive.
        int len = (bytes[offset + 1] << 8) | bytes[offset];
        if (len == 0)
        {
            return "";
        }
        // The whole name must be inside the buffer.
        if (offset + 2 + len > bytes.Length)
        {
            return "";
        }
        count = 2 + len;
        return System.Text.Encoding.UTF8.GetString(bytes, offset + 2, len);
    }
}
c#客户端
封装一个好用的ByteArray(字节缓冲区)
/// <summary>
/// Growable byte buffer with independent read and write cursors.
/// Valid data lives in <c>bytes[readIdx .. writeIdx)</c>.
/// </summary>
public class ByteArray
{
    // Default buffer size used by the size-based constructor.
    const int DEFAULT_SIZE = 1024;
    // Size the buffer was created with; ReSize never goes below it.
    int initSize = 0;
    // Backing storage.
    public byte[] bytes;
    // Index of the next byte to read / the next byte to write.
    public int readIdx = 0;
    public int writeIdx = 0;
    // Length of the backing storage.
    private int capacity = 0;
    // Free space after the write cursor.
    public int remain { get { return capacity - writeIdx; } }
    // Number of unread bytes.
    public int length { get { return writeIdx - readIdx; } }

    /// <summary>Creates an empty buffer of the given size.</summary>
    /// <param name="size">Initial capacity in bytes.</param>
    public ByteArray(int size = DEFAULT_SIZE)
    {
        bytes = new byte[size];
        capacity = size;
        initSize = size;
        readIdx = 0;
        writeIdx = 0;
    }

    /// <summary>
    /// Wraps an existing array; the whole array counts as unread data.
    /// The array is used directly, not copied.
    /// </summary>
    /// <param name="defaultBytes">Array to wrap.</param>
    public ByteArray(byte[] defaultBytes)
    {
        bytes = defaultBytes;
        capacity = defaultBytes.Length;
        initSize = defaultBytes.Length;
        readIdx = 0;
        writeIdx = defaultBytes.Length;
    }

    /// <summary>
    /// Reallocates the buffer to the smallest power of two that is at least
    /// <paramref name="size"/> and moves the unread data to the front.
    /// Does nothing when <paramref name="size"/> is smaller than the unread data
    /// or smaller than the initial size.
    /// </summary>
    /// <param name="size">Requested minimum capacity.</param>
    public void ReSize(int size)
    {
        if (size < length) return;
        if (size < initSize) return;
        int n = 1;
        while (n < size) n *= 2;
        capacity = n;
        byte[] newBytes = new byte[capacity];
        Array.Copy(bytes, readIdx, newBytes, 0, writeIdx - readIdx);
        bytes = newBytes;
        // length still uses the old cursors here, so set writeIdx before readIdx.
        writeIdx = length;
        readIdx = 0;
    }

    /// <summary>Appends bytes to the buffer, making room when necessary.</summary>
    /// <param name="bs">Source array.</param>
    /// <param name="offset">Index of the first byte to copy from <paramref name="bs"/>.</param>
    /// <param name="count">Number of bytes to copy.</param>
    /// <returns>The number of bytes written (<paramref name="count"/>).</returns>
    public int Write(byte[] bs, int offset, int count)
    {
        if (remain < count)
        {
            // Reclaim the already-consumed prefix first. ReSize alone is not enough:
            // it returns early when length + count is below the initial size, which
            // left the tail too small and made the copy below overrun the buffer.
            MoveBytes();
            if (remain < count)
            {
                ReSize(length + count);
            }
        }
        Array.Copy(bs, offset, bytes, writeIdx, count);
        writeIdx += count;
        return count;
    }

    /// <summary>Reads up to <paramref name="count"/> unread bytes and consumes them.</summary>
    /// <param name="bs">Destination array.</param>
    /// <param name="offset">Index in <paramref name="bs"/> where copying starts.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes actually read.</returns>
    public int Read(byte[] bs, int offset, int count)
    {
        count = Math.Min(count, length);
        // Copy from the read cursor, not from index 0, so consumed bytes are never returned again.
        Array.Copy(bytes, readIdx, bs, offset, count);
        readIdx += count;
        CheckAndMoveBytes();
        return count;
    }

    /// <summary>Compacts the buffer when fewer than 8 unread bytes remain (cheap to move).</summary>
    public void CheckAndMoveBytes()
    {
        if (length < 8)
        {
            MoveBytes();
        }
    }

    /// <summary>Moves the unread bytes to the start of the buffer and resets the cursors.</summary>
    public void MoveBytes()
    {
        if (length > 0)
        {
            Array.Copy(bytes, readIdx, bytes, 0, length);
        }
        // length still uses the old cursors here, so set writeIdx before readIdx.
        writeIdx = length;
        readIdx = 0;
    }

    /// <summary>Returns the unread bytes as a hex string.</summary>
    public override string ToString()
    {
        return BitConverter.ToString(bytes, readIdx, length);
    }

    /// <summary>Returns both cursors and a hex dump of the whole backing array.</summary>
    public string Debug()
    {
        return string.Format("readIdx({0}) writeIdx({1}) bytes({2})",
            readIdx,
            writeIdx,
            BitConverter.ToString(bytes, 0, capacity)
        );
    }
}
客户端核心代码
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
namespace NetWorkUtils.client
{
/// <summary>
/// Static TCP client: asynchronous connect/send/receive, length-prefixed framing,
/// a message queue drained from <see cref="Update"/>, and a ping/pong heartbeat.
/// Frame layout: 2-byte little-endian total length (header included), then the
/// length-prefixed protocol name, then the JSON body.
/// </summary>
public static class NetManager
{
    // Socket of the current connection (recreated by InitState).
    static Socket socket;
    // Receive buffer.
    static ByteArray readBuff;
    // Frames waiting to be sent; the head is the one currently in flight.
    static Queue<ByteArray> writeQueue;
    // True between BeginConnect and its callback.
    static bool isConnecting = false;
    // True when Close was requested while frames were still queued.
    static bool isClosing = false;
    // Decoded messages waiting for dispatch in MsgUpdate.
    static List<MsgBase> msgList = new List<MsgBase>();
    // Number of queued messages; read without the lock as a cheap pre-check.
    static int msgCount = 0;
    // Maximum number of messages dispatched per Update call.
    readonly static int MAX_MESSAGE_FIRE = 10;
    // Whether the heartbeat is enabled.
    public static bool isUsePing = true;
    // Heartbeat interval in seconds.
    public static int pingInterval = 30;
    // Time the last PING was sent (seconds since the Unix epoch).
    static long lastPingTime = 0;
    // Time the last PONG was received (seconds since the Unix epoch).
    static long lastPongTime = 0;

    /// <summary>Connection lifecycle events.</summary>
    public enum NetEvent
    {
        ConnectSucc = 1,
        ConnectFail = 2,
        Close = 3,
    }

    /// <summary>Handler for connection events; <paramref name="err"/> carries the error text, if any.</summary>
    public delegate void EventListener(String err);

    // Event type -> combined listener delegate.
    private static Dictionary<NetEvent, EventListener> eventListeners = new Dictionary<NetEvent, EventListener>();

    /// <summary>Registers a listener for a connection event.</summary>
    public static void AddEventListener(NetEvent netEvent, EventListener listener)
    {
        EventListener existing;
        eventListeners.TryGetValue(netEvent, out existing);
        // Combining with null simply yields the new listener.
        eventListeners[netEvent] = existing + listener;
    }

    /// <summary>Unregisters a listener for a connection event.</summary>
    public static void RemoveEventListener(NetEvent netEvent, EventListener listener)
    {
        EventListener existing;
        if (!eventListeners.TryGetValue(netEvent, out existing))
        {
            return;
        }
        existing -= listener;
        // Removing the last listener yields null; drop the entry so it is never invoked.
        if (existing == null)
        {
            eventListeners.Remove(netEvent);
        }
        else
        {
            eventListeners[netEvent] = existing;
        }
    }

    // Invokes the listeners registered for an event, if any.
    private static void FireEvent(NetEvent netEvent, String err)
    {
        EventListener listener;
        if (eventListeners.TryGetValue(netEvent, out listener) && listener != null)
        {
            listener(err);
        }
    }

    /// <summary>Handler for a decoded message.</summary>
    public delegate void MsgListener(MsgBase msgBase);

    // Protocol name -> combined listener delegate.
    private static Dictionary<string, MsgListener> msgListeners = new Dictionary<string, MsgListener>();

    /// <summary>Registers a listener for messages with the given protocol name.</summary>
    public static void AddMsgListener(string msgName, MsgListener listener)
    {
        MsgListener existing;
        msgListeners.TryGetValue(msgName, out existing);
        msgListeners[msgName] = existing + listener;
    }

    /// <summary>Unregisters a listener for messages with the given protocol name.</summary>
    public static void RemoveMsgListener(string msgName, MsgListener listener)
    {
        MsgListener existing;
        if (!msgListeners.TryGetValue(msgName, out existing))
        {
            return;
        }
        existing -= listener;
        // Removing the last listener yields null; drop the entry so it is never invoked.
        if (existing == null)
        {
            msgListeners.Remove(msgName);
        }
        else
        {
            msgListeners[msgName] = existing;
        }
    }

    // Invokes the listeners registered for a protocol name, if any.
    private static void FireMsg(string msgName, MsgBase msgBase)
    {
        MsgListener listener;
        if (msgListeners.TryGetValue(msgName, out listener) && listener != null)
        {
            listener(msgBase);
        }
    }

    /// <summary>
    /// Starts an asynchronous connect. The result is reported through the
    /// ConnectSucc / ConnectFail events.
    /// </summary>
    /// <param name="ip">Server address.</param>
    /// <param name="port">Server port.</param>
    public static void Connect(string ip, int port)
    {
        if (socket != null && socket.Connected)
        {
            Debug.WriteLine("Connect fail, already connected!");
            return;
        }
        if (isConnecting)
        {
            Debug.WriteLine("Connect fail, isConnecting");
            return;
        }
        InitState();
        socket.NoDelay = true;
        isConnecting = true;
        socket.BeginConnect(ip, port, ConnectCallback, socket);
    }

    // Resets all per-connection state and creates a fresh socket.
    private static void InitState()
    {
        socket = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);
        readBuff = new ByteArray();
        writeQueue = new Queue<ByteArray>();
        isConnecting = false;
        isClosing = false;
        msgList = new List<MsgBase>();
        msgCount = 0;
        long time = GetTimeStamp();
        lastPingTime = time;
        lastPongTime = time;
        // Make sure the pong handler is registered exactly once, even when other
        // code already listens for "MsgPong" (a key check alone would skip it).
        RemoveMsgListener("MsgPong", OnMsgPong);
        AddMsgListener("MsgPong", OnMsgPong);
    }

    // Completion callback of BeginConnect.
    private static void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;
            socket.EndConnect(ar);
            Debug.WriteLine("Socket Connect Succ ");
            // Clear the flag first so listeners may call Send from the event handler.
            isConnecting = false;
            FireEvent(NetEvent.ConnectSucc, "");
            socket.BeginReceive(readBuff.bytes, readBuff.writeIdx,
                readBuff.remain, 0, ReceiveCallback, socket);
        }
        catch (SocketException ex)
        {
            Debug.WriteLine("Socket Connect fail " + ex.ToString());
            isConnecting = false;
            FireEvent(NetEvent.ConnectFail, ex.ToString());
        }
    }

    /// <summary>
    /// Closes the connection. When frames are still queued the close is deferred
    /// until the queue has been flushed by <see cref="SendCallback"/>.
    /// </summary>
    public static void Close()
    {
        if (socket == null || !socket.Connected)
        {
            return;
        }
        if (isConnecting)
        {
            return;
        }
        bool hasPending;
        lock (writeQueue)
        {
            hasPending = writeQueue.Count > 0;
        }
        if (hasPending)
        {
            isClosing = true;
        }
        else
        {
            socket.Close();
            FireEvent(NetEvent.Close, "");
        }
    }

    /// <summary>
    /// Frames a message and queues it for sending. Silently returns when the
    /// connection is not usable (not connected, connecting, or closing).
    /// </summary>
    /// <param name="msg">Message to send.</param>
    public static void Send(MsgBase msg)
    {
        if (socket == null || !socket.Connected)
        {
            return;
        }
        if (isConnecting)
        {
            return;
        }
        if (isClosing)
        {
            return;
        }
        if (msg == null)
        {
            return;
        }
        byte[] nameBytes = MsgBase.EncodeName(msg);
        byte[] bodyBytes = MsgBase.Encode(msg);
        int len = nameBytes.Length + bodyBytes.Length;
        // The 2-byte header cannot describe more than 65535 bytes; a truncated
        // length would corrupt the stream for every following frame.
        if (2 + len > ushort.MaxValue)
        {
            Debug.WriteLine("Send fail, message too long");
            return;
        }
        byte[] sendBytes = new byte[2 + len];
        // Header: total frame length (header included), little-endian.
        sendBytes[0] = (byte)(sendBytes.Length % 256);
        sendBytes[1] = (byte)(sendBytes.Length / 256);
        Array.Copy(nameBytes, 0, sendBytes, 2, nameBytes.Length);
        Array.Copy(bodyBytes, 0, sendBytes, 2 + nameBytes.Length, bodyBytes.Length);
        ByteArray ba = new ByteArray(sendBytes);
        int count = 0;
        lock (writeQueue)
        {
            writeQueue.Enqueue(ba);
            count = writeQueue.Count;
        }
        // Only start a send when nothing is in flight; otherwise SendCallback
        // picks the frame up once the previous one is done.
        if (count == 1)
        {
            socket.BeginSend(sendBytes, 0, sendBytes.Length,
                0, SendCallback, socket);
        }
    }

    /// <summary>Completion callback of BeginSend; continues with the rest of the queue.</summary>
    public static void SendCallback(IAsyncResult ar)
    {
        Socket socket = (Socket)ar.AsyncState;
        if (socket == null || !socket.Connected)
        {
            return;
        }
        int count;
        try
        {
            count = socket.EndSend(ar);
        }
        catch (SocketException ex)
        {
            Debug.WriteLine("Socket Send fail" + ex.ToString());
            return;
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed while the send was pending.
            return;
        }
        ByteArray ba;
        lock (writeQueue)
        {
            ba = writeQueue.FirstOrDefault();
        }
        if (ba == null)
        {
            return;
        }
        // Advance past the bytes that went out; a partial send leaves length > 0.
        ba.readIdx += count;
        if (ba.length == 0)
        {
            lock (writeQueue)
            {
                writeQueue.Dequeue();
                ba = writeQueue.FirstOrDefault();
            }
        }
        if (ba != null)
        {
            socket.BeginSend(ba.bytes, ba.readIdx, ba.length,
                0, SendCallback, socket);
        }
        else if (isClosing)
        {
            // Deferred close: the queue is flushed, finish what Close() started.
            socket.Close();
            FireEvent(NetEvent.Close, "");
        }
    }

    /// <summary>Completion callback of BeginReceive; decodes frames and re-arms the receive.</summary>
    public static void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;
            int count = socket.EndReceive(ar);
            // Zero bytes means the remote side shut the connection down; re-arming
            // the receive would complete immediately again and spin forever.
            if (count == 0)
            {
                Close();
                return;
            }
            readBuff.writeIdx += count;
            OnReceiveData();
            // Make room before the next receive.
            if (readBuff.remain < 8)
            {
                readBuff.MoveBytes();
                readBuff.ReSize(readBuff.length * 2);
            }
            socket.BeginReceive(readBuff.bytes, readBuff.writeIdx,
                readBuff.remain, 0, ReceiveCallback, socket);
        }
        catch (SocketException ex)
        {
            Debug.WriteLine("Socket Receive fail" + ex.ToString());
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed while the receive was pending.
            Debug.WriteLine("Socket Receive stopped, socket closed");
        }
    }

    /// <summary>
    /// Extracts every complete frame from the receive buffer and queues the decoded
    /// messages for <see cref="MsgUpdate"/>. Incomplete frames stay in the buffer.
    /// </summary>
    public static void OnReceiveData()
    {
        // Iterate instead of recursing so a burst of small frames cannot deepen the stack.
        while (readBuff.length > 2)
        {
            int frameStart = readBuff.readIdx;
            byte[] bytes = readBuff.bytes;
            // Unsigned little-endian total length (header included).
            int frameLength = (bytes[frameStart + 1] << 8) | bytes[frameStart];
            if (frameLength < 2)
            {
                // A frame cannot be shorter than its own header; drop the header.
                Debug.WriteLine("OnReceiveData invalid frame length");
                readBuff.readIdx += 2;
                readBuff.CheckAndMoveBytes();
                continue;
            }
            // Wait for the rest of the frame.
            if (readBuff.length < frameLength)
            {
                return;
            }
            int nameCount = 0;
            string protoName = MsgBase.DecodeName(bytes, frameStart + 2, out nameCount);
            int bodyCount = frameLength - nameCount - 2;
            MsgBase msgBase = null;
            if (protoName == "" || bodyCount < 0)
            {
                Debug.WriteLine("OnReceiveData MsgBase.DecodeName fail");
            }
            else
            {
                msgBase = MsgBase.Decode(protoName, bytes, frameStart + 2 + nameCount, bodyCount);
            }
            // Always consume the whole frame, even when it could not be decoded,
            // so the next header is read from the right position.
            readBuff.readIdx = frameStart + frameLength;
            readBuff.CheckAndMoveBytes();
            if (msgBase != null)
            {
                lock (msgList)
                {
                    msgList.Add(msgBase);
                    msgCount++;
                }
            }
        }
    }

    /// <summary>Drives message dispatch and the heartbeat; call it periodically.</summary>
    public static void Update()
    {
        MsgUpdate();
        PingUpdate();
    }

    /// <summary>Dispatches up to MAX_MESSAGE_FIRE queued messages to their listeners.</summary>
    public static void MsgUpdate()
    {
        // Cheap pre-check that avoids taking the lock when nothing is queued.
        if (msgCount == 0)
        {
            return;
        }
        for (int i = 0; i < MAX_MESSAGE_FIRE; i++)
        {
            MsgBase msgBase = null;
            lock (msgList)
            {
                if (msgList.Count > 0)
                {
                    msgBase = msgList[0];
                    msgList.RemoveAt(0);
                    msgCount--;
                }
            }
            if (msgBase != null)
            {
                FireMsg(msgBase.protoName, msgBase);
            }
            else
            {
                // Queue drained.
                break;
            }
        }
    }

    // Sends a PING when the interval elapsed and closes the connection when no
    // PONG arrived for four intervals.
    private static void PingUpdate()
    {
        if (!isUsePing)
        {
            return;
        }
        long time = GetTimeStamp();
        if (time - lastPingTime > pingInterval)
        {
            MsgPing msgPing = new MsgPing();
            Send(msgPing);
            lastPingTime = time;
        }
        if (time - lastPongTime > pingInterval * 4)
        {
            Close();
        }
    }

    // Records the arrival time of a PONG.
    private static void OnMsgPong(MsgBase msgBase)
    {
        lastPongTime = GetTimeStamp();
    }

    /// <summary>Returns the current UTC time in seconds since 1970-01-01.</summary>
    public static long GetTimeStamp()
    {
        TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
        return Convert.ToInt64(ts.TotalSeconds);
    }
}
}
java netty服务端
测试期间,服务端对收到的任何消息都简单地返回pong
maven pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <!-- Use 'netty-all' for 4.0 or above -->
<version>4.1.53.Final</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
nettyserver
public class NettyServer {
// 通过nio方式来接收连接和处理连接
private EventLoopGroup bg =
new NioEventLoopGroup();
private EventLoopGroup wg =
new NioEventLoopGroup();
// 启动引导器
private ServerBootstrap b =
new ServerBootstrap();
public void run() {
//1 设置reactor 线程
b.group(bg, wg);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
String ip = "127.0.0.1";
b.localAddress(new InetSocketAddress(ip, 8888));
//4 设置通道选项
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
//5 装配流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception {
// 管理pipeline中的Handler
ch.pipeline().addLast(new MyDecode());
ch.pipeline().addLast(new MyDecoderName());
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = null;
boolean isStart = false;
while (!isStart) {
try {
channelFuture = b.bind().sync();
System.out.println("server启动, 端口为: " +
channelFuture.channel().localAddress());
isStart = true;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
// 7 监听通道关闭事件
// 应用程序会一直等待,直到channel关闭
ChannelFuture closeFuture =
channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (
Exception e) {
e.printStackTrace();
} finally {
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
wg.shutdownGracefully();
bg.shutdownGracefully();
}
}
}
mydecode
/**
 * Frame decoder for the length-prefixed protocol: every frame starts with a 2-byte
 * little-endian length that counts the whole frame, header included. Each complete
 * frame is forwarded to the next handler as a {@code byte[]} (header included).
 */
public class MyDecode extends ByteToMessageDecoder {
    /**
     * Extracts at most one complete frame from {@code byteBuf}.
     *
     * @param channelHandlerContext handler context (unused)
     * @param byteBuf               accumulated inbound bytes
     * @param list                  receives the decoded frame, if one is complete
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // The length header itself is not complete yet.
        if (byteBuf.readableBytes() < 2) {
            return;
        }
        // Peek at the header relative to the reader index; the index is only
        // advanced once the whole frame is available.
        int pos = byteBuf.readerIndex();
        // Unsigned read: getByte() is signed and would corrupt lengths whose
        // low byte is >= 0x80.
        int msgLen = byteBuf.getUnsignedShortLE(pos);
        if (msgLen < 2) {
            // A frame cannot be shorter than its own header; drop the header.
            byteBuf.skipBytes(2);
            return;
        }
        // Frame not complete yet, wait for more data.
        if (byteBuf.readableBytes() < msgLen) {
            return;
        }
        // msgLen already includes the 2 header bytes, so read exactly msgLen bytes.
        byte[] msgContent = new byte[msgLen];
        byteBuf.readBytes(msgContent);
        list.add(msgContent);
    }
}
handle
/**
 * Test handler: prints every received frame and answers it with a {@code MsgPong} frame.
 */
public class MyDecoderName extends ChannelInboundHandlerAdapter {
    /**
     * Prints the received frame and writes a pong frame back.
     *
     * @param ctx handler context used to write the reply
     * @param msg the frame, passed as {@code byte[]} by the previous handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("receive");
        // The client encodes text as UTF-8; do not rely on the platform default charset.
        String s = new String((byte[]) msg, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(s);
        Pong pong = new Pong();
        // Only ByteBuf instances are written to the socket, so wrap the bytes.
        ctx.writeAndFlush(Unpooled.wrappedBuffer(pong.decode()));
    }

    /**
     * Pong message; serialized as JSON and framed like the client's messages.
     */
    static class Pong {
        private String protoName = "MsgPong";

        /**
         * @return the protocol name
         * @author yangniuhaojiang
         */
        public String getProtoName() {
            return protoName;
        }

        /**
         * @param protoName the protocol name to set
         * @author yangniuhaojiang
         */
        public void setProtoName(String protoName) {
            this.protoName = protoName;
        }

        /**
         * Builds the complete wire frame for this message (despite its name, this
         * method encodes): 2-byte little-endian total length, 2-byte little-endian
         * name length, UTF-8 name, UTF-8 JSON body.
         *
         * @return the frame bytes
         */
        byte[] decode() {
            java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
            // JSON body
            String body = JSON.toJSONString(this);
            byte[] bodyBytes = body.getBytes(utf8);
            int bodylen = bodyBytes.length;
            // Length-prefixed protocol name
            byte[] nameBytes1 = this.protoName.getBytes(utf8);
            int len = nameBytes1.length;
            byte[] nameBytes = new byte[2 + len];
            nameBytes[0] = (byte) (len % 256);
            nameBytes[1] = (byte) (len / 256);
            System.arraycopy(nameBytes1, 0, nameBytes, 2, nameBytes1.length);
            // Whole frame; the header stores the total length, header included.
            byte[] bytes = new byte[2 + bodylen + nameBytes.length];
            bytes[0] = (byte) (bytes.length % 256);
            bytes[1] = (byte) (bytes.length / 256);
            System.arraycopy(nameBytes, 0, bytes, 2, nameBytes.length);
            System.arraycopy(bodyBytes, 0, bytes, 2 + nameBytes.length, bodyBytes.length);
            return bytes;
        }
    }
}
最终测试
服务端启动netty
客户端发送任意消息后收到pong信息
参考书籍
《Unity3d网络游戏实战(第二版)》
《Netty,Redis,Zookeeper高并发实战》