一个基于传统IO的轻量级的通讯框架

平时工作中难免会需要编写一些涉及到通讯方面的代码。在通讯方面也有许多优秀的开源项目,比如大名鼎鼎的Netty,Mina等。但是有一种情况是,如果,自己的使用场景比较小,这时候使用这些大型的开源项目,会显得大材小用,造成性能浪费不说,学习成本也是一件需要考虑的事。这时候如果有一个轻量级的,健壮的通讯框架,就显得特别好用。笔者工作这一年多,基本都是在与一些简单自定义的通讯框架打交道。恰巧,最近有时间,总结下,学习的心得。

在开始本文前,先来看看几个问题。

1.一个完整的通讯框架应该有哪几部分组成?
2.怎么样保证数据的完整性(处理分包,拼包问题)?
3.怎么样自定义协议,自定义应该要遵循那些规则?

别急, 我们一个一个的来看。

对于问题1

笔者的看法是,一个完整的通信协议至少应该包含两个部分,1>数据模型部分,主要处理数据的序列化与反序列化问题。2>通信部分,主要用于发送与接受序列化过的数据。

对于问题2

在通信模块接收到序列化过的数据后,需要讲这些二进制的数据进行反序列化。难点就是这个将数据反序列化的过程。面对一大堆接受到的二进制的数据,你根本就不知道,一个完整的数据包是从那里开始,从哪里结束。要解决这个问题,至少得包含两个方面的信息,1)一个唯一标识的包头,即包开始的地方。2)整包数据的长度,即包结束的地方。这里要提醒的是,如果别人破解了你的协议,要主意防范Socket攻击。即在在一个合法的包头后面,放置一个足够大的包的长度,造成程序内存溢出,存而使程序崩溃。

一个简单的通信框架模型
对于问题3

笔者认为,一个设计良好的自定义协议至少应该包含两个接口,一个进行包数据的序列化,一个进行包数据的反序列化,即readFromBytes 与 writeToBytes,即数据对象与java对象之间的转化。

下面先看看整个工程的目录结构:

目录结构

这些基本问题解决了,写起来就会轻松很多。废话不多说,上代码。上面提到,一个轻量级的通信框架至少应该包含两个部分,数据模型部分与通信部分。本文先讲简单的,从数据模型部分开始。下面是数据模块的基类Packet。


/**
*   +--------------------------------------------------------------------------------------------+
*    | 包头(2字节)| 时间戳(8字节)| 有效数据长度(4字节)| 有效数据 |
*   +--------------------------------------------------------------------------------------------+
*/

public class Packet {
    
//包头,标识一个数据包的开始
    private short HEAD = -256;
//时间戳,标注数据包生成时的时间
    private long timeStamp = System.currentTimeMillis();
//包中有效数据的长度,因此,一个完整包的长度 = 2 + 8 + 4 + length;
    private int length;
//包中Id,用于区别Packet的类型
    private long id;
    
    public Packet(long id) {
        this.id = id;
    }
    
 // 序列化方法,将一个Java对象转换为数据对象
    public byte[] getBytes() {
        
        JsonObject jo = writeToJson();
        byte[] content = jo.toString().getBytes();
        
        length = content.length;
        
        byte []bytes = new byte[2+8+4+length];
        
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        
        buffer.putShort(HEAD);
        buffer.putLong(timeStamp);
        buffer.putInt(length);
        buffer.put(content);
        
        return buffer.array();
    }
    
    public void readFromBytes(byte[] bytes) {
        readFromBytes(bytes, 0, bytes.length);
    }
    
 // 反序列化方法,将一个数据对象转换为Java对象
    public void readFromBytes(byte[] bytes, int offset, int length) {
        String msg = new String(bytes, offset, length);
        JsonObject jo = (new JsonParser()).parse(msg).getAsJsonObject();
        readFromJson(jo);
    }
    
    public JsonObject writeToJson() {
        JsonObject jo = new JsonObject();
        jo.addProperty("id", id);
        return jo;
    }
    
    public void readFromJson(JsonObject jo) {
        this.id = jo.get("id").getAsLong();
    }
    
