Socket详解

前面两篇分析了TCP和UDP协议,本篇来分析一下Socket,有了前面的基础,对理解Socket有很大的帮助,同时也是对TCP和UDP的更深层次的了解,经过多天的资料研究和代码分析,对socket也算是有了一个清楚的认识,鉴于网上的知识比较散,所以想把关于socket的知识整理一下,希望能够帮助想理解socket的童鞋,本着这些目的,本篇就来仔细分析一下Socket的原理。

文章内容有点长,先罗列一下需要分析的要点:
1. Socket是什么?
2. Java中Socket的使用
3. Java中Socket的源码分析
4. Linux系统中Socket对TCP的详细处理过程


1. Socket是什么?

我们知道进程通信的方法有管道、命名管道、信号、消息队列、共享内存、信号量,这些方法都要求通信的两个进程位于同一个主机。但是如果通信双方不在同一个主机又该如何进行通信呢?
在计算机网络中有一个tcp/ip协议族,使用tcp/ip协议族就能达到我们想要的效果,如下图所示:


socket所处层次

图中可以看到socket是位于应用层和传输层之间的一个抽象层,那么它存在的意义又是什么呢?其实没有socket抽象层,也是可以的,但是,当我们使用不同的协议进行通信时就得使用不同的接口,还得处理不同协议的各种细节,这就增加了开发的难度,软件也不易于扩展。于是UNIX BSD就发明了socket这种东西,socket屏蔽了各个协议的通信细节,使得程序员无需关注协议本身,直接使用socket提供的接口来进行互联的不同主机间的进程的通信。这就好比操作系统给我们提供了使用底层硬件功能的系统调用,通过系统调用我们可以方便的使用磁盘(文件操作),使用内存,而无需自己去进行磁盘读写,内存管理。socket其实也是一样的东西,就是提供了tcp/ip协议的抽象,对外提供了一套接口,同过这个接口就可以统一、方便的使用tcp/ip协议的功能了。

2.Java中Socket的使用

在Java中,我们使用Socket有两种,一个是基于TCP的Socket,一个是基于UDP的DatagramSocket,本篇只分析基于TCP的Socket。现在我们来看Socket的使用:

