NIO编程中,除了Selector之外,Channel也很重要,本文将介绍NIO中的ServerSocketChannel;
ServerSocketChannel
ServerSocketChannel可以通过如下方式创建:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
open方法的实现如下:
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
从上面的源码可以知道,最终返回的是ServerSocketChannelImpl实例:
ServerSocketChannelImpl
构造函数:
ServerSocketChannelImpl提供了传入参数为SelectorProvider的构造函数:
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
首先调用父类ServerSocketChannel的构造函数,然后通过Net.serverSocket方法打开一个文件描述符,源码如下:
static FileDescriptor serverSocket(boolean stream) {
return IOUtil.newFD(socket0(isIPv6Available(), stream, true));
}
下面先看看socket0方法的实现,该方法传入3个参数,根据前面的代码,stream=true,那么isIPv6Available()是怎么判断的呢?
//见net_util_md.c
int fd;
void *ipv6_fn;
SOCKADDR sa;
socklen_t sa_len = sizeof(sa);
//调用socket(int family, int type, int protocol)函数实现
//参数family指定协议簇(域),常见有AF_INET(IPv4),AF_INET6(IPv6),AF_LOCAL(UNIX 协议域)等。
//可以到内核源码linux/socket.h中查看支持的协议簇有哪些
//参数type指定套接字类型:SOCK_STREAM(字节流)、SOCK_DGRAM(数据报)、SOCK_SEQPACKET(有序分组)、SOCK_RAW(原始套接字)等。
//参数protocol指定传输层协议,例如TCP、UDP;系统针对每一个协议簇与类型提供了一个默认的协议,protocol=0来表示使用这个默认的协议;linux/in.h定义了有哪些可用的传输层协议。
fd = JVM_Socket(AF_INET6, SOCK_STREAM, 0) ;
if (fd < 0) {//如果出现错误,它返回-1,并设置errno为相应的值
return JNI_FALSE;
}
//getsockname:获取与某个套接字关联的本地协议地址
if (getsockname(0, (struct sockaddr *)&sa, &sa_len) == 0) {
struct sockaddr *saP = (struct sockaddr *)&sa;
if (saP->sa_family != AF_INET6) {
return JNI_FALSE;
}
}
//检查接口是否有ipv6地址
{
// 文件/proc/net/if_inet6内容类似如下,会记录支持ipv6的设备
//fe8000000000000046a842fffe404e3a 02 40 20 80 em1
//00000000000000000000000000000001 01 80 10 80 lo
FILE *fP = fopen("/proc/net/if_inet6", "r");
char buf[255];
char *bufP;
if (fP == NULL) {
close(fd);
return JNI_FALSE;
}
//从fP读取字符串,最多为255-1个字符,如果遇到换行或文件尾,则结束
bufP = fgets(buf, sizeof(buf), fP);
fclose(fP);
if (bufP == NULL) {
close(fd);
return JNI_FALSE;
}
}
//调用dlsym,导入动态链接库中的函数或类;
//RTLD_DEFAULT表示会在当前进程中按照默认的类库加载顺序搜索"inet_pton"这个symbol
ipv6_fn = JVM_FindLibraryEntry(RTLD_DEFAULT, "inet_pton");
if (ipv6_fn == NULL ) {
close(fd);
return JNI_FALSE;
}
getaddrinfo_ptr = (getaddrinfo_f)
JVM_FindLibraryEntry(RTLD_DEFAULT, "getaddrinfo");
freeaddrinfo_ptr = (freeaddrinfo_f)
JVM_FindLibraryEntry(RTLD_DEFAULT, "freeaddrinfo");
gai_strerror_ptr = (gai_strerror_f)
JVM_FindLibraryEntry(RTLD_DEFAULT, "gai_strerror");
getnameinfo_ptr = (getnameinfo_f)
JVM_FindLibraryEntry(RTLD_DEFAULT, "getnameinfo");
if (freeaddrinfo_ptr == NULL || getnameinfo_ptr == NULL) {
/* We need all 3 of them */
getaddrinfo_ptr = NULL;
}
close(fd);
return JNI_TRUE;
只有上述检查通过,系统支持IPV6且java.net.preferIPv4Stack =false,isIPv6Available()才返回true;
socket0是通过native方式实现的,对应的c源码如下:
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
jboolean stream, jboolean reuse)
{
int fd;
//字节流还是数据报,TCP对应SOCK_STREAM,UDP对应SOCK_DGRAM,此处传入的stream=true;
int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
#ifdef AF_INET6
int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;//IPV6还是IPV4
#else
int domain = AF_INET;//IPV4
#endif
//调用Linux的socket函数,前文已经有介绍,domain为协议域;
//type为套接字类型,protocol设置为0来表示使用默认的传输协议
fd = socket(domain, type, 0);
if (fd < 0) {//出错
return handleSocketError(env, errno);
}
#ifdef AF_INET6
if (domain == AF_INET6) {
int arg = 0;
//arg=1设置ipv6的socket只接收ipv6地址的报文,arg=0表示也可接受ipv4的请求
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
sizeof(int)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"sun.nio.ch.Net.setIntOption");
close(fd);
return -1;
}
}
#endif
//SO_REUSEADDR有四种用途:
//1.当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你启动的程序的socket2要占用该地址和端口,你的程序就要用到该选项。
//2.SO_REUSEADDR允许同一port上启动同一服务器的多个实例(多个进程)。但每个实例绑定的IP地址是不能相同的。
//3.SO_REUSEADDR允许单个进程绑定相同的端口到多个socket上,但每个socket绑定的ip地址不同。
//4.SO_REUSEADDR允许完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP;
if (reuse) {
int arg = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"sun.nio.ch.Net.setIntOption");
close(fd);
return -1;
}
}
#if defined(__linux__) && defined(AF_INET6)
//IPV6_MULTICAST_HOPS用于控制多播的范围,1表示只在本地网络转发,更多介绍请参考(http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4);
if (domain == AF_INET6 && type == SOCK_DGRAM) {
int arg = 1;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"sun.nio.ch.Net.setIntOption");
close(fd);
return -1;
}
}
#endif
return fd;
}
另外额外提一句,Linux 3.9之后加入了SO_REUSEPORT选项,这个选项更强大,多个socket(不管是监听、非监听、TCP还是UDP)只要在绑定之前设置了SO_REUSEPORT选项,那么就可以绑定到完全相同的地址和端口。为了阻止"port 劫持"(Port hijacking)有一个特别的限制:所有希望共享源地址和端口的socket都必须拥有相同的有效用户id(effective user ID)。因此一个用户就不能从另一个用户那里"偷取"端口。另外,内核在处理SO_REUSEPORT socket的时候使用了其它系统上没有用到的"特别魔法":
- 对于UDP socket,内核尝试平均的转发数据报;
- 对于TCP监听socket,内核尝试将新的客户连接请求(由accept返回)平均的交给共享同一地址和端口的socket(监听socket)。
这意味着在其他系统上socket收到一个数据报或连接请求或多或少是随机的,但是linux尝试优化分配。
例如:一个简单的服务器程序的多个实例可以使用SO_REUSEPORT socket实现一个简单的负载均衡,因为内核已经把请求的分配都做了。
当socket创建成功之后,调用IOUtil.newFD创建文件描述符:
//IOUtil.java
static FileDescriptor newFD(int i) {
FileDescriptor fd = new FileDescriptor();
setfdVal(fd, i);
return fd;
}
文件描述符简称fd,它是一个抽象概念,在很多其它体系下,它可能有其它名字,比如在C库编程中可以叫做文件流或文件流指针,在其它语言中也可以叫做文件句柄(handler),而且这些不同名词的隐含意义可能是不完全相同的。不过在系统层,还是应该使用系统调用中规定的名词,我们统一把它叫做文件描述符。
文件描述符本质上是一个数组下标(C语言数组)。在内核中,这个数组是用来管理一个进程打开的文件的对应关系的数组。就是说,对于任何一个进程来说,都有这样一个数组来管理它打开的文件,数组中的每一个元素和文件是映射关系,即:一个数组元素只能映射一个文件,而一个文件可以被多个数组元素所映射。
其实上面的描述并不完全准确,在内核中,文件描述符的数组所直接映射的实际上是文件表,文件表再索引到相关文件的v_node。具体可以参见《UNIX系统高级编程》。
Linux在产生一个新进程后,新进程的前三个文件描述符都默认指向三个相关文件。这三个文件描述符对应的数组下标分别为0,1,2。0对应的文件叫做标准输入(stdin),1对应的文件叫做标准输出(stdout),2对应的文件叫做标准报错(stderr)。但是实际上,默认跟人交互的输入是键盘、鼠标,输出是显示器屏幕,这些硬件设备对于程序来说都是不认识的,所以操作系统借用了原来“终端”的概念,将键盘鼠标显示器都表现成一个终端文件。于是stdin、stdout和stderr就最重都指向了这所谓的终端文件上。于是,从键盘输入的内容,进程可以从标准输入的0号文件描述符读取,正常的输出内容从1号描述符写出,报错信息被定义为从2号描述符写出。这就是标准输入、标准输出和标准报错对应的描述符编号是0、1、2的原因。这也是为什么shell对报错进行重定向要使用2>的原因(其实1>也是可以用的)。
//FileDescriptor.java
private int fd;
public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);
为什么是0、1和2,在上面已经介绍了;由于FileDescriptor没有提供外部设置fd的方法,setfdVal是通过JNI方式实现的:
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
(*env)->SetIntField(env, fdo, fd_fdID, val);
}
bind
ServerSocketChannel通过bind方法将服务绑定到地址:
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {//加锁
if (!isOpen())
throw new ClosedChannelException();
if (isBound())//是否已经调用过bind,通过localAddress判断
throw new AlreadyBoundException();
//InetSocketAddress(0)表示绑定到本机的所有地址,由操作系统选择合适的端口
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
//检查端口是否允许使用
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
//判断是否启用了SDP协议,如果是,需要进行协议转换
//关于SDP,请参考https://docs.oracle.com/javase/tutorial/sdp/sockets/overview.html
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
localAddress = Net.localAddress(fd);
}
}
return this;
}
下面分别看看Net中的bind和listen方法是如何实现的:
Net.bind
static void bind(FileDescriptor fd, InetAddress addr, int port)
throws IOException
{
bind(UNSPEC, fd, addr, port);
}
static void bind(ProtocolFamily family, FileDescriptor fd,
InetAddress addr, int port) throws IOException
{
//如果传入的协议域不是IPV4而且支持IPV6,则使用ipv6
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
bind0(preferIPv6, fd, addr, port);
}
bind0为native方法实现:
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
jboolean useExclBind, jobject iao, int port)
{
SOCKADDR sa;
int sa_len = SOCKADDR_LEN;
int rv = 0;
//将java的InetAddress转换为c的struct sockaddr
if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa, &sa_len, preferIPv6) != 0) {
return;//转换失败,方法返回
}
//调用bind方法:int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen)
//套接字是用户程序与内核交互信息的枢纽,它自身没有网络协议地址和端口号等信息,在进行网络通信的时候,必须把一个套接字与一个地址相关联。
//很多时候内核会我们自动绑定一个地址,然而有时用户可能需要自己来完成这个绑定的过程,以满足实际应用的需要;
//最典型的情况是一个服务器进程需要绑定一个众所周知的地址或端口以等待客户来连接。
//对于客户端,很多时候并不需要调用bind方法,而是由内核自动绑定;
//性能测试的时候为了在一台机器上发起海量连接端口的限制,会在一个机器上配置多个ip地址,建立连接时,绑定ip;
rv = NET_Bind(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
if (rv != 0) {
handleSocketError(env, errno);
}
}
Net.listen
Net.listen是native方法,源码如下:
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
if (listen(fdval(env, fdo), backlog) < 0)
handleSocketError(env, errno);
}
可以看到底层是调用listen实现的,listen函数在一般在调用bind之后-调用accept之前调用,它的函数原型是:
int listen(int sockfd, int backlog)返回:0──成功, -1──失败
accept
accept方法接受客户端连接,实现如下:
public SocketChannel accept() throws IOException {
synchronized (lock) {//加锁
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
//由于方法加锁,为了支持中断,在当前线程上注册Interruptible,
//当Thread.interrupt时,会调用Interruptible,关闭channnel
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
//调用accetp接收套接字中已建立的连接
//函数原型:int accept(int sockfd,struct sockaddr *addr, socklen_t *addrlen);
//如果fd监听套结字的队列中没有等待的连接,套接字也没有被标记为Non-blocking,accept()会阻塞直到连接出现;
//如果套接字被标记为Non-blocking,队列中也没有等待的连接,accept()返回错误EAGAIN或EWOULDBLOCK
n = accept0(this.fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
//设为堵塞模式,如果要使用非堵塞,用户需要手工调用configureBlocking(false)方法
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];//远程连接地址
sc = new SocketChannelImpl(provider(), newfd, isa);
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;//返回SocketChannelImpl
}
}
configureBlocking
configureBlocking方法最终调用IOUtil.c文件的configureBlocking函数:
static int
configureBlocking(int fd, jboolean blocking)
{
//获取文件描述符的flags
int flags = fcntl(fd, F_GETFL);
int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
//如果flag无变化,返回0,否则调用fcntl设置新flags
return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}
fcntl函数根据文件描述符来操作文件的特性:
用法:
int fcntl(int fd, int cmd);
int fcntl(int fd, int cmd, long arg);参数:
- fd:文件描述符;
- cmd:操作命令;
- arg:供命令使用的参数。
常用操作命令如下:
- F_GETFL :读取文件状态标志;
- F_SETFL :设置文件状态标志,其中O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NOCTTY 和 O_TRUNC不受影响,可以更改的标志有 O_APPEND,O_ASYNC, O_DIRECT, O_NOATIME 和 O_NONBLOCK。