    public Long getId() {
        return id;
    }
    
}

Packet 的有效数据部分是一个Json字符串,这里引用了Google的Gson。为啥数据的有效部分要用Json字符串呢?这里是为了可读性。子类只需要复写writeToJson,如下:

@Override
    public JsonObject writeToJson() {
        JsonObject jo = super.writeToJson();
        jo.addProperty("heart_beat", "yes");
        return jo;
    }

就能将要传输的任何数据放进包中。

与此同时,还需要改写readFromJson方法,获取Json中传输的数据,组装一个Java对象,如下:

@Override
    public void readFromJson(JsonObject jo) {
        super.readFromJson(jo);
        heartbeat = jo.get("heart_beat").getAsString();
    }

数据模型部分就说到这里,理解了原理,其实还是非常简单的。

下面讲述本篇博文的重中之重,数据通讯的部分。先别急看代码,画个图简单的说明下设计思想:

通信模块

Session:主要面向业务,比如发送数据包,接收数据包,分发数据包

Connection:主要面向连接,是发送数据,接受数据的具体实现类

这样设计的好处是,将业务与底层连接分离开。Session 只需要处理各种业务,比如登陆,注册,维护心跳等。Session并不需要关心Connection是否损坏,也不需要自己维护Connection。Connection就专注于发送数据,至于发送的是啥,并不需要关心。

下面,看Connection的实现:


public class Connection {
    
    private Socket mSocket;
    private SocketAddress mAddress;
    private STATUS mStatus;
    
//接受数据的缓存队列
    private BlockingQueue<Packet> mReceiverQueue = new LinkedBlockingQueue<>();
//发送数据的缓存队列
    private BlockingQueue<Packet> mSenderQueue = new LinkedBlockingQueue<>();
//Socket 读数据流
    private InputStream inputStream;
//Socket 写数据流
    private OutputStream outputStream;
//分发接收到的数据包的监听器
    private OnPacketReceivedListener mListener;
    
    private boolean mStop;
    
//
    public enum STATUS{
        CONNECTED,
        DISCONNECTED,
        CONNECTING;
    }
    
    public boolean connect(SocketAddress address, int timeout) {
        
        if(address == null) {
            return false;
        }
        
        if(mSocket != null) {
            close();
            setStatus(STATUS.DISCONNECTED);
        }
        
        Socket socket = new Socket();
        boolean success = true;
        try {
            socket.setSoTimeout(10000);
            setStatus(STATUS.CONNECTING);
            socket.setReuseAddress(true);
            socket.connect(address, timeout);
            setStatus(STATUS.CONNECTED);
            setSocket(socket);
        } catch (IOException e) {
            System.out.println("connect failed, message: "+e.getMessage());
            success = false;
        }finally {
            try {
                if(socket!=null) {
                    socket.close();
                }
            } catch (IOException e) {
            }
        }
        return success;
        
    }
    
    public boolean connect(SocketAddress address) {
        return connect(address, Integer.MAX_VALUE);
    }
    
    public boolean setSocket(Socket socket) {
        
        if(mSocket!=null) {
            close();
            setStatus(STATUS.DISCONNECTED);
        }
        
        if(!socket.isConnected()) {
            return false;
        }
        
        try {
            mSocket = socket;
            socket.setSoTimeout(30000);
            inputStream = mSocket.getInputStream();
            outputStream = mSocket.getOutputStream();
            
            mStop = false;
            
            setStatus(STATUS.CONNECTED);
            startSender();
            startReader();
        } catch (IOException e) {
            setStatus(STATUS.DISCONNECTED);
            return false;
        }
        
        return true;
    }
    
    public void setOnPacketListener(OnPacketReceivedListener l) {
        this.mListener = l;
    }
    
