c#Socket服务端异步高性能并发 解决 半包 粘包问题 封装类

/// <summary>
/// Socket连接,双向通信
/// </summary>
public class SocketConnection
{
    #region 构造函数

    public SocketConnection(Socket socket, SocketServer server)
    {
        _socket = socket;
        _server = server;
    }

    #endregion

    public Socket ClientSocket { get { return _socket; } }
    public bool IsCheck { get; set; }//该客户是否通过首次验证  否则无法进行后续操作
    #region 私有成员

    private readonly Socket _socket;
    private bool _isRec = true;
    private SocketServer _server = null;
    private byte[] Tmp_byteArr = new byte[0];//缓存 多余 或者 不完整 的封包数据
    private bool IsSocketConnected()
    {
        bool part1 = _socket.Poll(1000, SelectMode.SelectRead);
        bool part2 = (_socket.Available == 0);
        if (part1 && part2)
            return false;
        else
            return true;
    }

    #endregion

    #region 外部接口

    /// <summary>
    /// 开始接受客户端消息
    /// </summary>
    public void StartRecMsg()
    {
        try
        {
            byte[] container = new byte[1024 * 1024 * 2];
            _socket.BeginReceive(container, 0, container.Length, SocketFlags.None, asyncResult =>
            {
                try
                {
                    int length = _socket.EndReceive(asyncResult);

                    //马上进行下一轮接受,增加吞吐量
                    if (length > 0 && _isRec && IsSocketConnected())
                        StartRecMsg();

                    if (length > 0)
                    {
                        byte[] recBytes = new byte[length];
                        Array.Copy(container, 0, recBytes, 0, length);
                        ParMsg(recBytes);

                        ////处理消息 这里我把他取消了 放到了下面消息解析里面触发 当解析出完整数据时再触发 这样订阅事件的地方不用在做处理
                        //HandleRecMsg?.Invoke(recBytes, this, _server);
                    }
                    else
                        Close();
                }
                catch (Exception ex)
                {
                    HandleException?.Invoke(ex);
                    Close();
                }
            }, null);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
            Close();
        }
    }

    void ParMsg(byte[] arr)
    {
        List<byte> result = new List<byte>();//之后所有的操作数据都保存在这  list方便点
        if (Tmp_byteArr.Length > 0)//判断之前是否有保存多余数据
        {
            result.AddRange(Tmp_byteArr);
        }
        result.AddRange(arr);
        if (result.Count <= 4)//前4个字节代表包长 如果不够则表示包不完整 保存起来
        {
            Tmp_byteArr = result.ToArray();
            return;
        }
        int DataLength = BitConverter.ToInt32(Common.SubByte(result.ToArray(), 0, 4), 0);//包长 但不含这个包长本身
        if (DataLength == result.Count - 4)//如果相等 则刚好是一个完整的包
        {
            MsgBody msg = new MsgBody(result.ToArray());
            //处理消息 
            HandleRecMsg?.Invoke(msg, this, _server);
            return;
        }
        if (DataLength > result.Count - 4)//如果包长 大于实际消息长度 说明包不完整 则保存至下次使用
        {
            Tmp_byteArr = result.ToArray();
            return;
        }
        if (DataLength < result.Count - 4)//如果包长 小于实际消息长度 说明存在了粘包 要把多余的内容递归使用
        {
            MsgBody msg = new MsgBody(result.Take(DataLength).ToArray());
            //处理消息 
            HandleRecMsg?.Invoke(msg, this, _server);
            ParMsg(result.Skip(DataLength).ToArray());
        }

    }