客户端:指明主机端口创建Socket,获取输出流,写入数据。

        Socket socket = null;
        OutputStream os = null;

        try {
            socket = new Socket("192.168.1.106",9200);
            os = socket.getOutputStream();
            os.write("hello server !".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                if(os != null){
                    os.close();
                }
                if(socket != null){
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

服务端:指明绑定的端口,创建ServerSocket,开启端口监听。

      ServerSocket serverSocket;
        try {
            serverSocket = new ServerSocket(9200);
        } catch (IOException e) {   
            return;
        }

        while (isRunning){
            try {
                Socket client = serverSocket.accept();
                handleClient(client);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        try {
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

这里只是一个简单的示例,当然实际开发中可能需要更多的东西,比如在handleClient(client)处理数据的时候加入线程池去处理,以及client的关闭等,这里只是简单的使用,主要目的还是看其源码。

3.Java中Socket的源码分析

3.1 服务端ServerSocket的源码解析

为了便于理解,我们先分析服务端ServerSocket的源码。通常我们在创建的时候需要指明绑定的端口,来看其构造函数:

    public ServerSocket(int port) throws IOException {
        this(port, 50, null);
    }

    public ServerSocket(int port, int backlog) throws IOException {
        this(port, backlog, null);
    }
  
    public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
        setImpl();
        if (port < 0 || port > 0xFFFF)
            throw new IllegalArgumentException(
                       "Port value out of range: " + port);
        if (backlog < 1)
          backlog = 50;
        try {
            bind(new InetSocketAddress(bindAddr, port), backlog);
        } catch(SecurityException e) {
            close();
            throw e;
        } catch(IOException e) {
            close();
            throw e;
        }
    }

ServerSocket有三个指明端口的构造函数,其中参数backlog表示已建立连接的最大数量,也就是accept中未被我们取走的最大连接数,这个现在不理解也没关系,后面会重点介绍这个参数,这里当未指明时,它的值默认是50;InetAddress是对IP、DNS、端口等信息的封装类。
我们看到,在构造函数中,先调用了setImpl()方法,看其源码:

    private void setImpl() {
        if (factory != null) {
            impl = factory.createSocketImpl();
            checkOldImpl();
        } else {
            // No need to do a checkOldImpl() here, we know it's an up to date
            // SocketImpl!
            impl = new SocksSocketImpl();
        }
        if (impl != null)
            impl.setServerSocket(this);
    }

factory是SocketImplFactory类型,是创建SocketImpl的工厂类;impl是SocketImpl类型。

    /**
     * The factory for all server sockets.
     */
    private static SocketImplFactory factory = null;

    /**
     * The implementation of this Socket.
     */
    private SocketImpl impl;

当我们调用构造函数的时候factory肯定是空的,所以此时会给ServerSocket的impl变量赋值为SocksSocketImpl的对象,然后调用 impl.setServerSocket(this)将ServerSocket自己和SocksSocketImpl关联起来。

回到构造函数,接下来会判断端口的正确性以及backlog的处理,最后调用了bind(new InetSocketAddress(bindAddr, port), backlog)方法来绑定该端口,看其源码:


    public void bind(SocketAddress endpoint, int backlog) throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!oldImpl && isBound())
            throw new SocketException("Already bound");
        if (endpoint == null)
            endpoint = new InetSocketAddress(0);
        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        if (epoint.isUnresolved())
            throw new SocketException("Unresolved address");
        if (backlog < 1)
          backlog = 50;
        try {
            SecurityManager security = System.getSecurityManager();
            if (security != null)
                security.checkListen(epoint.getPort());
            getImpl().bind(epoint.getAddress(), epoint.getPort());
            getImpl().listen(backlog);
            bound = true;
        } catch(SecurityException e) {
            bound = false;
            throw e;
        } catch(IOException e) {
            bound = false;
            throw e;
        }
    }

首先进行一大堆的判断,之后获取SecurityManager,这是一个安全策略管理器,在执行一些不安全、敏感等操作之前通常需要用到这个东西来先检查一下,比如这里在监听之前,调用了checkListen()方法,来检查当前线程是否允许在指定端口的连接请求过程中的使用wait等待,如果不允许,则会抛出异常。接着继续往下看,getImpl()源码:

    SocketImpl getImpl() throws SocketException {
        if (!created)
            createImpl();
        return impl;
    }

    void createImpl() throws SocketException {
        if (impl == null)
            setImpl();
        try {
            impl.create(true);
            created = true;
        } catch (IOException e) {
            throw new SocketException(e.getMessage());
        }
    }

    /**
     * Creates a socket with a boolean that specifies whether this
     * is a stream socket (true) or an unconnected UDP socket (false).
     */
    protected synchronized void create(boolean stream) throws IOException {
        this.stream = stream;
        if (!stream) {
            ResourceManager.beforeUdpCreate();
            // only create the fd after we know we will be able to create the socket
            fd = new FileDescriptor();
            try {
                socketCreate(false);
            } catch (IOException ioe) {
                ResourceManager.afterUdpClose();
                fd = null;
                throw ioe;
            }
        } else {
            fd = new FileDescriptor();
            socketCreate(true);
        }
        if (socket != null)
            socket.setCreated();
        if (serverSocket != null)
            serverSocket.setCreated();
    }

created初始化的时候为false,所以会调用createImpl(),impl我们在之前已经给赋值了指向SocksSocketImpl对象,所以将会调用SocksSocketImpl中继承下来的create(boolean stream)方法,可以看出,当参数为true时创建的是stream流,也就是TCP连接,当为false时是UDP。回到前面,当getImpl()返回impl对象后,就调用了它的bind方法和listen方法,我们接着来看bind方法的源码:

    /**
     * Binds the socket to the specified address of the specified local port.
     * @param address the address
     * @param lport the port
     */
    protected synchronized void bind(InetAddress address, int lport) throws IOException {
       synchronized (fdLock) {
            if (!closePending && (socket == null || !socket.isBound())) {
                NetHooks.beforeTcpBind(fd, address, lport);
            }
        }
        socketBind(address, lport);
        if (socket != null)
            socket.setBound();
        if (serverSocket != null)
            serverSocket.setBound();
    }

可以看到,调用了socketBind(address, lport),在来看socketBind方法源码:

    void socketBind(InetAddress address, int port) throws IOException {
        if (fd == null || !fd.valid()) {
            throw new SocketException("Socket closed");
        }

        IoBridge.bind(fd, address, port);

        this.address = address;
        if (port == 0) {
            // Now that we're a connected socket, let's extract the port number that the system
            // chose for us and store it in the Socket object.
            localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
        } else {
            localport = port;
        }
    }

注意这里的socketBind是Android SDK下java包中的方法,而在JDK中本身是个native方法,这里大概是谷歌工程师重新写了这个类,但是底层最终都是调用的系统的C++代码。我们接着看下去,调用了IoBridge的bind方法:

public static void bind(FileDescriptor fd, InetAddress address, int port) throws SocketException {
        if (address instanceof Inet6Address) {
            Inet6Address inet6Address = (Inet6Address) address;
            if (inet6Address.getScopeId() == 0 && inet6Address.isLinkLocalAddress()) {
                // Linux won't let you bind a link-local address without a scope id.
                // Find one.
                NetworkInterface nif = NetworkInterface.getByInetAddress(address);
                if (nif == null) {
                    throw new SocketException("Can't bind to a link-local address without a scope id: " + address);
                }
                try {
                    address = Inet6Address.getByAddress(address.getHostName(), address.getAddress(), nif.getIndex());
                } catch (UnknownHostException ex) {
                    throw new AssertionError(ex); // Can't happen.
                }
            }
        }
        try {
            Libcore.os.bind(fd, address, port);
        } catch (ErrnoException errnoException) {
            if (errnoException.errno == EADDRINUSE || errnoException.errno == EADDRNOTAVAIL ||
                errnoException.errno == EPERM || errnoException.errno == EACCES) {
                throw new BindException(errnoException.getMessage(), errnoException);
            } else {
                throw new SocketException(errnoException.getMessage(), errnoException);
            }
        }
    }

由此可以看出,最后调用了 Libcore.os.bind(fd, address, port); 那么Libcore.os又是什么,来看一下:

package libcore.io;

public final class Libcore {
    private Libcore() { }

    /**
     * Direct access to syscalls. Code should strongly prefer using {@link #os}
     * unless it has a strong reason to bypass the helpful checks/guards that it
     * provides.
     */
    public static Os rawOs = new Linux();

    /**
     * Access to syscalls with helpful checks/guards.
     */
    public static Os os = new BlockGuardOs(rawOs);
}

所以,调用了BlockGuardOs类中的bind方法,而BlockGuardOs中的bind方法是调用了rawOs对象的bind方法,而Liunx类中几乎都是native方法:

public final class Linux implements Os {
    Linux() { }

    public native FileDescriptor accept(FileDescriptor fd, SocketAddress peerAddress) throws ErrnoException, SocketException;
    public native void bind(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
    public native void bind(FileDescriptor fd, SocketAddress address) throws ErrnoException, SocketException;
    public native void listen(FileDescriptor fd, int backlog) throws ErrnoException;
    public native void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
    public native void connect(FileDescriptor fd, SocketAddress address) throws ErrnoException, SocketException;
    public native void close(FileDescriptor fd) throws ErrnoException;
    ...
    

这里我先将主要的几个方法列出来,后面就不用再贴代码了。
自此,bind方法就执行完了,回到之前,程序会继续执行listen方法,listen方法和bind方法的执行过程几乎完全一样,最后都是调用native层的方法,这里就不再赘述,执行完listen之后,bound = true,表示对端口成功绑定并且开启了监听。所以总的来说,在ServerSocket的构造方法中主要完成了两件事,bind和listen,完成之后,ServerSocket就会一直监听端口中客户端的连接请求,完成三次握手之后,就会建立连接并将连接放入队列中,等待应用程序调用accept从队列中取走该连接,这个过程稍后会详细介绍。

构造完ServerSocket后,应用程序会不断的调用accept来获取连接,再来看accept的源码:

    /**
     * Listens for a connection to be made to this socket and accepts
     * it. The method blocks until a connection is made.
     */
    public Socket accept() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isBound())
            throw new SocketException("Socket is not bound yet");
        Socket s = new Socket((SocketImpl) null);
        implAccept(s);
        return s;
    }

从注释可以看出该方法是个阻塞的方法,直到队列中有已建立的连接。从代码可以看出,每调用一次accept就会new Socket然后,去接收连接。然后我们来看implAccept方法,其源码如下:

    /**
     * Subclasses of ServerSocket use this method to override accept()
     * to return their own subclass of socket.  So a FooServerSocket
     * will typically hand this method an <i>empty</i> FooSocket.  On
     * return from implAccept the FooSocket will be connected to a client.
     */
    protected final void implAccept(Socket s) throws IOException {
        SocketImpl si = null;
        try {
            if (s.impl == null)
              s.setImpl();
            else {
                s.impl.reset();
            }
            si = s.impl;
            s.impl = null;
            si.address = new InetAddress();
            si.fd = new FileDescriptor();
            getImpl().accept(si);

            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkAccept(si.getInetAddress().getHostAddress(),
                                     si.getPort());
            }
        } catch (IOException e) {
            if (si != null)
                si.reset();
            s.impl = si;
            throw e;
        } catch (SecurityException e) {
            if (si != null)
                si.reset();
            s.impl = si;
            throw e;
        }
        s.impl = si;
        s.postAccept();
    }

Socket的setImpl方法和ServerSocket的setImpl方法实现一模一样,所以这里的变量si和参数s.impl指向的还是一个SocksSocketImpl对象,si只是临时变量,可以看到最后还是把准备好的SocksSocketImpl对象重新赋值给了s.impl;
getImpl().accept(si)是调用的SocksSocketImpl的accept方法,其源码如下:

/**
     * Accepts connections.
     * @param s the connection
     */
    protected void accept(SocketImpl s) throws IOException {
        acquireFD();
        try {
            BlockGuard.getThreadPolicy().onNetwork();
            socketAccept(s);
        } finally {
            releaseFD();
        }
    }

首先获取文件描述符,然后调用了socketAccept(s),之后在释放文件描述符资源,socketAccept源码如下:

void socketAccept(SocketImpl s) throws IOException {
        if (fd == null || !fd.valid()) {
            throw new SocketException("Socket closed");
        }

        // poll() with a timeout of 0 means "poll for zero millis", but a Socket timeout == 0 means
        // "wait forever". When timeout == 0 we pass -1 to poll.
        if (timeout <= 0) {
            IoBridge.poll(fd, POLLIN | POLLERR, -1);
        } else {
            IoBridge.poll(fd, POLLIN | POLLERR, timeout);
        }

        InetSocketAddress peerAddress = new InetSocketAddress();
        try {
            FileDescriptor newfd = Libcore.os.accept(fd, peerAddress);

            s.fd.setInt$(newfd.getInt$());
            s.address = peerAddress.getAddress();
            s.port = peerAddress.getPort();
        } catch (ErrnoException errnoException) {
            if (errnoException.errno == EAGAIN) {
                throw new SocketTimeoutException(errnoException);
            } else if (errnoException.errno == EINVAL || errnoException.errno == EBADF) {
                throw new SocketException("Socket closed");
            }
            errnoException.rethrowAsSocketException();
        }

        s.localport = IoBridge.getLocalInetSocketAddress(s.fd).getPort();
    }

这里通过调用IoBridge的poll实现阻塞,其底层还是Linux系统通过监听文件描述符事件实现了阻塞,可以看一下poll的注释:

    /**
     * Wait for some event on a file descriptor, blocks until the event happened or timeout period
     * passed. See poll(2) and @link{android.system.Os.Poll}.
     *
     * @throws SocketException if poll(2) fails.
     * @throws SocketTimeoutException if the event has not happened before timeout period has passed.
     */
    public static void poll(FileDescriptor fd, int events, int timeout)
            throws SocketException, SocketTimeoutException {
        StructPollfd[] pollFds = new StructPollfd[]{ new StructPollfd() };
        pollFds[0].fd = fd;
        pollFds[0].events = (short) events;

        try {
            int ret = android.system.Os.poll(pollFds, timeout);
            if (ret == 0) {
                throw new SocketTimeoutException("Poll timed out");
            }
        } catch (ErrnoException e) {
            e.rethrowAsSocketException();
        }
    }

最终,当连接队列中有已建立的连接后,线程就会被唤醒,继续执行下面的内容,将连接的信息封装到参数s指向的对象中返回,这个对象就是前面的si指向的对象,最后将该对象赋值给前面的s.impl,接着执行s.postAccept(),其源码如下:

    /**
     * set the flags after an accept() call.
     */
    final void postAccept() {
        connected = true;
        created = true;
        bound = true;
    }

就是标记了几个状态值,最后此s(Socket)就被返回给我们应用程序来使用了。由此ServerSocket中accept()方法就分析完毕,总结一下就是:线程会被阻塞,直到连接队列中有了新连接,线程再被唤醒,然后将连接的信息封装到一个SocketImpl对象中,再将此SocketImpl对象封装到一个Socket对象中,最后将此Socket对象返回给应用程序使用。

通过以上的分析,我们了解了服务端的bind、listen、和accept的过程,所有这些最终都是调用底层系统的方法实现的。接下来看一下客户端的分析。

3.2 客户端Socket的源码解析

Socket为指定主机地址和端口提供了6个公有的构造函数,但最终都是通过一个私有的构造函数来创建新对象。
当我们来看一下私有的构造函数源码:

private Socket(InetAddress[] addresses, int port, SocketAddress localAddr,
            boolean stream) throws IOException {
        if (addresses == null || addresses.length == 0) {
            throw new SocketException("Impossible: empty address list");
        }

        for (int i = 0; i < addresses.length; i++) {
            setImpl();
            try {
                InetSocketAddress address = new InetSocketAddress(addresses[i], port);
                createImpl(stream);
                if (localAddr != null) {
                    bind(localAddr);
                }
                connect(address);
                break;
            } catch (IOException | IllegalArgumentException | SecurityException e) {
                try {
                    // Android-changed:
                    // Do not call #close, classes that extend this class may do not expect a call
                    // to #close coming from the superclass constructor.
                    impl.close();
                    closed = true;
                } catch (IOException ce) {
                    e.addSuppressed(ce);
                }

                // Only stop on the last address.
                if (i == addresses.length - 1) {
                    throw e;
                }
            }

            // Discard the connection state and try again.
            impl = null;
            created = false;
            bound = false;
            closed = false;
        }
    }

注意此方法是AndroidSDK下java包下面的,JDK中的这个方法要比这个更简单:

 private Socket(SocketAddress address, SocketAddress localAddr,
                   boolean stream) throws IOException {
        setImpl();

        // backward compatibility
        if (address == null)
            throw new NullPointerException();

        try {
            createImpl(stream);
            if (localAddr != null)
                bind(localAddr);
            connect(address);
        } catch (IOException | IllegalArgumentException | SecurityException e) {
            try {
                close();
            } catch (IOException ce) {
                e.addSuppressed(ce);
            }
            throw e;
        }
    }

过程都是差不多的都是先setImpl();然后createImpl(stream);最后connect(address);只是Android下的会做一些额外的操作,这里我们就不再赘述,直接进入connect方法里面:

    /**
     * Connects this socket to the server with a specified timeout value.
     * A timeout of zero is interpreted as an infinite timeout. The connection
     * will then block until established or an error occurs.
     */
    public void connect(SocketAddress endpoint, int timeout) throws IOException {
        if (endpoint == null)
            throw new IllegalArgumentException("connect: The address can't be null");

        if (timeout < 0)
          throw new IllegalArgumentException("connect: timeout can't be negative");

        if (isClosed())
            throw new SocketException("Socket is closed");

        if (!oldImpl && isConnected())
            throw new SocketException("already connected");

        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");

        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        InetAddress addr = epoint.getAddress ();
        int port = epoint.getPort();
        checkAddress(addr, "connect");

        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            if (epoint.isUnresolved())
                security.checkConnect(epoint.getHostName(), port);
            else
                security.checkConnect(addr.getHostAddress(), port);
        }
        if (!created)
            createImpl(true);
        if (!oldImpl)
            impl.connect(epoint, timeout);
        else if (timeout == 0) {
            if (epoint.isUnresolved())
                impl.connect(addr.getHostName(), port);
            else
                impl.connect(addr, port);
        } else
            throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
        connected = true;
        /*
         * If the socket was not bound before the connect, it is now because
         * the kernel will have picked an ephemeral port & a local address
         */
        bound = true;
    }

和服务端的bind方法差不多,一样先进行一大堆的判断,最后也是调用了SocksSocketImpl类中的connect方法,看一下SocksSocketImpl中的connect源码:

    /**
     * Connects the Socks Socket to the specified endpoint. It will first
     * connect to the SOCKS proxy and negotiate the access. If the proxy
     * grants the connections, then the connect is successful and all
     * further traffic will go to the "real" endpoint.
     */
    @Override
    protected void connect(SocketAddress endpoint, int timeout) throws IOException {

        ...省略

        // Connects to the SOCKS server
        try {
               privilegedConnect(server, serverPort, remainingMillis(deadlineMillis));
         } catch (IOException e) {
                throw new SocketException(e.getMessage());
         }

         ...省略

    }

这个方法代码很多也很复杂,Android版的和JDK版的还有些代码不同,这里我们不去关注,只需要知道在里面调用了 privilegedConnect(server, serverPort, remainingMillis(deadlineMillis));跟进去:

private synchronized void privilegedConnect(final String host,final int port, final int timeout) throws IOException{
        try {
            AccessController.doPrivileged(
                new java.security.PrivilegedExceptionAction<Void>() {
                    public Void run() throws IOException {
                              superConnectServer(host, port, timeout);
                              cmdIn = getInputStream();
                              cmdOut = getOutputStream();
                              return null;
                          }
                      });
        } catch (java.security.PrivilegedActionException pae) {
            throw (IOException) pae.getException();
        }
    }

调用了AccessController的doPrivileged方法,在其中执行了PrivilegedExceptionAction对象的run方法,也就是执行了superConnectServer(host, port, timeout),而这个方法是直接调用父类AbstractPlainSocketImpl的connect方法,所以我们直接看AbstractPlainSocketImpl中的connect方法:

 /**
     * Creates a socket and connects it to the specified address on
     * the specified port.
     * @param address the address
     * @param timeout the timeout value in milliseconds, or zero for no timeout.
     * @throws IOException if connection fails
     * @throws  IllegalArgumentException if address is null or is a
     *          SocketAddress subclass not supported by this socket
     * @since 1.4
     */
    protected void connect(SocketAddress address, int timeout)
            throws IOException {
        boolean connected = false;
        try {
            if (address == null || !(address instanceof InetSocketAddress))
                throw new IllegalArgumentException("unsupported address type");
            InetSocketAddress addr = (InetSocketAddress) address;
            if (addr.isUnresolved())
                throw new UnknownHostException(addr.getHostName());
            this.port = addr.getPort();
            this.address = addr.getAddress();

            connectToAddress(this.address, port, timeout);
            connected = true;
        } finally {
            if (!connected) {
                try {
                    close();
                } catch (IOException ioe) {
                    /* Do nothing. If connect threw an exception then
                       it will be passed up the call stack */
                }
            }
        }
    }

很明显,执行了connectToAddress(this.address, port, timeout):

  private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
        if (address.isAnyLocalAddress()) {
            doConnect(InetAddress.getLocalHost(), port, timeout);
        } else {
            doConnect(address, port, timeout);
        }
    }

