本节我们在上一节基础上进一步完成TCP协议的收发机制。上一节我们已经实现了向服务器方发送一个字符,本节我们要实现连续发送多个字符,并且能正常接收数据功能,完成了这些功能后,我们就可以基于此去开发其他构建在TCP之上的其他协议。
为了保证数据能正确的连续收发,本节的设计思路是使用一个队列将发送的数据存储起来,然后将数据包发送,只有等待收到对方回发的ack后,我们才将数据从队列中删除,如果数据包一直没有收到ack回应,我们就启动一个timer,自动将队列中的数据包进行发送,如果发送给定次数后还没有成功,那么就通知数据发送层发送失败,接下来我们看看相应代码设计。
class SendPacketWrapper {
//将发送的数据包封装起来存储在队列中
private byte[] packet_to_send;
private int seq_num;
private int ack_num;
private int send_count = 0;
public SendPacketWrapper(byte[] packet, int seq_num) {
this.packet_to_send = packet;
this.seq_num = seq_num;
this.ack_num = seq_num + packet.length;
}
public byte[] get_packet() {
return this.packet_to_send;
}
public int get_seq_num() {
return this.seq_num;
}
public int get_ack_num() {
return this.ack_num;
}
public void increase_send_count() {
this.send_count++;
}
public int get_send_count() {
return this.send_count;
}
}
class SendPacketTask extends TimerTask {
private TCPThreeHandShakes task_handler;
public SendPacketTask(TCPThreeHandShakes handler) {
this.task_handler = handler;
}
@Override
public void run() {
this.task_handler.sendPacketInList();
}
}
第一个类用于负责把发送的数据封装起来,他记录了数据的缓冲区,以及发送时对应的seq号,这样当数据包需要重发时就可以再次使用这个数值进行发送,同时也记录了应对的ack号,这样当对方返回ack值时,我们才能检验该数据包是否已经被对方接收。
接下来我们在类TCPThreeHandShakes中添加相应变量和代码:
public class TCPThreeHandShakes extends Application{
。。。。
private int tcp_state = CONNECTION_IDLE;
private static int PACKET_SEND_TIMES = 3; //连续发生3次不成功则失败
private Timer send_timer = new Timer(); //定时将发送队列中的数据包进行发送
private int packet_resend_time = 2000; //每过一秒就发送队列中存储的数据包
private SendPacketTask resend_packet_task = null;
//每次发送数据包时先将它存储在队列中,发送出去收到ack后再将它从队列中去除
private ArrayList<SendPacketWrapper> send_packet_list = new ArrayList<SendPacketWrapper>();
public TCPThreeHandShakes(byte[] server_ip, short server_port, ITCPHandler tcp_handler) {
this.dest_ip = server_ip;
this.dest_port = server_port;
//指定一个固定端口,以便抓包调试
Random rand = new Random();
this.port = (short)rand.nextInt();
this.tcp_handler = tcp_handler;
resend_packet_task = new SendPacketTask(this);
send_timer.scheduleAtFixedRate(resend_packet_task, packet_resend_time, packet_resend_time);
}
。。。。
}
我们添加了一系列与数据包发送和检验变量和代码,特别是启动一个timer,在每两秒就去检测数据包队列,如果里面还有数据包没有接收到对应的ack,也就是上次发送时对方没有成功接收,那么timer就会将数据包再次发送,如果已经发送超过给定次数,timer就会通知上层应用数据发送失败。
接下来我们要添加把数据包在发送时存储到队列和检验队列数据包的代码:
private void savePacketToList(byte[] packet) {
//如果数据包没有存在队列中就加入队列
boolean contains = false;
for(int i = 0; i < send_packet_list.size(); i++) {
SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
if (packet_wrapper.get_packet() == packet) {
contains = true;
break;
}
}
if (contains == false) {
SendPacketWrapper packet_wrapper = new SendPacketWrapper(packet, this.seq_num);
this.send_packet_list.add(packet_wrapper);
}
}
public void sendPacketInList() {
ArrayList<SendPacketWrapper> wrapper_list = new ArrayList<SendPacketWrapper>();
//将所有在队列中的数据包系数发送,如果数据包发送次数大于给定次数则报告失败
for(int i = 0; i < this.send_packet_list.size(); i++) {
SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
if (packet_wrapper.get_send_count() >= PACKET_SEND_TIMES) {
this.tcp_handler.send_notify(false, packet_wrapper.get_packet());
}
else {
int old_seq_num = this.seq_num;
this.seq_num = packet_wrapper.get_seq_num();
try {
createAndSendPacket(packet_wrapper.get_packet(), "ACK");
} catch (Exception e) {
e.printStackTrace();
}
this.seq_num = old_seq_num;
wrapper_list.add(packet_wrapper);
}
}
this.send_packet_list = wrapper_list;
}
private void checkSendPacketByACK(int recv_ack) {
ArrayList<SendPacketWrapper> wrapper_list = new ArrayList<SendPacketWrapper>();
//所有ack值小于返回ack的数据包都已经成功发送,此时要将数据包从队列移除并通知上层
for(int i = 0; i < this.send_packet_list.size(); i++) {
SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
int ack = packet_wrapper.get_ack_num();
System.out.println("receive ack: " + ack);
if (packet_wrapper.get_ack_num() <= recv_ack) {
this.seq_num = packet_wrapper.get_ack_num();
System.out.println("next seq num: "+ this.seq_num);
this.tcp_handler.send_notify(true, packet_wrapper.get_packet());
}
else {
wrapper_list.add(packet_wrapper);
}
}
this.send_packet_list = wrapper_list;
this.seq_num = recv_ack;
}
在这三个函数中,第一个负责数据包第一次发送时将其存储在队列,第二个负责轮训队列,将上次没有发送成功的数据包继续发送,如果发送超过给定次数则向上层应用报告发送失败,最后一个函数是在收到对方发来的ack包后检验队列中哪些数据包发送成功,检验标准是所有ack小于对方发来ack数值的数据包都表明成功发送,接下来在handleData函数,也就是接收对方发来数据包的函数里我们增加如下流程:
@Override
public void handleData(HashMap<String, Object> headerInfo) {
。。。。
if (tcp_state == CONNECTION_SEND || tcp_state == CONNECTION_CONNECTED) {
tcp_state = CONNECTION_CONNECTED;
checkSendPacketByACK(ack_num);
if (data != null && data.length > 0 && seq_num == this.ack_num) {
/*
* 这里我们简化数据的接收流程,为了提升数据发送效率,很有可能数据包的到来次序与服务器发送时不一样
* ,但为了让实现逻辑简单,我们每次只接收指定数据包,例如当前我们等待seq编号为1,2,3的数据包,结果
* 数据包抵达的次序为3,1,2,那么我们就只接收数据包1,让对方再次发送数据包2,3,显然这样子会降低效率,
* 但为了实现逻辑简单,我们暂时做妥协
*/
this.tcp_handler.recv_notify(data);
createAndSendPacket(null, "ACK");
}
}
。。。。、
}
新添加这段代码的作用是当对方数据包到来时,我们先抽取出包中的ack值,使用该值去检验队列中哪些数据包已经成功发送,同时如果对方发来的数据包中有数据的话,我们就把数据取出,然后提交给上层应用,最后我们看看上层如何使用该tcp连接层来实现数据发送:
package Application;
import java.net.InetAddress;
import utils.ITCPHandler;
public class TCPRawDataSender implements ITCPHandler{
private TCPThreeHandShakes tcp_socket = null;
private String[] buffer = new String[] {"h", "e", "l", "l", "o"};
private int buffer_p = 0;
private byte[] current_send_packet = null;
private void send_content() throws Exception {
if (buffer_p < buffer.length) {
System.out.println("send content: " + buffer[buffer_p]);
byte[] send_content = buffer[buffer_p].getBytes();
current_send_packet = send_content;
tcp_socket.tcp_send(send_content);
}
}
@Override
public void connect_notify(boolean connect_res) {
if (connect_res) {
System.out.println("connection established!");
try {
send_content();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
else {
System.out.println("connection fail!");
}
}
private void close_connection() {
try {
tcp_socket.tcp_close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void send_notify(boolean send_res, byte[] packet_send) {
if (send_res == true) {
System.out.println("send data, buffer_p: " + buffer_p);
if (packet_send == current_send_packet) {
buffer_p++;
current_send_packet = null;
}
}
if (buffer_p >= buffer.length || send_res == false) {
String info = "send all data ";
if (send_res == false) {
info = "send fail with buffer_p: " + buffer_p;
}
System.out.println(info);
}
else {
try {
send_content();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
public void connect_close_notify(boolean close_res) {
if (close_res == true) {
System.out.println("connection close complete!");
} else {
System.out.println("connection close fail!");
}
}
public void run() {
try {
InetAddress ip = InetAddress.getByName("192.168.2.127"); //220.181.43.8
short port = 1234;
tcp_socket = new TCPThreeHandShakes(ip.getAddress(), port, this);
tcp_socket.tcp_connect();
System.out.println("finish handshake!");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void recv_notify(byte[] packet_recv) {
System.out.println("receive data: " + new String(packet_recv));
close_connection();
}
}
该类继承了ITCPHandler接口以便接收数据发送过程的各种调用,其中buffer中存储的是要发送给对方的数据,当connect_notify被调用时,如果连接成功,他就会使用send_content函数发送缓冲区里的一个字符,如果发送成功,它的send_notify会调用,在该函数里,他检验成功发送的数据是不是自己当前正在发生的数据,如果是它就将缓冲器指针挪动一位发送下一个字符,当所有数据发送完毕后,它会等待对方向它发送数据,一旦成功接收对方发来的数据,它的recv_notify函数会被调用,此时它把对方发送来的数据显示出来后,调用close_connection关闭连接
我在iphone上安装了一款名为tcp server的免费app做实验,我是上面代码与该app创建的tcp server服务器连接,然后将数据发送给他,并接收从它发过来的数据,最后运行结果如下图:
更多技术信息,包括操作系统,编译器,面试算法,机器学习,人工智能,请关照我的公众号: