前面两篇分析了TCP和UDP协议,本篇来分析一下Socket,有了前面的基础,对理解Socket有很大的帮助,同时也是对TCP和UDP的更深层次的了解,经过多天的资料研究和代码分析,对socket也算是有了一个清楚的认识,鉴于网上的知识比较散,所以想把关于socket的知识整理一下,希望能够帮助想理解socket的童鞋,本着这些目的,本篇就来仔细分析一下Socket的原理。
文章内容有点长,先罗列一下需要分析的要点:
1. Socket是什么?
2. Java中Socket的使用
3. Java中Socket的源码分析
4. Linux系统中Socket对TCP的详细处理过程
1. Socket是什么?
我们知道进程通信的方法有管道、命名管道、信号、消息队列、共享内存、信号量,这些方法都要求通信的两个进程位于同一个主机。但是如果通信双方不在同一个主机又该如何进行通信呢?
在计算机网络中有一个tcp/ip协议族,使用tcp/ip协议族就能达到我们想要的效果,如下图所示:
图中可以看到socket是位于应用层和传输层之间的一个抽象层,那么它存在的意义又是什么呢?其实没有socket抽象层,也是可以的,但是,当我们使用不同的协议进行通信时就得使用不同的接口,还得处理不同协议的各种细节,这就增加了开发的难度,软件也不易于扩展。于是UNIX BSD就发明了socket这种东西,socket屏蔽了各个协议的通信细节,使得程序员无需关注协议本身,直接使用socket提供的接口来进行互联的不同主机间的进程的通信。这就好比操作系统给我们提供了使用底层硬件功能的系统调用,通过系统调用我们可以方便的使用磁盘(文件操作),使用内存,而无需自己去进行磁盘读写,内存管理。socket其实也是一样的东西,就是提供了tcp/ip协议的抽象,对外提供了一套接口,同过这个接口就可以统一、方便的使用tcp/ip协议的功能了。
2.Java中Socket的使用
在Java中,我们使用Socket有两种,一个是基于TCP的Socket,一个是基于UDP的DatagramSocket,本篇只分析基于TCP的Socket。现在我们来看Socket的使用:
客户端:指明主机端口创建Socket,获取输出流,写入数据。
Socket socket = null;
OutputStream os = null;
try {
socket = new Socket("192.168.1.106",9200);
os = socket.getOutputStream();
os.write("hello server !".getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if(os != null){
os.close();
}
if(socket != null){
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
服务端:指明绑定的端口,创建ServerSocket,开启端口监听。
ServerSocket serverSocket;
try {
serverSocket = new ServerSocket(9200);
} catch (IOException e) {
return;
}
while (isRunning){
try {
Socket client = serverSocket.accept();
handleClient(client);
} catch (IOException e) {
e.printStackTrace();
}
}
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
这里只是一个简单的示例,当然实际开发中可能需要更多的东西,比如在handleClient(client)处理数据的时候加入线程池去处理,以及client的关闭等,这里只是简单的使用,主要目的还是看其源码。
3.Java中Socket的源码分析
3.1 服务端ServerSocket的源码解析
为了便于理解,我们先分析服务端ServerSocket的源码。通常我们在创建的时候需要指明绑定的端口,来看其构造函数:
public ServerSocket(int port) throws IOException {
this(port, 50, null);
}
public ServerSocket(int port, int backlog) throws IOException {
this(port, backlog, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
ServerSocket有三个指明端口的构造函数,其中参数backlog表示已建立连接的最大数量,也就是accept中未被我们取走的最大连接数,这个现在不理解也没关系,后面会重点介绍这个参数,这里当未指明时,它的值默认是50;InetAddress是对IP、DNS、端口等信息的封装类。
我们看到,在构造函数中,先调用了setImpl()方法,看其源码:
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
factory是SocketImplFactory类型,是创建SocketImpl的工厂类;impl是SocketImpl类型。
/**
* The factory for all server sockets.
*/
private static SocketImplFactory factory = null;
/**
* The implementation of this Socket.
*/
private SocketImpl impl;
当我们调用构造函数的时候factory肯定是空的,所以此时会给ServerSocket的impl变量赋值为SocksSocketImpl的对象,然后调用 impl.setServerSocket(this)将ServerSocket自己和SocksSocketImpl关联起来。
回到构造函数,接下来会判断端口的正确性以及backlog的处理,最后调用了bind(new InetSocketAddress(bindAddr, port), backlog)方法来绑定该端口,看其源码:
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
首先进行一大堆的判断,之后获取SecurityManager,这是一个安全策略管理器,在执行一些不安全、敏感等操作之前通常需要用到这个东西来先检查一下,比如这里在监听之前,调用了checkListen()方法,来检查当前线程是否允许在指定端口的连接请求过程中的使用wait等待,如果不允许,则会抛出异常。接着继续往下看,getImpl()源码:
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
/**
* Creates a socket with a boolean that specifies whether this
* is a stream socket (true) or an unconnected UDP socket (false).
*/
protected synchronized void create(boolean stream) throws IOException {
this.stream = stream;
if (!stream) {
ResourceManager.beforeUdpCreate();
// only create the fd after we know we will be able to create the socket
fd = new FileDescriptor();
try {
socketCreate(false);
} catch (IOException ioe) {
ResourceManager.afterUdpClose();
fd = null;
throw ioe;
}
} else {
fd = new FileDescriptor();
socketCreate(true);
}
if (socket != null)
socket.setCreated();
if (serverSocket != null)
serverSocket.setCreated();
}
created初始化的时候为false,所以会调用createImpl(),impl我们在之前已经给赋值了指向SocksSocketImpl对象,所以将会调用SocksSocketImpl中继承下来的create(boolean stream)方法,可以看出,当参数为true时创建的是stream流,也就是TCP连接,当为false时是UDP。回到前面,当getImpl()返回impl对象后,就调用了它的bind方法和listen方法,我们接着来看bind方法的源码:
/**
* Binds the socket to the specified address of the specified local port.
* @param address the address
* @param lport the port
*/
protected synchronized void bind(InetAddress address, int lport) throws IOException {
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
socketBind(address, lport);
if (socket != null)
socket.setBound();
if (serverSocket != null)
serverSocket.setBound();
}
可以看到,调用了socketBind(address, lport),在来看socketBind方法源码:
void socketBind(InetAddress address, int port) throws IOException {
if (fd == null || !fd.valid()) {
throw new SocketException("Socket closed");
}
IoBridge.bind(fd, address, port);
this.address = address;
if (port == 0) {
// Now that we're a connected socket, let's extract the port number that the system
// chose for us and store it in the Socket object.
localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
} else {
localport = port;
}
}
注意这里的socketBind是Android SDK下java包中的方法,而在JDK中本身是个native方法,这里大概是谷歌工程师重新写了这个类,但是底层最终都是调用的系统的C++代码。我们接着看下去,调用了IoBridge的bind方法:
public static void bind(FileDescriptor fd, InetAddress address, int port) throws SocketException {
if (address instanceof Inet6Address) {
Inet6Address inet6Address = (Inet6Address) address;
if (inet6Address.getScopeId() == 0 && inet6Address.isLinkLocalAddress()) {
// Linux won't let you bind a link-local address without a scope id.
// Find one.
NetworkInterface nif = NetworkInterface.getByInetAddress(address);
if (nif == null) {
throw new SocketException("Can't bind to a link-local address without a scope id: " + address);
}
try {
address = Inet6Address.getByAddress(address.getHostName(), address.getAddress(), nif.getIndex());
} catch (UnknownHostException ex) {
throw new AssertionError(ex); // Can't happen.
}
}
}
try {
Libcore.os.bind(fd, address, port);
} catch (ErrnoException errnoException) {
if (errnoException.errno == EADDRINUSE || errnoException.errno == EADDRNOTAVAIL ||
errnoException.errno == EPERM || errnoException.errno == EACCES) {
throw new BindException(errnoException.getMessage(), errnoException);
} else {
throw new SocketException(errnoException.getMessage(), errnoException);
}
}
}
由此可以看出,最后调用了 Libcore.os.bind(fd, address, port); 那么Libcore.os又是什么,来看一下:
package libcore.io;
public final class Libcore {
private Libcore() { }
/**
* Direct access to syscalls. Code should strongly prefer using {@link #os}
* unless it has a strong reason to bypass the helpful checks/guards that it
* provides.
*/
public static Os rawOs = new Linux();
/**
* Access to syscalls with helpful checks/guards.
*/
public static Os os = new BlockGuardOs(rawOs);
}
所以,调用了BlockGuardOs类中的bind方法,而BlockGuardOs中的bind方法是调用了rawOs对象的bind方法,而Liunx类中几乎都是native方法:
public final class Linux implements Os {
Linux() { }
public native FileDescriptor accept(FileDescriptor fd, SocketAddress peerAddress) throws ErrnoException, SocketException;
public native void bind(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
public native void bind(FileDescriptor fd, SocketAddress address) throws ErrnoException, SocketException;
public native void listen(FileDescriptor fd, int backlog) throws ErrnoException;
public native void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
public native void connect(FileDescriptor fd, SocketAddress address) throws ErrnoException, SocketException;
public native void close(FileDescriptor fd) throws ErrnoException;
...
这里我先将主要的几个方法列出来,后面就不用再贴代码了。
自此,bind方法就执行完了,回到之前,程序会继续执行listen方法,listen方法和bind方法的执行过程几乎完全一样,最后都是调用native层的方法,这里就不再赘述,执行完listen之后,bound = true,表示对端口成功绑定并且开启了监听。所以总的来说,在ServerSocket的构造方法中主要完成了两件事,bind和listen,完成之后,ServerSocket就会一直监听端口中客户端的连接请求,完成三次握手之后,就会建立连接并将连接放入队列中,等待应用程序调用accept从队列中取走该连接,这个过程稍后会详细介绍。
构造完ServerSocket后,应用程序会不断的调用accept来获取连接,再来看accept的源码:
/**
* Listens for a connection to be made to this socket and accepts
* it. The method blocks until a connection is made.
*/
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
从注释可以看出该方法是个阻塞的方法,直到队列中有已建立的连接。从代码可以看出,每调用一次accept就会new Socket然后,去接收连接。然后我们来看implAccept方法,其源码如下:
/**
* Subclasses of ServerSocket use this method to override accept()
* to return their own subclass of socket. So a FooServerSocket
* will typically hand this method an <i>empty</i> FooSocket. On
* return from implAccept the FooSocket will be connected to a client.
*/
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
si = s.impl;
s.impl = null;
si.address = new InetAddress();
si.fd = new FileDescriptor();
getImpl().accept(si);
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
Socket的setImpl方法和ServerSocket的setImpl方法实现一模一样,所以这里的变量si和参数s.impl指向的还是一个SocksSocketImpl对象,si只是临时变量,可以看到最后还是把准备好的SocksSocketImpl对象重新赋值给了s.impl;
getImpl().accept(si)是调用的SocksSocketImpl的accept方法,其源码如下:
/**
* Accepts connections.
* @param s the connection
*/
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
BlockGuard.getThreadPolicy().onNetwork();
socketAccept(s);
} finally {
releaseFD();
}
}
首先获取文件描述符,然后调用了socketAccept(s),之后在释放文件描述符资源,socketAccept源码如下:
void socketAccept(SocketImpl s) throws IOException {
if (fd == null || !fd.valid()) {
throw new SocketException("Socket closed");
}
// poll() with a timeout of 0 means "poll for zero millis", but a Socket timeout == 0 means
// "wait forever". When timeout == 0 we pass -1 to poll.
if (timeout <= 0) {
IoBridge.poll(fd, POLLIN | POLLERR, -1);
} else {
IoBridge.poll(fd, POLLIN | POLLERR, timeout);
}
InetSocketAddress peerAddress = new InetSocketAddress();
try {
FileDescriptor newfd = Libcore.os.accept(fd, peerAddress);
s.fd.setInt$(newfd.getInt$());
s.address = peerAddress.getAddress();
s.port = peerAddress.getPort();
} catch (ErrnoException errnoException) {
if (errnoException.errno == EAGAIN) {
throw new SocketTimeoutException(errnoException);
} else if (errnoException.errno == EINVAL || errnoException.errno == EBADF) {
throw new SocketException("Socket closed");
}
errnoException.rethrowAsSocketException();
}
s.localport = IoBridge.getLocalInetSocketAddress(s.fd).getPort();
}
这里通过调用IoBridge的poll实现阻塞,其底层还是Linux系统通过监听文件描述符事件实现了阻塞,可以看一下poll的注释:
/**
* Wait for some event on a file descriptor, blocks until the event happened or timeout period
* passed. See poll(2) and @link{android.system.Os.Poll}.
*
* @throws SocketException if poll(2) fails.
* @throws SocketTimeoutException if the event has not happened before timeout period has passed.
*/
public static void poll(FileDescriptor fd, int events, int timeout)
throws SocketException, SocketTimeoutException {
StructPollfd[] pollFds = new StructPollfd[]{ new StructPollfd() };
pollFds[0].fd = fd;
pollFds[0].events = (short) events;
try {
int ret = android.system.Os.poll(pollFds, timeout);
if (ret == 0) {
throw new SocketTimeoutException("Poll timed out");
}
} catch (ErrnoException e) {
e.rethrowAsSocketException();
}
}
最终,当连接队列中有已建立的连接后,线程就会被唤醒,继续执行下面的内容,将连接的信息封装到参数s指向的对象中返回,这个对象就是前面的si指向的对象,最后将该对象赋值给前面的s.impl,接着执行s.postAccept(),其源码如下:
/**
* set the flags after an accept() call.
*/
final void postAccept() {
connected = true;
created = true;
bound = true;
}
就是标记了几个状态值,最后此s(Socket)就被返回给我们应用程序来使用了。由此ServerSocket中accept()方法就分析完毕,总结一下就是:线程会被阻塞,直到连接队列中有了新连接,线程再被唤醒,然后将连接的信息封装到一个SocketImpl对象中,再将此SocketImpl对象封装到一个Socket对象中,最后将此Socket对象返回给应用程序使用。
通过以上的分析,我们了解了服务端的bind、listen、和accept的过程,所有这些最终都是调用底层系统的方法实现的。接下来看一下客户端的分析。
3.2 客户端Socket的源码解析
Socket为指定主机地址和端口提供了6个公有的构造函数,但最终都是通过一个私有的构造函数来创建新对象。
当我们来看一下私有的构造函数源码:
private Socket(InetAddress[] addresses, int port, SocketAddress localAddr,
boolean stream) throws IOException {
if (addresses == null || addresses.length == 0) {
throw new SocketException("Impossible: empty address list");
}
for (int i = 0; i < addresses.length; i++) {
setImpl();
try {
InetSocketAddress address = new InetSocketAddress(addresses[i], port);
createImpl(stream);
if (localAddr != null) {
bind(localAddr);
}
connect(address);
break;
} catch (IOException | IllegalArgumentException | SecurityException e) {
try {
// Android-changed:
// Do not call #close, classes that extend this class may do not expect a call
// to #close coming from the superclass constructor.
impl.close();
closed = true;
} catch (IOException ce) {
e.addSuppressed(ce);
}
// Only stop on the last address.
if (i == addresses.length - 1) {
throw e;
}
}
// Discard the connection state and try again.
impl = null;
created = false;
bound = false;
closed = false;
}
}
注意此方法是AndroidSDK下java包下面的,JDK中的这个方法要比这个更简单:
private Socket(SocketAddress address, SocketAddress localAddr,
boolean stream) throws IOException {
setImpl();
// backward compatibility
if (address == null)
throw new NullPointerException();
try {
createImpl(stream);
if (localAddr != null)
bind(localAddr);
connect(address);
} catch (IOException | IllegalArgumentException | SecurityException e) {
try {
close();
} catch (IOException ce) {
e.addSuppressed(ce);
}
throw e;
}
}
过程都是差不多的都是先setImpl();然后createImpl(stream);最后connect(address);只是Android下的会做一些额外的操作,这里我们就不再赘述,直接进入connect方法里面:
/**
* Connects this socket to the server with a specified timeout value.
* A timeout of zero is interpreted as an infinite timeout. The connection
* will then block until established or an error occurs.
*/
public void connect(SocketAddress endpoint, int timeout) throws IOException {
if (endpoint == null)
throw new IllegalArgumentException("connect: The address can't be null");
if (timeout < 0)
throw new IllegalArgumentException("connect: timeout can't be negative");
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isConnected())
throw new SocketException("already connected");
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
InetAddress addr = epoint.getAddress ();
int port = epoint.getPort();
checkAddress(addr, "connect");
SecurityManager security = System.getSecurityManager();
if (security != null) {
if (epoint.isUnresolved())
security.checkConnect(epoint.getHostName(), port);
else
security.checkConnect(addr.getHostAddress(), port);
}
if (!created)
createImpl(true);
if (!oldImpl)
impl.connect(epoint, timeout);
else if (timeout == 0) {
if (epoint.isUnresolved())
impl.connect(addr.getHostName(), port);
else
impl.connect(addr, port);
} else
throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
connected = true;
/*
* If the socket was not bound before the connect, it is now because
* the kernel will have picked an ephemeral port & a local address
*/
bound = true;
}
和服务端的bind方法差不多,一样先进行一大堆的判断,最后也是调用了SocksSocketImpl类中的connect方法,看一下SocksSocketImpl中的connect源码:
/**
* Connects the Socks Socket to the specified endpoint. It will first
* connect to the SOCKS proxy and negotiate the access. If the proxy
* grants the connections, then the connect is successful and all
* further traffic will go to the "real" endpoint.
*/
@Override
protected void connect(SocketAddress endpoint, int timeout) throws IOException {
...省略
// Connects to the SOCKS server
try {
privilegedConnect(server, serverPort, remainingMillis(deadlineMillis));
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
...省略
}
这个方法代码很多也很复杂,Android版的和JDK版的还有些代码不同,这里我们不去关注,只需要知道在里面调用了 privilegedConnect(server, serverPort, remainingMillis(deadlineMillis));跟进去:
private synchronized void privilegedConnect(final String host,final int port, final int timeout) throws IOException{
try {
AccessController.doPrivileged(
new java.security.PrivilegedExceptionAction<Void>() {
public Void run() throws IOException {
superConnectServer(host, port, timeout);
cmdIn = getInputStream();
cmdOut = getOutputStream();
return null;
}
});
} catch (java.security.PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
}
调用了AccessController的doPrivileged方法,在其中执行了PrivilegedExceptionAction对象的run方法,也就是执行了superConnectServer(host, port, timeout),而这个方法是直接调用父类AbstractPlainSocketImpl的connect方法,所以我们直接看AbstractPlainSocketImpl中的connect方法:
/**
* Creates a socket and connects it to the specified address on
* the specified port.
* @param address the address
* @param timeout the timeout value in milliseconds, or zero for no timeout.
* @throws IOException if connection fails
* @throws IllegalArgumentException if address is null or is a
* SocketAddress subclass not supported by this socket
* @since 1.4
*/
protected void connect(SocketAddress address, int timeout)
throws IOException {
boolean connected = false;
try {
if (address == null || !(address instanceof InetSocketAddress))
throw new IllegalArgumentException("unsupported address type");
InetSocketAddress addr = (InetSocketAddress) address;
if (addr.isUnresolved())
throw new UnknownHostException(addr.getHostName());
this.port = addr.getPort();
this.address = addr.getAddress();
connectToAddress(this.address, port, timeout);
connected = true;
} finally {
if (!connected) {
try {
close();
} catch (IOException ioe) {
/* Do nothing. If connect threw an exception then
it will be passed up the call stack */
}
}
}
}
很明显,执行了connectToAddress(this.address, port, timeout):
private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
if (address.isAnyLocalAddress()) {
doConnect(InetAddress.getLocalHost(), port, timeout);
} else {
doConnect(address, port, timeout);
}
}
判断端口是否是本地的还是远程的,然后执行相应的doConnect方法:
/**
* The workhorse of the connection operation. Tries several times to
* establish a connection to the given <host, port>. If unsuccessful,
* throws an IOException indicating what went wrong.
*/
synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpConnect(fd, address, port);
}
}
try {
acquireFD();
try {
BlockGuard.getThreadPolicy().onNetwork();
socketConnect(address, port, timeout);
/* socket may have been closed during poll/select */
synchronized (fdLock) {
if (closePending) {
throw new SocketException ("Socket closed");
}
}
// If we have a ref. to the Socket, then sets the flags
// created, bound & connected to true.
// This is normally done in Socket.connect() but some
// subclasses of Socket may call impl.connect() directly!
if (socket != null) {
socket.setBound();
socket.setConnected();
}
} finally {
releaseFD();
}
} catch (IOException e) {
close();
throw e;
}
}
在这其中执行了socketConnect(address, port, timeout);此方法在AbstractPlainSocketImpl中是个抽象方法,是由SocksSocketImpl的直接父类PlainSocketImpl实现的,来看一下其源码:
void socketConnect(InetAddress address, int port, int timeout) throws IOException {
if (fd == null || !fd.valid()) {
throw new SocketException("Socket closed");
}
IoBridge.connect(fd, address, port, timeout);
this.address = address;
this.port = port;
if (localport == 0) {
// If socket is pending close, fd becomes an AF_UNIX socket and calling
// getLocalInetSocketAddress will fail.
// http://b/34645743
if (!isClosedOrPending()) {
localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
}
}
}
调用了IoBridge的connect方法,再来看这个方法:
/**
* Connects socket 'fd' to 'inetAddress' on 'port', with a the given 'timeoutMs'.
* Use timeoutMs == 0 for a blocking connect with no timeout.
*/
public static void connect(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws SocketException, SocketTimeoutException {
try {
connectErrno(fd, inetAddress, port, timeoutMs);
} catch (ErrnoException errnoException) {
if (errnoException.errno == EHOSTUNREACH) {
throw new NoRouteToHostException("Host unreachable");
}
if (errnoException.errno == EADDRNOTAVAIL) {
throw new NoRouteToHostException("Address not available");
}
throw new ConnectException(connectDetail(fd, inetAddress, port, timeoutMs,
errnoException), errnoException);
} catch (SocketException ex) {
throw ex; // We don't want to doubly wrap these.
} catch (SocketTimeoutException ex) {
throw ex; // We don't want to doubly wrap these.
} catch (IOException ex) {
throw new SocketException(ex);
}
}
直接调用了connectErrno方法,其源码如下:
private static void connectErrno(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws ErrnoException, IOException {
// With no timeout, just call connect(2) directly.
if (timeoutMs <= 0) {
Libcore.os.connect(fd, inetAddress, port);
return;
}
// For connect with a timeout, we:
// 1. set the socket to non-blocking,
// 2. connect(2),
// 3. loop using poll(2) to decide whether we're connected, whether we should keep
// waiting, or whether we've seen a permanent failure and should give up,
// 4. set the socket back to blocking.
// 1. set the socket to non-blocking.
IoUtils.setBlocking(fd, false);
// 2. call connect(2) non-blocking.
long finishTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
try {
Libcore.os.connect(fd, inetAddress, port);
IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
return; // We connected immediately.
} catch (ErrnoException errnoException) {
if (errnoException.errno != EINPROGRESS) {
throw errnoException;
}
// EINPROGRESS means we should keep trying...
}
// 3. loop using poll(2).
int remainingTimeoutMs;
do {
remainingTimeoutMs =
(int) TimeUnit.NANOSECONDS.toMillis(finishTimeNanos - System.nanoTime());
if (remainingTimeoutMs <= 0) {
throw new SocketTimeoutException(connectDetail(fd, inetAddress, port, timeoutMs,
null));
}
} while (!IoBridge.isConnected(fd, inetAddress, port, timeoutMs, remainingTimeoutMs));
IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
}
connectErrno实现了connect+poll的逻辑,可以看出,如果没有设置超时时间,则立刻调用Libcore.os.connect(fd, inetAddress, port)来建立连接,之前我们已经罗列出来了,这是个native方法,是调用系统底层来实现的。接着往下看,注释已经说明的很清楚了, 如果有timeout的时候,进行非阻塞connect,然后用poll进行事件轮询直到timeout。OK,现在再回到privilegedConnect方法中的run方法里面,当连接没报异常,表示连接成功,然后调用getInputStream()和getOutputStream()来初始化了输入流和输出流。
protected synchronized InputStream getInputStream() throws IOException {
synchronized (fdLock) {
if (isClosedOrPending())
throw new IOException("Socket Closed");
if (shut_rd)
throw new IOException("Socket input is shutdown");
if (socketInputStream == null)
socketInputStream = new SocketInputStream(this);
}
return socketInputStream;
}
protected synchronized OutputStream getOutputStream() throws IOException {
synchronized (fdLock) {
if (isClosedOrPending())
throw new IOException("Socket Closed");
if (shut_wr)
throw new IOException("Socket output is shutdown");
if (socketOutputStream == null)
socketOutputStream = new SocketOutputStream(this);
}
return socketOutputStream;
}
自此,Socket中的成员变量SocksSocketImpl中的输入和输出流就已经被初始化好了,然后就可以给应用程序使用。
最后总结一下客户端Socket:当我们指定服务器地址和端口来构造完Socket时,会通过SocksSocketImpl对象去调用系统底层与与服务端建立连接,连接建立成功后,接着就初始化SocksSocketImpl的输入流和输出流,供我们应用程序使用。
4. Linux系统中Socket对TCP的详细处理过程
对于Socket对TCP的详细处理过程,只需要一张图,你就明白了:
在前面文章中,我们分析了TCP在建立连接的三次握手,但是只分析了数据包的内容,并没有分析服务端系统底层的socket是怎么处理的,现在我们就来继续分析。
首先我们要知道:Linux内核协议栈为TCP连接管理使用两个队列,一个是SYN队列(半链接队列,用来保存处于SYN_SENT和SYN_RECV状态的请求),一个是accpetd队列(用来保存处于established状态,但是应用层没有调用accept取走的请求),这两个队列是内核实现的,当服务器绑定、监听了某个端口后,这个端口的SYN队列和ACCEPT队列就建立好了。如图,整个过程就是:
1. 当SYN包到达了服务器后,内核会把这一信息放到SYN队列(即未完成握手队列)中,同时回一个SYN+ACK包给客户端。
2. 一段时间后,客户端再次发来了针对服务器SYN包的ACK网络分组时,内核会把连接从SYN队列中取出,再把这个连接放到ACCEPT队列(即已完成握手队列)中。
3. 服务器在第3步调用accept时,其实就是直接从ACCEPT队列中取出已经建立成功的连接套接字而已。
现在我们来讨论其中可能出现的问题:
前面我们在分析java源码中讲到在创建ServerSocket的时候,需要指定一个backlog参数,默认传的是50,那么这个参数到底是什么?
在Linux内核2.2之前,backlog参数是用来限制SYN队列和ACCEPT队列的长度的,两个队列的大小都是相同的。但是在2.2以后,就分离为两个backlog来分别限制这两个队列的大小。
第一个队列的长度是/proc/sys/net/ipv4/tcp_max_syn_backlog,默认是1024。如果开启了syncookies,那么基本上没有限制。
第二个队列的长度是/proc/sys/net/core/somaxconn,默认是128,表示最多有129个established链接等待accept。
那么问题来了,当这两个队列满了后,新的请求到达了又将发生什么?
对于SYN队列,若队列满,则会直接丢弃请求,即新的SYN网络分组会被丢弃,这个问题好解决,客户端接收不到回复,会再一次发送,然后服务端继续丢弃,知道队列有空闲的位置。而客户端如果一直接收不到回复,发几次之后就会停止。
对于ACCEPT队列的处理就有点复杂了,分两种情况:
1.如果server端设置了sysctl_tcp_abort_on_overflow,那么server会发送rst给client,并删除掉这个链接。默认情况下是不会设置的。
2.如果没有设置sysctl_tcp_abort_on_overflow ,server端只是标记连接请求块的acked标志,并且连接建立定时器,会遍历半连接表,重新发送synack,重复上面的过程,如果重传次数超过synack重传的阀值,会把该连接从半连接链表中直接删除。
下面来模拟一下第二种情况:
1.Server收到ack后,尝试将连接放到accept队列,但是因为accept队列已满,所以只是标记连接为acked,并不会将连接移动到accept队列中,也不会为连接分配sendbuf和recvbuf等资源。
2.client端在发送ack之后就认为已经建立了连接,于是开始向该连接发送数据。
3.Server端收到数据包,由于acept队列仍然是满的,所以server端处理也只是标记acked,并连接建立定时器,然后返回。
4.client端由于没有收到刚才发送数据的ack,所以会重传刚才的数据包
5.同上
6.同上
7.同上
8.server端连接建立定时器生效,遍历半连接链表,发现刚才acked的连接,重新发送synack给client端。注意这个重发是有次数的,如果次数用完,服务端就直接删除了这个半连接。
9.client端收到synack后,根据ack值,使用SACK算法,只重传最后一个ack内容。
10.Server端收到数据包,如果accept队列仍然是满的,则还是只标记acked,然后返回。
11.client端等待一段时间后,认为连接不可用,于是发送FIN、ACK给server端。Client端的状态变为FIN_WAIT1,等待一段时间后,client端将看不到该链接(终止了)。
但是在上面第10步,Server端收到数据包,如果此时服务器进程处理完一个请求,从accept队列中取走一个了连接,此时accept队列中有了空闲,server端将请求的连接放到accept队列中。这样应用服务器进程显示该链接是established的,但是在client端上已经没有该链接了。之后,应用服务器进程通过该连接调用read去读取sock中的内容,但是由于client端早就退出了,所以read会一直阻塞的。或许你会认为在server端不应该建立此连接,这是内核的bug。但是内核是按照RFC来实现的,在3次握手的过程中,是不会判断FIN标志位的,只会处理SYN、ACK、RST这三种标志位。从应用层的角度来考虑解决问题的方法,那就是使用非阻塞的方式read,或者使用select超时方式read;亦或者client端关闭连接的时候使用RST方式,而不是FIN方式。
应用
经过上面分析的这些之后,我们应用层要做的就是尽量避免ACCEPT队列拥堵,所以,如TOMCAT等服务器会使用独立的线程,只做accept获取连接这一件事,以防止不能及时的去accept获取连接。但是也有的一些服务器,如Nginx,在一个线程内做accept的同时,还会做其他IO等操作,这又是为什么呢?
这里主要是由于阻塞和非阻塞的概念,应用程序可以把listen时设置的套接字设为非阻塞模式(默认为阻塞模式),这两种模式会导致accept方法有不同的行为:
1. 阻塞套接字上使用accept,第一个阶段是等待ACCEPT队列不为空的阶段,它耗时不定,由客户端是否向自己发起了TCP请求而定,可能会耗时很长。
2. 非阻塞套接字上的accept,不存在等待ACCEPT队列不为空的阶段,它要么返回成功并拿到建立好的连接,要么返回失败。因此它在accept阶段是不会阻塞的。
所以,在企业级的服务器进程中,若某一线程既使用accept获取新连接,又继续在这个连接上读、写字符流,那么,这个连接对应的套接字通常要设为非阻塞。调用accept时不会长期占用所属线程的CPU时间片,使得线程能够及时的做其他工作。