判断端口是否是本地的还是远程的,然后执行相应的doConnect方法:

   /**
     * The workhorse of the connection operation.  Tries several times to
     * establish a connection to the given <host, port>.  If unsuccessful,
     * throws an IOException indicating what went wrong.
     */

    synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
        synchronized (fdLock) {
            if (!closePending && (socket == null || !socket.isBound())) {
                NetHooks.beforeTcpConnect(fd, address, port);
            }
        }
        try {
            acquireFD();
            try {
                BlockGuard.getThreadPolicy().onNetwork();
                socketConnect(address, port, timeout);
                /* socket may have been closed during poll/select */
                synchronized (fdLock) {
                    if (closePending) {
                        throw new SocketException ("Socket closed");
                    }
                }
                // If we have a ref. to the Socket, then sets the flags
                // created, bound & connected to true.
                // This is normally done in Socket.connect() but some
                // subclasses of Socket may call impl.connect() directly!
                if (socket != null) {
                    socket.setBound();
                    socket.setConnected();
                }
            } finally {
                releaseFD();
            }
        } catch (IOException e) {
            close();
            throw e;
        }
    }

在这其中执行了socketConnect(address, port, timeout);此方法在AbstractPlainSocketImpl中是个抽象方法,是由SocksSocketImpl的直接父类PlainSocketImpl实现的,来看一下其源码:

    void socketConnect(InetAddress address, int port, int timeout) throws IOException {
        if (fd == null || !fd.valid()) {
            throw new SocketException("Socket closed");
        }

        IoBridge.connect(fd, address, port, timeout);

        this.address = address;
        this.port = port;

        if (localport == 0) {
            // If socket is pending close, fd becomes an AF_UNIX socket and calling
            // getLocalInetSocketAddress will fail.
            // http://b/34645743
            if (!isClosedOrPending()) {
                localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
            }
        }
    }