    private void startReader() {
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                
                byte buffer[] = new byte[512*1024];
                int dataLen = 0;
                
                while(!mStop) {
                    
                    try {
                        if(inputStream.available() <= 0) {
                            sleep(100);
                            continue;
                        }
                        
                        int read = inputStream.read(buffer, dataLen, buffer.length-dataLen);
                        
                        dataLen += read;
                        
                        int handled = parsePackets(buffer, dataLen);
                        
                        dataLen = dataLen - handled;
                        System.arraycopy(buffer, handled, buffer, 0, dataLen);
                        
                        dispatchPackets();
                        
                        if(dataLen == buffer.length) {
                            dataLen = 0;
                        }
                        
                        
                    } catch (IOException e) {
                    } catch (InterruptedException e) {
                    }
                    
                }
                
            }

        }).start();
        
    }
    
    protected void dispatchPackets() throws InterruptedException {
        
        if(mSenderQueue.isEmpty()) {
            return;
        }
        //dispatch message
        Packet p = null;
        try {
            p = mReceiverQueue.poll(100, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
        }
        
        if(mListener != null 
                && p != null) {
            mListener.onPacketReceived(p);
        }
        
    }
//这个方法是分包的重点
    private int parsePackets(byte[] buffer, int length) {
        int handled = 0;
        int i = 0;
        
        ByteBuffer buf = ByteBuffer.wrap(buffer, 0, length);
        while( i < length) {
//总长度小于14,即有效数据前面部分,直接pass掉                  
            if(i + 14 < length) {
                return handled;
            }
//不是以包头标识符开始的,移动指针到下一位
            if(buf.getShort(i) != -256) {
                i++;
                continue;
            }
//获取有效数据部分的长度
            int len = buf.getInt(i+10);
//2+8+4+length > 剩余字节,则说明,这一包数据不完整,直接pass
            if(i + 14 + len < length) {
                return handled;
            }
//通过包id,可以知道包的类型,进行包的反序列化,并且扔进接收的缓存队列
            byte[] packet = new byte[14+len];
            System.arraycopy(buffer, i, packet, 0, packet.length);
            String joStr = new String(packet, 14, packet.length-14);
            JsonObject jo = (new JsonParser()).parse(joStr).getAsJsonObject();
            long packetId = jo.get("id").getAsLong();
            if(packetId > 0) {
                Packet p = PacketFactory.createPacket(packetId);
                if(p != null) {
                    p.readFromJson(jo);
                    mReceiverQueue.offer(p);
                }
            }
                    handled = (i+14+len);
            i = handled;
        }
        return handled;
    }

    private void startSender() {
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                
                while(!mStop) {
                    
                    if(mSenderQueue.isEmpty()) {
                        sleep(100);
                        continue;
                    }
                    
                    try {
                        Packet packet = mSenderQueue.take();
                        byte[] msg = packet.getBytes();
                        outputStream.write(msg);
                    } catch (InterruptedException e) {
                    } catch (IOException e) {
                    }
                    
                }
                
            }
        }).start();
        
    }
    
    public boolean delivery(Packet packet) {
        
        return mSenderQueue.offer(packet);
    }

    public boolean isConnected() {
        return getStatus() == STATUS.CONNECTED
                  && mSocket != null
                    && mSocket.isConnected();
    }
    
    public static void closeQuietly(Socket socket) {
        if(socket!=null) {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println("close socket failed, message: "+e.getMessage());
            }
        }
    }
    
    public void close() {
        closeQuietly(mSocket);
        mSocket = null;
        
        mReceiverQueue.clear();
        mSenderQueue.clear();
        mStop = true;
    }
    
    public SocketAddress getAddress() {
        return mAddress;
    }
    
    public void setStatus(STATUS status) {
        this.mStatus = status;
    }
    
    public STATUS getStatus() {
        return mStatus;
    }
    
    public static void sleep(long timeInMills) {
        try {
            Thread.sleep(timeInMills);
        } catch (InterruptedException e) {
        }
    }
    
    public interface OnPacketReceivedListener{
        void onPacketReceived(Packet packet);
    }

}

Connection中开启了两个线程进行数据的读/写。最复杂的部分就是分包部分。Read the fucking code!这里我也不多讲了。没什么技巧,纯看编程的基本功!

Session的实现:

public class Session {
    
    private long clientPacketCount = 0;
    private long serverPacketCount = 0;
    private long startDate = System.currentTimeMillis();
    private long lastActiveDate;
    
    private String mSessionId;
    
    private Connection mConnection;
    