    /// <summary>
    /// 发送数据
    /// </summary>
    /// <param name="bytes">数据字节</param>
    public void Send(byte[] bytes)
    {
        try
        {
            _socket.BeginSend(bytes, 0, bytes.Length, SocketFlags.None, asyncResult =>
            {
                try
                {
                    int length = _socket.EndSend(asyncResult);
                    HandleSendMsg?.Invoke(bytes, this, _server);
                }
                catch (Exception ex)
                {
                    HandleException?.Invoke(ex);
                }
            }, null);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    /// <summary>
    /// 发送字符串(默认使用UTF-8编码)
    /// </summary>
    /// <param name="msgStr">字符串</param>
    public void Send(string msgStr)
    {
        Send(Encoding.UTF8.GetBytes(msgStr));
    }

    /// <summary>
    /// 发送字符串(使用自定义编码)
    /// </summary>
    /// <param name="msgStr">字符串消息</param>
    /// <param name="encoding">使用的编码</param>
    public void Send(string msgStr, Encoding encoding)
    {
        Send(encoding.GetBytes(msgStr));
    }

    /// <summary>
    /// 传入自定义属性
    /// </summary>
    public object Property { get; set; }

    /// <summary>
    /// 关闭当前连接
    /// </summary>
    public void Close()
    {
        try
        {
            _isRec = false;
            _socket.Disconnect(false);
            _server.ClientList.Remove(this);
            HandleClientClose?.Invoke(this, _server);
            _socket.Close();
            _socket.Dispose();
            GC.Collect();
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    #endregion

    #region 事件处理

    /// <summary>
    /// 客户端连接接受新的消息后调用
    /// </summary>
    public Action<MsgBody, SocketConnection, SocketServer> HandleRecMsg { get; set; }

    /// <summary>
    /// 客户端连接发送消息后回调
    /// </summary>
    public Action<byte[], SocketConnection, SocketServer> HandleSendMsg { get; set; }

    /// <summary>
    /// 客户端连接关闭后回调
    /// </summary>
    public Action<SocketConnection, SocketServer> HandleClientClose { get; set; }

    /// <summary>
    /// 异常处理程序
    /// </summary>
    public Action<Exception> HandleException { get; set; }

    #endregion
}


/// <summary>
/// Socket服务端
/// </summary>
public class SocketServer
{
    #region 构造函数

    /// <summary>
    /// 构造函数
    /// </summary>
    /// <param name="ip">监听的IP地址</param>
    /// <param name="port">监听的端口</param>
    public SocketServer(string ip, int port)
    {
        _ip = ip;
        _port = port;
    }

    /// <summary>
    /// 构造函数,监听IP地址默认为本机0.0.0.0
    /// </summary>
    /// <param name="port">监听的端口</param>
    public SocketServer(int port)
    {
        _ip = "0.0.0.0";
        _port = port;
    }

    #endregion

    #region 内部成员

    private Socket _socket = null;
    private string _ip = "";
    private int _port = 0;
    private bool _isListen = true;
    private void StartListen()
    {
        try
        {
            _socket.BeginAccept(asyncResult =>
            {
                try
                {
                    Socket newSocket = _socket.EndAccept(asyncResult);

                    //马上进行下一轮监听,增加吞吐量
                    if (_isListen)
                        StartListen();

                    SocketConnection newClient = new SocketConnection(newSocket, this)
                    {
                        HandleRecMsg = HandleRecMsg == null ? null : new Action<MsgBody, SocketConnection, SocketServer>(HandleRecMsg),
                        HandleClientClose = HandleClientClose == null ? null : new Action<SocketConnection, SocketServer>(HandleClientClose),
                        HandleSendMsg = HandleSendMsg == null ? null : new Action<byte[], SocketConnection, SocketServer>(HandleSendMsg),
                        HandleException = HandleException == null ? null : new Action<Exception>(HandleException)
                    };

                    newClient.StartRecMsg();
                    ClientList.AddLast(newClient);

                    HandleNewClientConnected?.Invoke(this, newClient);
                }
                catch (Exception ex)
                {
                    HandleException?.Invoke(ex);
                }
            }, null);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    #endregion

    #region 外部接口

    /// <summary>
    /// 开始服务,监听客户端
    /// </summary>
    public void StartServer()
    {
        try
        {
            //实例化套接字(ip4寻址协议,流式传输,TCP协议)
            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            //创建ip对象
            IPAddress address = IPAddress.Parse(_ip);
            //创建网络节点对象包含ip和port
            IPEndPoint endpoint = new IPEndPoint(address, _port);
            //将 监听套接字绑定到 对应的IP和端口
            _socket.Bind(endpoint);
            //设置监听队列长度为Int32最大值(同时能够处理连接请求数量)
            _socket.Listen(int.MaxValue);
            //开始监听客户端
            StartListen();
            HandleServerStarted?.Invoke(this);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    /// <summary>
    /// 所有连接的客户端列表
    /// </summary>
    public LinkedList<SocketConnection> ClientList { get; set; } = new LinkedList<SocketConnection>();

    /// <summary>
    /// 关闭指定客户端连接
    /// </summary>
    /// <param name="theClient">指定的客户端连接</param>
    public void CloseClient(SocketConnection theClient)
    {
        theClient.Close();
    }

    #endregion

    #region 公共事件

    /// <summary>
    /// 异常处理程序
    /// </summary>
    public Action<Exception> HandleException { get; set; }

    #endregion

    #region 服务端事件

    /// <summary>
    /// 服务启动后执行
    /// </summary>
    public Action<SocketServer> HandleServerStarted { get; set; }

    /// <summary>
    /// 当新客户端连接后执行
    /// </summary>
    public Action<SocketServer, SocketConnection> HandleNewClientConnected { get; set; }

    /// <summary>
    /// 服务端关闭客户端后执行
    /// </summary>
    public Action<SocketServer, SocketConnection> HandleCloseClient { get; set; }

    #endregion

    #region 客户端连接事件

    /// <summary>
    /// 客户端连接接受新的消息后调用
    /// </summary>
    public Action<MsgBody, SocketConnection, SocketServer> HandleRecMsg { get; set; }

    /// <summary>
    /// 客户端连接发送消息后回调
    /// </summary>
    public Action<byte[], SocketConnection, SocketServer> HandleSendMsg { get; set; }

    /// <summary>
    /// 客户端连接关闭后回调
    /// </summary>
    public Action<SocketConnection, SocketServer> HandleClientClose { get; set; }

    #endregion
}

public class MsgBody
{
public MsgBody()
{

    }
    public MsgBody(string str)
    {
        this.BodyData = str;
    }
    public MsgBody(byte[] b)
    {
        this.Source = b;
        SetValues();
    }

    /// <summary>
    /// 包长
    /// </summary>
    public int DataLength;
    public byte[] DataLength_Byte;

    /// <summary>
    /// 包长
    /// </summary>
    public int DataLength_2;
    public byte[] DataLength_2_Byte;

    /// <summary>
    /// 命令字 用来标记消息类型
    /// </summary>
    public int Cmd;
    public byte[] Cmd_Byte;

    /// <summary>
    /// 发生的文本内容
    /// </summary>
    public string BodyData;
    public byte[] BodyData_Byte;

    public byte End = 0;

    public byte[] Source;

    /// <summary>
    /// 数据组合为byte数组
    /// </summary>
    /// <returns></returns>
    public byte[] ToByteArray()
    {
        BodyData_Byte = Encoding.UTF8.GetBytes(BodyData);
        Cmd_Byte = BitConverter.GetBytes(Cmd);
        DataLength = DataLength_2 = BodyData_Byte.Length + Cmd_Byte.Length + 4 + 1;
        DataLength_Byte = BitConverter.GetBytes(DataLength);
        DataLength_2_Byte = BitConverter.GetBytes(DataLength_2);

        byte[] result = new byte[DataLength + 4];
        Array.Copy(DataLength_Byte, 0, result, 0, 4);
        Array.Copy(DataLength_2_Byte, 0, result, 4, 4);
        Array.Copy(Cmd_Byte, 0, result, 8, 4);
        Array.Copy(BodyData_Byte, 0, result, 12, BodyData_Byte.Length);
        Source = new byte[result.Length];
        Array.Copy(result, 0, Source, 0, result.Length);
        return result;
    }

    /// <summary>
    /// 数据解析
    /// </summary>
    public void SetValues()
    {
        try
        {
            DataLength_Byte = Common.SubByte(Source, 0, 4);
            DataLength_2_Byte = Common.SubByte(Source, 4, 4);
            Cmd_Byte = Common.SubByte(Source, 8, 4);
            BodyData_Byte = Common.SubByte(Source, 12, Source.Length - 13);
            End = Common.SubByte(Source, Source.Length - 2, 1)[0];

            DataLength = BitConverter.ToInt32(DataLength_Byte, 0);
            DataLength_2 = BitConverter.ToInt32(DataLength_2_Byte, 0);
            Cmd = BitConverter.ToInt32(Cmd_Byte, 0);
            BodyData = Encoding.UTF8.GetString(BodyData_Byte);
        }
        catch (Exception ex)
        {

            throw;
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容