调用了IoBridge的connect方法,再来看这个方法:

  /**
     * Connects socket 'fd' to 'inetAddress' on 'port', with a the given 'timeoutMs'.
     * Use timeoutMs == 0 for a blocking connect with no timeout.
     */
    public static void connect(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws SocketException, SocketTimeoutException {
        try {
            connectErrno(fd, inetAddress, port, timeoutMs);
        } catch (ErrnoException errnoException) {
            if (errnoException.errno == EHOSTUNREACH) {
                throw new NoRouteToHostException("Host unreachable");
            }
            if (errnoException.errno == EADDRNOTAVAIL) {
                throw new NoRouteToHostException("Address not available");
            }
            throw new ConnectException(connectDetail(fd, inetAddress, port, timeoutMs,
                    errnoException), errnoException);
        } catch (SocketException ex) {
            throw ex; // We don't want to doubly wrap these.
        } catch (SocketTimeoutException ex) {
            throw ex; // We don't want to doubly wrap these.
        } catch (IOException ex) {
            throw new SocketException(ex);
        }
    }

直接调用了connectErrno方法,其源码如下:

private static void connectErrno(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws ErrnoException, IOException {
        // With no timeout, just call connect(2) directly.
        if (timeoutMs <= 0) {
            Libcore.os.connect(fd, inetAddress, port);
            return;
        }

        // For connect with a timeout, we:
        //   1. set the socket to non-blocking,
        //   2. connect(2),
        //   3. loop using poll(2) to decide whether we're connected, whether we should keep
        //      waiting, or whether we've seen a permanent failure and should give up,
        //   4. set the socket back to blocking.

        // 1. set the socket to non-blocking.
        IoUtils.setBlocking(fd, false);

        // 2. call connect(2) non-blocking.
        long finishTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            Libcore.os.connect(fd, inetAddress, port);
            IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
            return; // We connected immediately.
        } catch (ErrnoException errnoException) {
            if (errnoException.errno != EINPROGRESS) {
                throw errnoException;
            }
            // EINPROGRESS means we should keep trying...
        }

        // 3. loop using poll(2).
        int remainingTimeoutMs;
        do {
            remainingTimeoutMs =
                    (int) TimeUnit.NANOSECONDS.toMillis(finishTimeNanos - System.nanoTime());
            if (remainingTimeoutMs <= 0) {
                throw new SocketTimeoutException(connectDetail(fd, inetAddress, port, timeoutMs,
                        null));
            }
        } while (!IoBridge.isConnected(fd, inetAddress, port, timeoutMs, remainingTimeoutMs));
        IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
    }

connectErrno实现了connect+poll的逻辑,可以看出,如果没有设置超时时间,则立刻调用Libcore.os.connect(fd, inetAddress, port)来建立连接,之前我们已经罗列出来了,这是个native方法,是调用系统底层来实现的。接着往下看,注释已经说明的很清楚了, 如果有timeout的时候,进行非阻塞connect,然后用poll进行事件轮询直到timeout。OK,现在再回到privilegedConnect方法中的run方法里面,当连接没报异常,表示连接成功,然后调用getInputStream()和getOutputStream()来初始化了输入流和输出流。


    protected synchronized InputStream getInputStream() throws IOException {
        synchronized (fdLock) {
            if (isClosedOrPending())
                throw new IOException("Socket Closed");
            if (shut_rd)
                throw new IOException("Socket input is shutdown");
            if (socketInputStream == null)
                socketInputStream = new SocketInputStream(this);
        }
        return socketInputStream;
    }

    protected synchronized OutputStream getOutputStream() throws IOException {
        synchronized (fdLock) {
            if (isClosedOrPending())
                throw new IOException("Socket Closed");
            if (shut_wr)
                throw new IOException("Socket output is shutdown");
            if (socketOutputStream == null)
                socketOutputStream = new SocketOutputStream(this);
        }
        return socketOutputStream;
    }

自此,Socket中的成员变量SocksSocketImpl中的输入和输出流就已经被初始化好了,然后就可以给应用程序使用。
最后总结一下客户端Socket:当我们指定服务器地址和端口来构造完Socket时,会通过SocksSocketImpl对象去调用系统底层与与服务端建立连接,连接建立成功后,接着就初始化SocksSocketImpl的输入流和输出流,供我们应用程序使用。

4. Linux系统中Socket对TCP的详细处理过程

对于Socket对TCP的详细处理过程,只需要一张图,你就明白了:


处理过程

在前面文章中,我们分析了TCP在建立连接的三次握手,但是只分析了数据包的内容,并没有分析服务端系统底层的socket是怎么处理的,现在我们就来继续分析。
首先我们要知道:Linux内核协议栈为TCP连接管理使用两个队列,一个是SYN队列(半链接队列,用来保存处于SYN_SENT和SYN_RECV状态的请求),一个是accpetd队列(用来保存处于established状态,但是应用层没有调用accept取走的请求),这两个队列是内核实现的,当服务器绑定、监听了某个端口后,这个端口的SYN队列和ACCEPT队列就建立好了。如图,整个过程就是:

1. 当SYN包到达了服务器后,内核会把这一信息放到SYN队列(即未完成握手队列)中,同时回一个SYN+ACK包给客户端。
2. 一段时间后,客户端再次发来了针对服务器SYN包的ACK网络分组时,内核会把连接从SYN队列中取出,再把这个连接放到ACCEPT队列(即已完成握手队列)中。
3. 服务器在第3步调用accept时,其实就是直接从ACCEPT队列中取出已经建立成功的连接套接字而已。

现在我们来讨论其中可能出现的问题:
前面我们在分析java源码中讲到在创建ServerSocket的时候,需要指定一个backlog参数,默认传的是50,那么这个参数到底是什么?
在Linux内核2.2之前,backlog参数是用来限制SYN队列和ACCEPT队列的长度的,两个队列的大小都是相同的。但是在2.2以后,就分离为两个backlog来分别限制这两个队列的大小。

第一个队列的长度是/proc/sys/net/ipv4/tcp_max_syn_backlog,默认是1024。如果开启了syncookies,那么基本上没有限制。

第二个队列的长度是/proc/sys/net/core/somaxconn,默认是128,表示最多有129个established链接等待accept。

那么问题来了,当这两个队列满了后,新的请求到达了又将发生什么?

对于SYN队列,若队列满,则会直接丢弃请求,即新的SYN网络分组会被丢弃,这个问题好解决,客户端接收不到回复,会再一次发送,然后服务端继续丢弃,知道队列有空闲的位置。而客户端如果一直接收不到回复,发几次之后就会停止。

对于ACCEPT队列的处理就有点复杂了,分两种情况:

1.如果server端设置了sysctl_tcp_abort_on_overflow,那么server会发送rst给client,并删除掉这个链接。默认情况下是不会设置的。

2.如果没有设置sysctl_tcp_abort_on_overflow ,server端只是标记连接请求块的acked标志,并且连接建立定时器,会遍历半连接表,重新发送synack,重复上面的过程,如果重传次数超过synack重传的阀值,会把该连接从半连接链表中直接删除。

下面来模拟一下第二种情况:

1.Server收到ack后,尝试将连接放到accept队列,但是因为accept队列已满,所以只是标记连接为acked,并不会将连接移动到accept队列中,也不会为连接分配sendbuf和recvbuf等资源。
2.client端在发送ack之后就认为已经建立了连接,于是开始向该连接发送数据。
3.Server端收到数据包,由于acept队列仍然是满的,所以server端处理也只是标记acked,并连接建立定时器,然后返回。
4.client端由于没有收到刚才发送数据的ack,所以会重传刚才的数据包
5.同上
6.同上
7.同上
8.server端连接建立定时器生效,遍历半连接链表,发现刚才acked的连接,重新发送synack给client端。注意这个重发是有次数的,如果次数用完,服务端就直接删除了这个半连接。
9.client端收到synack后,根据ack值,使用SACK算法,只重传最后一个ack内容。
10.Server端收到数据包,如果accept队列仍然是满的,则还是只标记acked,然后返回。
11.client端等待一段时间后,认为连接不可用,于是发送FIN、ACK给server端。Client端的状态变为FIN_WAIT1,等待一段时间后,client端将看不到该链接(终止了)。

但是在上面第10步,Server端收到数据包,如果此时服务器进程处理完一个请求,从accept队列中取走一个了连接,此时accept队列中有了空闲,server端将请求的连接放到accept队列中。这样应用服务器进程显示该链接是established的,但是在client端上已经没有该链接了。之后,应用服务器进程通过该连接调用read去读取sock中的内容,但是由于client端早就退出了,所以read会一直阻塞的。或许你会认为在server端不应该建立此连接,这是内核的bug。但是内核是按照RFC来实现的,在3次握手的过程中,是不会判断FIN标志位的,只会处理SYN、ACK、RST这三种标志位。从应用层的角度来考虑解决问题的方法,那就是使用非阻塞的方式read,或者使用select超时方式read;亦或者client端关闭连接的时候使用RST方式,而不是FIN方式。

应用

经过上面分析的这些之后,我们应用层要做的就是尽量避免ACCEPT队列拥堵,所以,如TOMCAT等服务器会使用独立的线程,只做accept获取连接这一件事,以防止不能及时的去accept获取连接。但是也有的一些服务器,如Nginx,在一个线程内做accept的同时,还会做其他IO等操作,这又是为什么呢?
这里主要是由于阻塞和非阻塞的概念,应用程序可以把listen时设置的套接字设为非阻塞模式(默认为阻塞模式),这两种模式会导致accept方法有不同的行为:
1. 阻塞套接字上使用accept,第一个阶段是等待ACCEPT队列不为空的阶段,它耗时不定,由客户端是否向自己发起了TCP请求而定,可能会耗时很长。

2. 非阻塞套接字上的accept,不存在等待ACCEPT队列不为空的阶段,它要么返回成功并拿到建立好的连接,要么返回失败。因此它在accept阶段是不会阻塞的。

所以,在企业级的服务器进程中,若某一线程既使用accept获取新连接,又继续在这个连接上读、写字符流,那么,这个连接对应的套接字通常要设为非阻塞。调用accept时不会长期占用所属线程的CPU时间片,使得线程能够及时的做其他工作。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容

  • socket详解:https://blog.csdn.net/hdfqq188816190/article/det...
    wvqusrtg阅读 1,737评论 0 10
  • 什么是Socket? 什么是socket呢?socket起源于Unix,而Unix/Linux基本哲学之一就是“一...
    David_Do阅读 379评论 0 0
  • 在客户端/服务器通信模式中,Socket是双方通信通道的抽象封装,用户可通过配置Socket的参数并构建Socke...
    桥头放牛娃阅读 4,762评论 1 11
  • AsyncSocket简介 socket(套接字)概念: 网络上两个程序通过一个双向的通信连接实现数据的交换,这个...
    醉卧栏杆听雨声阅读 987评论 1 2
  • 无聊时你可以这样做! 图书馆出来黑米和她娘决定做件惊天动地的大事------偷窥偷听陌生人。 行动地点黑米又是选在...
    浪漫老庄阅读 252评论 0 0