    private PacketDispatcher mDispatcher = new PacketDispatcher();
    
    private OnPacketReceivedListener mPacketListenner = new OnPacketReceivedListener() {
        
        @Override
        public void onPacketReceived(Packet packet) {
            mDispatcher.notifyPacketReceived(packet);
            incrementServerPacketCount();
        }
    };
    
    public Session(String sessionId, Connection conn) {
        this.mSessionId = sessionId;
        this.mConnection = conn;
        this.mConnection.setOnPacketListener(mPacketListenner);
    }
    
    public void updateConnection(Connection conn) {
        
        if(mConnection != null) {
            mConnection.close();
        }
        
        mConnection = conn;
        mConnection.setOnPacketListener(mPacketListenner);
        
    }
    
    public String getSessionId() {
        return mSessionId;
    }
    
    public boolean isConnected() {
        return mConnection!=null && mConnection.isConnected();
    }
    
    public STATUS getStatus() {
        return mConnection == null 
                ? STATUS.DISCONNECTED : mConnection.getStatus();
    }
    
    public Connection getConnection() {
        return mConnection;
    }
    
    public long getNumClientPackets() {
        return clientPacketCount;
    }

    public long getNumServerPackets() {
        return serverPacketCount;
    }
    
    public void deliver(Packet packet) {
        if (mConnection != null) {
            mConnection.delivery(packet);
            incrementClientPacketCount();
        }
    }
    
    public void close() {
        if (mConnection != null) {
            mConnection.close();
        }
    }
    
    public boolean isClosed() {
        return mConnection.isConnected();
    }
    
    public Date getCreationDate() {
        return new Date(startDate);
    }
    
    public Date getLastActiveDate() {
        return new Date(lastActiveDate);
    }
    
    public void incrementClientPacketCount() {
        clientPacketCount++;
        lastActiveDate = System.currentTimeMillis();
    }

    public void incrementServerPacketCount() {
        serverPacketCount++;
        lastActiveDate = System.currentTimeMillis();
    }
    
    public void registerObserver(long packetId, Observer ob) {
        mDispatcher.registerObserver(packetId, ob);
    }
    
    public void unregisterObserver(long packetId) {
        mDispatcher.unregisterObserver(packetId);
    }
    
    public void registerAll() {
        mDispatcher.unregisterAll();
    }
    
    private class PacketDispatcher{
        
        private HashMap<Long, Observable> mObservables = new HashMap<>();
        
        public void notifyPacketReceived(Packet packet) {
            
            if(packet == null) {
                return;
            }
            
            Observable observable = mObservables.get(packet.getId());
            
            if(observable==null) {
                System.out.println("Havn't registered this type observable.");
            }
            
            if(observable != null) {
                
                try {
                    Field changed = observable.getClass().getDeclaredField("changed");
                    if(!changed.isAccessible()) {
                        changed.setAccessible(true);
                    }
                    changed.set(observable, true);
                } catch (NoSuchFieldException e) {
                } catch (SecurityException e) {
                } catch (IllegalArgumentException e) {
                } catch (IllegalAccessException e) {
                }
                
                observable.notifyObservers(packet);
            }
            
        }
        
        public void registerObserver(Long packetId, Observer ob) {
            
            Observable observable = mObservables.get(packetId);
            
            if(observable==null) {
                observable = new Observable();
                mObservables.put(packetId, observable);
            }
            
            observable.addObserver(ob);
            
        }
        
        public void unregisterObserver(Long packetId) {
            if(!mObservables.containsKey(packetId)) {
                return;
            }
            
            Observable observable = mObservables.get(packetId);
            observable.deleteObservers();
            mObservables.remove(packetId);
            
        }
        
        public void unregisterAll() {
            
            if(!mObservables.isEmpty()) {
                
                for(Observable o : mObservables.values()) {
                    o.deleteObservers();
                }
                mObservables.clear();
            }
        }
        
    }

}

Session中的实现就比较简单了,主要是处理包的分发。这里通过自定义的PacketDispatcher分发数据包,写得非常简单。

至此,一个基本的数据通信框架就搭成了。时间不早了,今天就写道这里,明天继续。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容