Vert.x 实现基于TCP协议的半包粘包问题的处理

一、简介

  • 1、协议格式 : 采用二进制格式,【协议标识】【正文长度】【正文内容】
    协议标识:1个字节
    内容长度:4个字节 (存放正文内容的长度int)
    正文内容:可变长度

二、代码实现

  • 1 客户端模拟

public class TCPClient {

    public static void main(String[] args) throws Exception {
        TCPClient client = new TCPClient();
        client.sendAndReceive();


    }

    private void sendAndReceive() throws Exception {
        Socket client = new Socket("127.0.0.1", 1234);
        OutputStream out = client.getOutputStream();
        int i = 0;
        while (true){ 
            byte head =0x1A ;
            String data = "test"+ (int)(Math.random()*1000);
            out.write(head); //写头
            int len = data.getBytes().length;
            byte[] bytLen = BytesUtil.int2Bytes(len);
            out.write(bytLen); //长度
            out.flush();

            Thread.sleep(2*1000);
            out.write(data.getBytes());
            System.out.println(++i + "=====================发送结束!"+head+len+data);
        }
    }
}

  • 2 服务端处理粘包、半包问题 。半包问题,由于vertx自带缓存不需要关注,只关注粘包问题即可
@Slf4j
public class WorkerVerticleTcp extends AbstractVerticle {

  String workerName = null;

  @Override
  public void start() throws Exception {
      // 创建TCP Server
      int len_header = 1+4 ; // 1个字节类型,4个字节长度
      NetServer server = vertx.createNetServer();
      // 设置Handler
      server.connectHandler(socket -> {
          // 构造parser
          RecordParser parser = RecordParser.newFixed(len_header);
          parser.setOutput(new Handler<Buffer>() {
              int size = -1;
              @Override
              public void handle(Buffer buffer) {
                  System.out.println("收到数据"+buffer.toString());
                  //找到头
                  if (-1 == size) {
                      byte head = buffer.getByte(0); //第一个字节0x1A
                      if(head != 0x1A){
                          System.out.println("----ignore----->"+head);
                          return; //头,忽略
                      }
                      byte h0 = buffer.getByte(0); //第1个字节是类型0x1A
                      size = buffer.getInt(1); // 从第2~3个字节是长度
                      parser.fixedSizeMode(size);
                      System.out.println(buffer.toString());
                  } else {
                      byte[] buf = buffer.getBytes();
                      try {
                          String str = new String(buf);
                          System.out.println(str);
                      } catch (Exception e) {
                          return;
                      }
                      parser.fixedSizeMode(len_header);
                      size = -1;
                  }//end if-else
              }
          });

          socket.handler(parser);
      });

      // 监听
      server.listen(1234, "127.0.0.1", res -> {
          if (res.succeeded()) {
              System.out.println("tcp server is listening at 1234 ");

          } else {
              System.out.println(res.cause());
          }
      });

  }
}


  • 3 主函数所在类
@Slf4j
public class TcpServerDemo extends AbstractVerticle {
    String proptiestPath = null;
    public static void main(String[] args) {
        Runner.runExample(TcpServerDemo.class);
    }
    @Override
    public void start() {
        vertx.deployVerticle(WorkerVerticleTcp.class,new 
        DeploymentOptions().setWorker(true).setInstances(1));
        //websocket
    }
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。