1、介绍
上一节中解决了粘包拆包的问题,但是还有大小端兼容、发送不完整等问题需要解决,这一节就来解决这几个问题。
2、客户端
2.1 大小端兼容问题
在不同的机器上的编码方式不同,有的是大端编码,有的是小端编码。而网络数据报统一是小端编码,C#中的Reverse函数可以进行大小端编码的转换。
// 组装消息
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
short length = (short)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(length);
// 统一为小端编码
if(!BitConverter.IsLittleEndian)
{
lenBytes.Reverse();
}
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
2.2 发送不完整问题
由于操作系统缓冲区的设置和网络状况等问题。会出现发送不完整的情况,一个包只发了一半。
解决这个问题的方法是在发送数据前记录下要发送的数据,根据socket.Send()函数返回的发送字节数判断是否发送完整,如果没有发送完整,则继续发送。
// 将发送数据写入发送队列
public void Send()
{
// 发送数据
string sendStr= "hello Python, hello Python, hello Python, hello Python, hello Python, hello Python";
// 组装消息
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
short length = (short)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(length);
// 统一为小端编码
if(!BitConverter.IsLittleEndian)
{
lenBytes.Reverse();
}
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
// 发送钱保存数据
ByteArray ba = new ByteArray(sendBytes);
// 加锁,防止线程冲突
lock(writeQueue)
{
writeQueue.Enqueue(ba);
}
// 发送数据
DoSend();
}
// 读取发送队列的第一条数据,发送数据
public void DoSend()
{
lock(writeQueue)
{
if (writeQueue.Count <= 0)
return;
}
// 读取队列的第一条数据
ByteArray ba;
lock (writeQueue)
{
ba = writeQueue.First();
}
socket.BeginSend(ba.buffer, ba.readIndex, ba.Length, SocketFlags.None, SendCallBack, socket);
}
public void SendCallBack(IAsyncResult ar)
{
Socket socket = (Socket)ar.AsyncState;
// 成功发送的字节
int count = socket.EndSend(ar);
// 取出写入队列的第一个数据
ByteArray ba = writeQueue.First();
ba.readIndex += count;
if(ba.Length == 0)
{
lock(writeQueue)
{
writeQueue.Dequeue();
ba = writeQueue.First();
}
}
if (ba != null)
{
socket.BeginSend(ba.buffer, ba.readIndex, ba.Length, SocketFlags.None, SendCallBack, socket);
}
}
因为要记录下当前需要发送的数据的发送位置,因此实现了一个类ByteArray,记录需要发送的数据,数据长度,数据尾指针writeindex和当前发送的指针readindex。采用发送队列的形式,每次发送都是发送一条完整的数据,避免造成混乱。
2.3 缓冲区扩容
如果缓冲区剩余的空间不足(或剩下的空间很少),便需要对缓冲区进行扩容。扩容的操作和C++中Vector的扩容操作类似,申请新的空间,将为处理的数据拷贝过去。
if(recvBuff.Remain < 16)
{
// 扩容之前为何要先移动缓冲区,感觉没有必要,因为在resize的时候已经是在新buffer中从0开始了
recvBuff.MoveBytes();
recvBuff.ReSize(recvBuff.Length * 2);
}
原书中在扩容之前先将数据移动到开头。但是从其源码的功能来看感觉并不需要这部操作。
下面是ByteArray的代码:
using System;
public class ByteArray
{
const int default_size = 1024;
// 初始缓冲区大小
int initSize = 0;
// 缓冲区
public byte[] buffer;
// 已发送的索引
public int readIndex = 0;
// 整个数据的长度
public int writeIndex = 0;
// 缓冲区容量
public int capacity = 0;
// 缓冲区剩余空间
public int Remain { get { return capacity - writeIndex; } }
// 缓冲区中有效数据长度
public int Length { get { return writeIndex - readIndex; } }
public ByteArray(int size = default_size)
{
buffer = new byte[size];
capacity = size;
initSize = size;
readIndex = 0;
writeIndex = 0;
}
public ByteArray(byte[] defaultBytes)
{
buffer = defaultBytes;
readIndex = 0;
writeIndex = defaultBytes.Length;
capacity = defaultBytes.Length;
initSize = defaultBytes.Length;
}
// 扩容
public void ReSize(int size)
{
if (size < Length || size < initSize)
return;
// 指数扩容:从2的倍数中找一个比原来大的
int n = 1;
while (n < size)
n *= 2;
// 申请新数组并拷贝数据
capacity = n;
byte[] newbuffer = new byte[capacity];
Array.Copy(buffer, readIndex, newbuffer, 0, Length);
// buffer指向newbuffer
buffer = newbuffer;
writeIndex = Length;
readIndex = 0;
}
// 向缓冲区写数据, count是要写入缓冲区的字节数
public int Write(byte[] bs, int offset, int count)
{
if (Remain < count)
ReSize(Length + count);
Array.Copy(bs, offset, buffer, writeIndex, count);
writeIndex += count;
return count;
}
// 读取缓冲区中的数据 将缓冲区中的count字节数据读取到bs中
public int Read(byte[] bs, int offset, int count)
{
count = Math.Min(count, Length);
Array.Copy(buffer, readIndex, bs, offset, count);
readIndex += count;
CheckAndMoveBytes();
return count;
}
//检查并移动数据
public void CheckAndMoveBytes()
{
if (Length < 128)
{
MoveBytes();
}
}
//移动数据
public void MoveBytes()
{
Array.Copy(buffer, readIndex, buffer, 0, Length);
writeIndex = Length;
readIndex = 0;
}
public short ReadInt16()
{
if (Length < 2)
return 0;
short ret = BitConverter.ToInt16(buffer, readIndex);
readIndex += 2;
CheckAndMoveBytes();
return ret;
}
public int ReadInt32()
{
if (Length < 4)
return 0;
int ret = BitConverter.ToInt32(buffer, readIndex);
readIndex += 4;
CheckAndMoveBytes();
return ret;
}
//打印缓冲区
public override string ToString()
{
return BitConverter.ToString(buffer, readIndex, Length);
}
//打印调试信息
public string Debug()
{
return string.Format("readIdx({0}) writeIdx({1}) bytes({2})",
readIndex,
writeIndex,
BitConverter.ToString(buffer, 0, capacity)
);
}
}
客户端完整代码:
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Linq;
using System;
using System.Net.Sockets;
public class Echo : MonoBehaviour
{
Socket socket;
ByteArray recvBuff = new ByteArray();
int headerSize = 4;
bool startSendMsg = false;
// 发送队列
Queue<ByteArray> writeQueue = new Queue<ByteArray>();
private void Start()
{
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect("127.0.0.1", 8888);
socket.BeginReceive(recvBuff.buffer, recvBuff.writeIndex,
recvBuff.Remain, SocketFlags.None, ReceiveCallBack, socket);
}
void ReceiveCallBack(IAsyncResult ar)
{
try
{
Socket socket = (Socket)ar.AsyncState;
// 接收到的数据长度
int count = socket.EndReceive(ar);
recvBuff.writeIndex += count;
// 处理消息
HandleReceiveData();
// 继续接收数据
if(recvBuff.Remain < 16)
{
// 扩容之前为何要先移动缓冲区,感觉没有必要,因为在resize的时候已经是在新buffer中从0开始了
recvBuff.MoveBytes();
recvBuff.ReSize(recvBuff.Length * 2);
}
socket.BeginReceive(recvBuff.buffer, recvBuff.writeIndex,
recvBuff.Remain, SocketFlags.None, ReceiveCallBack, socket);
}
catch (SocketException ex)
{
Debug.Log("socket receive failed" + ex.ToString());
}
}
void HandleReceiveData()
{
Debug.Log("接收缓冲区字节数 =" + recvBuff.Length);
// 检查数据长度是否超过包头
if (recvBuff.Length < headerSize)
return;
// 读取包头长度 -- 从缓冲区的开头读取4字节, 两种写法都可以
int bodyLength = BitConverter.ToInt32(recvBuff.buffer, recvBuff.readIndex);
//int readInx = recvBuff.readIndex;
//byte[] bytes = recvBuff.buffer;
//int bodyLength = (bytes[readInx + 3] << 24) | bytes[readInx + 2] << 16 |
// bytes[readInx + 1] << 8 | bytes[readInx];
// 检查是否足够一条信息
if (recvBuff.Length < headerSize + bodyLength)
return;
// 移动readindex一个包头的字节数
recvBuff.readIndex += headerSize;
// 拷贝一条数据到临时byte数组中
byte[] tmpBytes = new byte[bodyLength];
recvBuff.Read(tmpBytes, 0, bodyLength);
// 处理一条消息
string data = System.Text.Encoding.UTF8.GetString(tmpBytes);
Debug.Log("receive data = " + data);
Debug.Log("capacity = " + recvBuff.capacity);
// 继续处理消息
HandleReceiveData();
}
// 将发送数据写入发送队列
public void Send()
{
// 发送数据
string sendStr= "hello Python, hello Python, hello Python, hello Python, hello Python, hello Python";
// 组装消息
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
short length = (short)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(length);
// 统一为小端编码
if(!BitConverter.IsLittleEndian)
{
lenBytes.Reverse();
}
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
// 发送钱保存数据
ByteArray ba = new ByteArray(sendBytes);
// 加锁,防止线程冲突
lock(writeQueue)
{
writeQueue.Enqueue(ba);
}
// 发送数据
DoSend();
}
// 读取发送队列的第一条数据,发送数据
public void DoSend()
{
lock(writeQueue)
{
if (writeQueue.Count <= 0)
return;
}
// 读取队列的第一条数据
ByteArray ba;
lock (writeQueue)
{
ba = writeQueue.First();
}
socket.BeginSend(ba.buffer, ba.readIndex, ba.Length, SocketFlags.None, SendCallBack, socket);
}
public void SendCallBack(IAsyncResult ar)
{
Socket socket = (Socket)ar.AsyncState;
// 成功发送的字节
int count = socket.EndSend(ar);
// 取出写入队列的第一个数据
ByteArray ba = writeQueue.First();
ba.readIndex += count;
if(ba.Length == 0)
{
lock(writeQueue)
{
writeQueue.Dequeue();
ba = writeQueue.First();
}
}
if (ba != null)
{
socket.BeginSend(ba.buffer, ba.readIndex, ba.Length, SocketFlags.None, SendCallBack, socket);
}
}
public void OnButtonStartClick()
{
startSendMsg = true;
}
private void Update()
{
if (!startSendMsg)
return;
Send();
}
}
3、服务端
服务端的代码与上一节没什么差别。
# Handle The TCP problem
import socket
import time
import struct
def parse_msg_length(head):
# get message body length
length = 0
for i in range(len(head)):
length += ord(head[i]) * 256 ** i
return length
def handle_message(read_buffer):
# check buffer > head size
buffer_count = len(read_buffer)
if buffer_count < head_size:
return False, 0
# check buffer > one whole data
msg_body_size = parse_msg_length(read_buffer[0:head_size])
if buffer_count < head_size + msg_body_size:
return False, 0
# handle message
data = read_buffer[head_size:msg_body_size+head_size]
print 'data:', data
return True, msg_body_size
# server socket
ip = "127.0.0.1"
port = 8888
server_address = (ip, port)
buffer_size = 1024
head_size = 2
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(server_address)
server.listen(1)
read_buffer = ""
print "Waiting for Connection."
while True:
connection, client_address = server.accept()
print "client connected: ", client_address
while True:
try:
msg_body = "Hello Unity Hello Unity Hello Unity Hello Unity"
pack_format = 'I' + str(len(msg_body)) + 's'
send_data = (len(msg_body), msg_body)
send_bytes = struct.pack(pack_format, *send_data)
connection.send(send_bytes)
# time.sleep(1)
receive_data = connection.recv(buffer_size)
if receive_data:
read_buffer += receive_data
result, length = handle_message(read_buffer)
if result:
read_buffer = read_buffer[head_size + length:]
else:
pass
print 'read_buffer: ', read_buffer
else:
connection.close()
break
except socket.error:
connection.close()
4、效果
将服务端代码放在云运行,本地运行客户端:
在服务端中缓冲区read_buffer中的数据很多,但是可以正常解析出数据包。服务器的接收和发送都没有异常。
将服务端的接收消息注释掉,服务端不停给客户端发送数据:
可以看到,客户端的接收处理数据都正常。当缓冲区数据有大于一条数据便一直处理,直到剩余数据不足一条。