平时工作中难免会需要编写一些涉及到通讯方面的代码。在通讯方面也有许多优秀的开源项目,比如大名鼎鼎的Netty,Mina等。但是有一种情况是,如果,自己的使用场景比较小,这时候使用这些大型的开源项目,会显得大材小用,造成性能浪费不说,学习成本也是一件需要考虑的事。这时候如果有一个轻量级的,健壮的通讯框架,就显得特别好用。笔者工作这一年多,基本都是在与一些简单自定义的通讯框架打交道。恰巧,最近有时间,总结下,学习的心得。
在开始本文前,先来看看几个问题。
1.一个完整的通讯框架应该有哪几部分组成?
2.怎么样保证数据的完整性(处理分包,拼包问题)?
3.怎么样自定义协议,自定义应该要遵循那些规则?
别急, 我们一个一个的来看。
对于问题1
笔者的看法是,一个完整的通信协议至少应该包含两个部分,1>数据模型部分,主要处理数据的序列化与反序列化问题。2>通信部分,主要用于发送与接受序列化过的数据。
对于问题2
在通信模块接收到序列化过的数据后,需要讲这些二进制的数据进行反序列化。难点就是这个将数据反序列化的过程。面对一大堆接受到的二进制的数据,你根本就不知道,一个完整的数据包是从那里开始,从哪里结束。要解决这个问题,至少得包含两个方面的信息,1)一个唯一标识的包头,即包开始的地方。2)整包数据的长度,即包结束的地方。这里要提醒的是,如果别人破解了你的协议,要主意防范Socket攻击。即在在一个合法的包头后面,放置一个足够大的包的长度,造成程序内存溢出,存而使程序崩溃。
对于问题3
笔者认为,一个设计良好的自定义协议至少应该包含两个接口,一个进行包数据的序列化,一个进行包数据的反序列化,即readFromBytes 与 writeToBytes,即数据对象与java对象之间的转化。
下面先看看整个工程的目录结构:
这些基本问题解决了,写起来就会轻松很多。废话不多说,上代码。上面提到,一个轻量级的通信框架至少应该包含两个部分,数据模型部分与通信部分。本文先讲简单的,从数据模型部分开始。下面是数据模块的基类Packet。
/**
* +--------------------------------------------------------------------------------------------+
* | 包头(2字节)| 时间戳(8字节)| 有效数据长度(4字节)| 有效数据 |
* +--------------------------------------------------------------------------------------------+
*/
public class Packet {
//包头,标识一个数据包的开始
private short HEAD = -256;
//时间戳,标注数据包生成时的时间
private long timeStamp = System.currentTimeMillis();
//包中有效数据的长度,因此,一个完整包的长度 = 2 + 8 + 4 + length;
private int length;
//包中Id,用于区别Packet的类型
private long id;
public Packet(long id) {
this.id = id;
}
// 序列化方法,将一个Java对象转换为数据对象
public byte[] getBytes() {
JsonObject jo = writeToJson();
byte[] content = jo.toString().getBytes();
length = content.length;
byte []bytes = new byte[2+8+4+length];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.putShort(HEAD);
buffer.putLong(timeStamp);
buffer.putInt(length);
buffer.put(content);
return buffer.array();
}
public void readFromBytes(byte[] bytes) {
readFromBytes(bytes, 0, bytes.length);
}
// 反序列化方法,将一个数据对象转换为Java对象
public void readFromBytes(byte[] bytes, int offset, int length) {
String msg = new String(bytes, offset, length);
JsonObject jo = (new JsonParser()).parse(msg).getAsJsonObject();
readFromJson(jo);
}
public JsonObject writeToJson() {
JsonObject jo = new JsonObject();
jo.addProperty("id", id);
return jo;
}
public void readFromJson(JsonObject jo) {
this.id = jo.get("id").getAsLong();
}
public Long getId() {
return id;
}
}
Packet 的有效数据部分是一个Json字符串,这里引用了Google的Gson。为啥数据的有效部分要用Json字符串呢?这里是为了可读性。子类只需要复写writeToJson,如下:
@Override
public JsonObject writeToJson() {
JsonObject jo = super.writeToJson();
jo.addProperty("heart_beat", "yes");
return jo;
}
就能将要传输的任何数据放进包中。
与此同时,还需要改写readFromJson方法,获取Json中传输的数据,组装一个Java对象,如下:
@Override
public void readFromJson(JsonObject jo) {
super.readFromJson(jo);
heartbeat = jo.get("heart_beat").getAsString();
}
数据模型部分就说到这里,理解了原理,其实还是非常简单的。
下面讲述本篇博文的重中之重,数据通讯的部分。先别急看代码,画个图简单的说明下设计思想:
Session:主要面向业务,比如发送数据包,接收数据包,分发数据包
Connection:主要面向连接,是发送数据,接受数据的具体实现类
这样设计的好处是,将业务与底层连接分离开。Session 只需要处理各种业务,比如登陆,注册,维护心跳等。Session并不需要关心Connection是否损坏,也不需要自己维护Connection。Connection就专注于发送数据,至于发送的是啥,并不需要关心。
下面,看Connection的实现:
public class Connection {
private Socket mSocket;
private SocketAddress mAddress;
private STATUS mStatus;
//接受数据的缓存队列
private BlockingQueue<Packet> mReceiverQueue = new LinkedBlockingQueue<>();
//发送数据的缓存队列
private BlockingQueue<Packet> mSenderQueue = new LinkedBlockingQueue<>();
//Socket 读数据流
private InputStream inputStream;
//Socket 写数据流
private OutputStream outputStream;
//分发接收到的数据包的监听器
private OnPacketReceivedListener mListener;
private boolean mStop;
//
public enum STATUS{
CONNECTED,
DISCONNECTED,
CONNECTING;
}
public boolean connect(SocketAddress address, int timeout) {
if(address == null) {
return false;
}
if(mSocket != null) {
close();
setStatus(STATUS.DISCONNECTED);
}
Socket socket = new Socket();
boolean success = true;
try {
socket.setSoTimeout(10000);
setStatus(STATUS.CONNECTING);
socket.setReuseAddress(true);
socket.connect(address, timeout);
setStatus(STATUS.CONNECTED);
setSocket(socket);
} catch (IOException e) {
System.out.println("connect failed, message: "+e.getMessage());
success = false;
}finally {
try {
if(socket!=null) {
socket.close();
}
} catch (IOException e) {
}
}
return success;
}
public boolean connect(SocketAddress address) {
return connect(address, Integer.MAX_VALUE);
}
public boolean setSocket(Socket socket) {
if(mSocket!=null) {
close();
setStatus(STATUS.DISCONNECTED);
}
if(!socket.isConnected()) {
return false;
}
try {
mSocket = socket;
socket.setSoTimeout(30000);
inputStream = mSocket.getInputStream();
outputStream = mSocket.getOutputStream();
mStop = false;
setStatus(STATUS.CONNECTED);
startSender();
startReader();
} catch (IOException e) {
setStatus(STATUS.DISCONNECTED);
return false;
}
return true;
}
public void setOnPacketListener(OnPacketReceivedListener l) {
this.mListener = l;
}
private void startReader() {
new Thread(new Runnable() {
@Override
public void run() {
byte buffer[] = new byte[512*1024];
int dataLen = 0;
while(!mStop) {
try {
if(inputStream.available() <= 0) {
sleep(100);
continue;
}
int read = inputStream.read(buffer, dataLen, buffer.length-dataLen);
dataLen += read;
int handled = parsePackets(buffer, dataLen);
dataLen = dataLen - handled;
System.arraycopy(buffer, handled, buffer, 0, dataLen);
dispatchPackets();
if(dataLen == buffer.length) {
dataLen = 0;
}
} catch (IOException e) {
} catch (InterruptedException e) {
}
}
}
}).start();
}
protected void dispatchPackets() throws InterruptedException {
if(mSenderQueue.isEmpty()) {
return;
}
//dispatch message
Packet p = null;
try {
p = mReceiverQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
if(mListener != null
&& p != null) {
mListener.onPacketReceived(p);
}
}
//这个方法是分包的重点
private int parsePackets(byte[] buffer, int length) {
int handled = 0;
int i = 0;
ByteBuffer buf = ByteBuffer.wrap(buffer, 0, length);
while( i < length) {
//总长度小于14,即有效数据前面部分,直接pass掉
if(i + 14 < length) {
return handled;
}
//不是以包头标识符开始的,移动指针到下一位
if(buf.getShort(i) != -256) {
i++;
continue;
}
//获取有效数据部分的长度
int len = buf.getInt(i+10);
//2+8+4+length > 剩余字节,则说明,这一包数据不完整,直接pass
if(i + 14 + len < length) {
return handled;
}
//通过包id,可以知道包的类型,进行包的反序列化,并且扔进接收的缓存队列
byte[] packet = new byte[14+len];
System.arraycopy(buffer, i, packet, 0, packet.length);
String joStr = new String(packet, 14, packet.length-14);
JsonObject jo = (new JsonParser()).parse(joStr).getAsJsonObject();
long packetId = jo.get("id").getAsLong();
if(packetId > 0) {
Packet p = PacketFactory.createPacket(packetId);
if(p != null) {
p.readFromJson(jo);
mReceiverQueue.offer(p);
}
}
handled = (i+14+len);
i = handled;
}
return handled;
}
private void startSender() {
new Thread(new Runnable() {
@Override
public void run() {
while(!mStop) {
if(mSenderQueue.isEmpty()) {
sleep(100);
continue;
}
try {
Packet packet = mSenderQueue.take();
byte[] msg = packet.getBytes();
outputStream.write(msg);
} catch (InterruptedException e) {
} catch (IOException e) {
}
}
}
}).start();
}
public boolean delivery(Packet packet) {
return mSenderQueue.offer(packet);
}
public boolean isConnected() {
return getStatus() == STATUS.CONNECTED
&& mSocket != null
&& mSocket.isConnected();
}
public static void closeQuietly(Socket socket) {
if(socket!=null) {
try {
socket.close();
} catch (IOException e) {
System.out.println("close socket failed, message: "+e.getMessage());
}
}
}
public void close() {
closeQuietly(mSocket);
mSocket = null;
mReceiverQueue.clear();
mSenderQueue.clear();
mStop = true;
}
public SocketAddress getAddress() {
return mAddress;
}
public void setStatus(STATUS status) {
this.mStatus = status;
}
public STATUS getStatus() {
return mStatus;
}
public static void sleep(long timeInMills) {
try {
Thread.sleep(timeInMills);
} catch (InterruptedException e) {
}
}
public interface OnPacketReceivedListener{
void onPacketReceived(Packet packet);
}
}
Connection中开启了两个线程进行数据的读/写。最复杂的部分就是分包部分。Read the fucking code!这里我也不多讲了。没什么技巧,纯看编程的基本功!
Session的实现:
public class Session {
private long clientPacketCount = 0;
private long serverPacketCount = 0;
private long startDate = System.currentTimeMillis();
private long lastActiveDate;
private String mSessionId;
private Connection mConnection;
private PacketDispatcher mDispatcher = new PacketDispatcher();
private OnPacketReceivedListener mPacketListenner = new OnPacketReceivedListener() {
@Override
public void onPacketReceived(Packet packet) {
mDispatcher.notifyPacketReceived(packet);
incrementServerPacketCount();
}
};
public Session(String sessionId, Connection conn) {
this.mSessionId = sessionId;
this.mConnection = conn;
this.mConnection.setOnPacketListener(mPacketListenner);
}
public void updateConnection(Connection conn) {
if(mConnection != null) {
mConnection.close();
}
mConnection = conn;
mConnection.setOnPacketListener(mPacketListenner);
}
public String getSessionId() {
return mSessionId;
}
public boolean isConnected() {
return mConnection!=null && mConnection.isConnected();
}
public STATUS getStatus() {
return mConnection == null
? STATUS.DISCONNECTED : mConnection.getStatus();
}
public Connection getConnection() {
return mConnection;
}
public long getNumClientPackets() {
return clientPacketCount;
}
public long getNumServerPackets() {
return serverPacketCount;
}
public void deliver(Packet packet) {
if (mConnection != null) {
mConnection.delivery(packet);
incrementClientPacketCount();
}
}
public void close() {
if (mConnection != null) {
mConnection.close();
}
}
public boolean isClosed() {
return mConnection.isConnected();
}
public Date getCreationDate() {
return new Date(startDate);
}
public Date getLastActiveDate() {
return new Date(lastActiveDate);
}
public void incrementClientPacketCount() {
clientPacketCount++;
lastActiveDate = System.currentTimeMillis();
}
public void incrementServerPacketCount() {
serverPacketCount++;
lastActiveDate = System.currentTimeMillis();
}
public void registerObserver(long packetId, Observer ob) {
mDispatcher.registerObserver(packetId, ob);
}
public void unregisterObserver(long packetId) {
mDispatcher.unregisterObserver(packetId);
}
public void registerAll() {
mDispatcher.unregisterAll();
}
private class PacketDispatcher{
private HashMap<Long, Observable> mObservables = new HashMap<>();
public void notifyPacketReceived(Packet packet) {
if(packet == null) {
return;
}
Observable observable = mObservables.get(packet.getId());
if(observable==null) {
System.out.println("Havn't registered this type observable.");
}
if(observable != null) {
try {
Field changed = observable.getClass().getDeclaredField("changed");
if(!changed.isAccessible()) {
changed.setAccessible(true);
}
changed.set(observable, true);
} catch (NoSuchFieldException e) {
} catch (SecurityException e) {
} catch (IllegalArgumentException e) {
} catch (IllegalAccessException e) {
}
observable.notifyObservers(packet);
}
}
public void registerObserver(Long packetId, Observer ob) {
Observable observable = mObservables.get(packetId);
if(observable==null) {
observable = new Observable();
mObservables.put(packetId, observable);
}
observable.addObserver(ob);
}
public void unregisterObserver(Long packetId) {
if(!mObservables.containsKey(packetId)) {
return;
}
Observable observable = mObservables.get(packetId);
observable.deleteObservers();
mObservables.remove(packetId);
}
public void unregisterAll() {
if(!mObservables.isEmpty()) {
for(Observable o : mObservables.values()) {
o.deleteObservers();
}
mObservables.clear();
}
}
}
}
Session中的实现就比较简单了,主要是处理包的分发。这里通过自定义的PacketDispatcher分发数据包,写得非常简单。
至此,一个基本的数据通信框架就搭成了。时间不早了,今天就写道这里,明天继续。