stream 理解
分享下本人对stream(流)
的理解:假设把数据理解为水,底层通过系统调用读取数据或者写入数据的过程就是水龙头在流水,流出来的水覆水难收,对于已经读过的数据不需要在乎它的生死因为这些水不可以重新流出来,我们只关注还没有被读的数据,在 Java 层对未流出来的水做缓存,为的是让水龙头的水更有效率地流出来甚至可以重新从读取已读过的数据(这里已经是 Java 层不能再用水来比喻了,底层虽然没有重新读数据的操作,但是到了 Java 层我们可以按需求实现).
至于不缓存为什么慢是因为底层 stream 是需要系统调用占用 CPU 资源,假设我们每次读的数据量小但读的次数非常多,每次都需要调用系统方法读取数据岂不是浪费 CPU 资源,如果把数据缓存到内存中,缓存一次的数据可提供多次的读操作.但 CPU 和内存之间要做好平衡,毕竟两个都是珍贵资源.
其实 BufferedInputStream 读取数据并不一定非要从缓存中获取,在缓存效率不高的情况下(见下文)是可以直接从底层流中通过系统调用获取.
类声明
public class BufferedInputStream extends FilterInputStream
成员变量
// 默认 buffer 底层字节数组容量
private static int DEFAULT_BUFFER_SIZE = 8192;
// buffer 底层字节数组最大容量
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
// buffer 底层字节数组
protected volatile byte buf[];
/* 原子更新器,保证了数组的原子性,防止在 buffer 被关闭的情况下修改 buffer 数组.
判断 buffer 是否被关闭的条件是 buf 数组是否为 null */
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class, byte[].class, "buf");
// buffer 有效数据最后一个字节的下一个下标
protected int count;
// 下一个 buffer 有效数据的字节下标
protected int pos;
/* 标记模式下第一个有效数据下标,标记模式开启的标志是 markpos >= 0
且可读的有效数据长度为 marklimit
标记模式下 [markpos , pos) 内数据都必须保留(即使这段数据会被移动到 buffer 数组别的位置)
这段数据不可以被抛弃除非 pos - markpos > marklimit */
protected int markpos = -1;
// 标记模式下可读的数据长度
protected int marklimit;
- pos count markpos 都是 buf[] 的下标.
-
有效数据
是由 buf[] 内部分且连续的数组空间组成, buf[] 内在空间上分为两个部分:可读空间和可写空间-
可读空间
指的是数组下标在区间 [pos , count) 内所占空间. -
可写空间
指的是数组下标在区间 [0, pos) [count, buf.length) 内所占空间,在往缓存数组中添加数据的时候如果还有可写空间就会直接把数据添加到可写空间中,必要时会把可读空间中的数据复制到数组头部,把所有可写空间留到尾部继续添加数据.
-
- BufferedInputStream 在流关闭的时候会把缓存数据 buf[] 置为 null , 具体实现在最后的 close() 方法内.
构造函数
// BufferedInputStream.java
public BufferedInputStream(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
// FilterInputStream.java
protected volatile InputStream in;
protected FilterInputStream(InputStream in) {
this.in = in;
}
构造函数主要做两件事情:
- 创建新的 buffer 底层字节数组 buf ,大小默认为 DEFAULT_BUFFER_SIZE (8kb) ,可自定义设置大小.
- 保存底层的 InputStream 对象, buf 缓存的数据都是从这个 InputStream 中获取的.
- BufferedInputStream 可以理解为底层 InputStream 的包装类,读取数据的时候先从底层 InputStream 读取数据并缓存到内存,然后从缓存中读取数据,所以 BufferedInputStream 只提供缓存功能,真正的获取数据操作是它包裹的 InputStream.
mark 机制
上面我说过在底层中读取过的数据是泼出去的水覆水难收,而在 Java层 BufferedInputStream 提供了 mark 机制收回部分泼出去的水, mark 机制通过调用 mark() 在 inputstream 标记某个位置 makrpos ,然后通过 reset() 可以重新从 markpos 位置上读取数据.其实现方法就是在 buffer 添加缓存数据 fill() 的时候保存标记位置到当前位置 [markpos, pos] 内的数据,然后往后面添加数据,但为了保证数据的连贯性,只有在 buffer 中缓存的数据全部都是已读的情况下才可以调用 fill() .
关于 mark 机制的介绍在源码 makrpos 变量注释中:
/**
* The value of the <code>pos</code> field at the time the last
* <code>mark</code> method was called.
* <p>
* This value is always
* in the range <code>-1</code> through <code>pos</code>.
* If there is no marked position in the input
* stream, this field is <code>-1</code>. If
* there is a marked position in the input
* stream, then <code>buf[markpos]</code>
* is the first byte to be supplied as input
* after a <code>reset</code> operation. If
* <code>markpos</code> is not <code>-1</code>,
* then all bytes from positions <code>buf[markpos]</code>
* through <code>buf[pos-1]</code> must remain
* in the buffer array (though they may be
* moved to another place in the buffer array,
* with suitable adjustments to the values
* of <code>count</code>, <code>pos</code>,
* and <code>markpos</code>); they may not
* be discarded unless and until the difference
* between <code>pos</code> and <code>markpos</code>
* exceeds <code>marklimit</code>.
*
* @see java.io.BufferedInputStream#mark(int)
* @see java.io.BufferedInputStream#pos
*/
protected int markpos = -1;
markpos 默认值是 -1,当 mark() 被调用后 markpos 就是重新读取的第一个字节的下标,如果 markpos 不是 -1 数据范围 [markpos, pos -1] 内数据即使会被移动到数组中移动也要被完整保存,同时要维护 count pos markpos 的值, 只有当 pos - markpos >= marklimit 的时候 mark 机制才算无效.
先了解两个关键方法:
getInIfOpen()
// BufferedInputStream.java
private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null)
throw new IOException("Stream closed");
return input;
}
获取底层 InputStream 的对象,如果流被关闭就抛异常.该方法除了获取底层流还可以判断流是否关闭,通过抛异常终端后续操作.
getBufIfOpen()
// BufferedInputStream.java
private byte[] getBufIfOpen() throws IOException {
byte[] buffer = buf;
if (buffer == null)
throw new IOException("Stream closed");
return buffer;
}
获取 buffer 数组对象引用,如果流被关闭就抛异常.所以该方法除了获取底层流还可以判断流是否关闭,通过抛异常终端后续操作.
如果流被关闭 buf 数组会置为 null.
上面我们提到了 mark 机制,那么 BufferedInputStream 是如何执行这个机制的呢,其实核心逻辑在 fill()
中,我们可以从 read() 阅读为引子深入了解 mark 机制的实现.
read()
// BufferedInputStream.java
/**
* 返回下一个字节
*/
public synchronized int read() throws IOException {
if (pos >= count) {
// pos >= count 表示当前 buffer 内的所有数据都读完了
// 调用 fill() 往 buffer 内添加新的数据
fill();
// 如果 pos >= count 依然成立,返回 -1 表示数据已读完
if (pos >= count)
return -1;
}
// 返回 buffer 有效数据的第一个字节
return getBufIfOpen()[pos++] & 0xff;
}
大家可能发现最后返回之前还要做一次位运算,原因是 buf[] 是 byte 数组,read() 返回的是 int ,如何把一个 byte 变成一个 int 呢,答案就是补零,把 8 位的 byte 变成 32位的 int.
fill()
#1注意 fill() 认为当前 buffer 内所有数据都已经被读过了,即 pos > count
/**
* 添加数据到 buffer 中
* 该方法执行 buffer 数据已经全被阅读,即 pos > count.
*/
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0;
else if (pos >= buffer.length)
if (markpos > 0) {
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1;
pos = 0;
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
} else {
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
// 使用底层 InputStream 读取数据保存到 buffer 中
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
fill() 调用时机是在当前缓存中没有可以读取的数据来刷新缓存数据,所以 pos 肯定大于 count,了解这个前提阅读上面代码更加容易.
fill() 逻辑看似复杂,里面有很多 else if , 但其实可以把里面主要分两种情况,下面用伪代码简化 fill() 的逻辑.
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0){
// (1) markpos 无效
pos = 0;
}else{
// (2) markpos 有效
}
// 两种情况都会重新设置 pos,第二种情况可能会保存部分数据,
// 所以数组范围[0,pos]是只读不可以写的
count = pos;
// 从底层 inputstream 读取数据到 buffer 数组中(从 buffer[pos] 开始写),
// 直到写满 buffer 数组或者没有数据为止.
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
// 读取数据后累加 count
count = n + pos;
}
fill() 中主要分两种情况,分别是 markpos 有效和 markpos 无效的情况.无论是哪种情况都会设置 pos 作为数组可写的第一个字节下标,然后从构造方法中保存的底层 inputstream 中读取数据,直到写满数组或者没有数据为止.
markpos 无效时很简单,markpos 无效的时候整个数组内的数据都可以丢弃,直接从下标 0 开始覆盖数据就可以.
markpos 有效时稍微复杂点,下面放出这段逻辑的代码,为了方便理解我帮这段代码加了两对大括号和最后加了一个else:
if (markpos < 0){
// (1)
// markpos 无效
pos = 0;
}else if (pos >= buffer.length){ /* buffer 已满 */
// (2)
if (markpos > 0) {
// (2.1)
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
// 可读范围从这段数据的后的下一个字节开始
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
// (2.2.1)
markpos = -1;
pos = 0;
} else if (buffer.length >= MAX_BUFFER_SIZE) {
// (2.2.2)
throw new OutOfMemoryError("Required array size too large");
} else {
// (2.2.3)
// pos <= MAX_BUFFER_SIZE - pos 相当于判断 pos 大小是否大于 MAX_BUFFER_SIZE 的一半
// nsz 为 buffer 容量扩充后的大小
// pos >= MAX_BUFFER_SIZE/2 时 nsz = MAX_BUFFER_SIZE
// pos < MAX_BUFFER_SIZE/2 时 nsz = pos * 2
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
// nsz 大小不可以超过 marklimit
nsz = marklimit;
// 创建大小为 nsz 新的 buffer 数组
byte nbuf[] = new byte[nsz];
// 把旧数组中的数据复制到新的数组中
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// 在高并发场景下,调用 compareAndSet() 可保证流还没有被关闭
throw new IOException("Stream closed");
}
// 把 buffer 底层字节数组换成新的扩容后的数组
buffer = nbuf;
}
}else{
// (3)
// 源码没有这个 else ,因为这里没有任何逻辑
}
为了方便理解我在上面的各个逻辑分支代码块做了标记,他们的层级关系和含义如下:
- (1) markpos 无效
- (2) markpos 有效 ( 数组已满 )
- (2.1) markpos > 0
- (2.2) markpos = 0
- (2.2.1) 数组长度 >= poslimit
- (2.2.2) 数组长度大于规定容量最大值 MAX_BUFFER_SIZE
- (2.2.3) 其余情况
- (3) markpos 有效 ( 数组未满 )
(2.1)
如果 markpos >0 的话,根据 mark 机制要保存 [markpos, pos) 范围内的数据,所以就把这部分数据复制到数组的 [0, pos-markpos] ,然后设置 pos 指向在这段数据的下一个字节,并且把 markpos 置零.
(2.2.1)
如果数组长度大于 marklimit ,根据 mark 机制 markpos 就是无效的了,所以重置 markpos 为 -1 且不需要保留任何数据, pos 置零.
(2.2.2)
如果数组长度大于规定的缓存大小 MAX_BUFFER_SIZE 就抛异常.
(2.2.3)
这里其实同时满足了 markpos有效 && markpos = 0 && 数组长度 < poslimit && 数组已满 这几个条件,根据 mark 机制 markpos 依然有效,所以 [markpos, pos] 即整个数组内的数据都是要保留的,所以只能先增加 buffer 字节数组的容量才能添加更多的数据.
实现逻辑就是计算新的 buffer 大小,然后创建新的数组,把旧数组中的所有数据都复制到新的数组中.
注意最终的扩容大小只能 <= marklimit ,所以调用 mark(int limit) 时传的 int 值过大会导致 (2.2.2) 抛异常.
(3)
并没有任何逻辑,因为数组不满的时候,直接添加数据即可.
简单例子
File file = new File("read.text");
FileInputStream in = new FileInputStream(file);
BufferedInputStream inBuffer = new BufferedInputStream(in);
BufferedOutputStream outBuffer = new BufferedOutputStream(System.out);
int len = 0;
byte[] bs = new byte[1024];
inBuffer.read(bs);
while ((len = inBuffer.read(bs)) != -1) {
// 每次 while 循环都从 inputstream 读取数据放在 bs 中,然后用 outputstream 输出 bs 内的数据
out.write(bs, 0, len);
}
inBuffer.close();
outBuffer.close();
这是一个经典的阅读文件并输出的例子.下面从 read() 开始阅读源码.
read(byte b[])
// FilterInputStream.java
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
read(byte b[]) 是 BufferedInputStream 父类 FilterInputStream 的方法,而 BufferedInputStream 并没有重写它,所以这里调用的是父类方法.
read(byte b[], int off, int len)
// BufferedInputStream.java
/**
* @param b 读取数据存放的目标数组
* @param off 偏移值,表示 b 数组可以写的第一个字节的下标
* @param len 读取的数据大小
*/
public synchronized int read(byte b[], int off, int len)
throws IOException
{
// 检查流是否被关闭
getBufIfOpen();
// 对 read() 传入参数做校验,防止数组边界越界
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
// n 记录已读数据大小
int n = 0;
for (;;) {
// 调用 read1 方法往数组 b 中添加数据, nread 为添加的数据大小
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
// 满足 nread > 0 && n >= len
// 判断数据是否已经读完了
if (n >= len)
return n;
InputStream input = in;
// 满足 nread > 0 && n < len
// 判断 inputstream 中是否还有未读数据
if (input != null && input.available() <= 0)
return n;
}
}
read(byte b[], int off, int len) 逻辑很简单,主要是通过 read1() 读取数据并写入数组 b 中,但是由于 read1() 可能是从缓存 buffer 或者直接从底层 InputStream 中读取,读取的数据大小不一定是 len,所以会循环调用 read1 读取知道没有数据或者读取的数据大小达到 len 为止.
read1(byte b[], int off, int len)
// BufferedInputStream.java
/**
* @param b 读取数据存放的目标数组
* @param off 偏移值,表示 b 数组可以写的第一个字节的下标
* @param len 读取的数据大小
*/
private int read1(byte[] b, int off, int len) throws IOException {
// avail 表示缓存数组 buf[] 可写空间大小
int avail = count - pos;
// avail <= 0 表示缓存数组已写满
if (avail <= 0) {
/* If the requested length is at least as large as the buffer, and
if there is no mark/reset activity, do not bother to copy the
bytes into the local buffer. In this way buffered streams will
cascade harmlessly. */
/* 如果读取的大小 >= 缓存 buffer 数组长度,且 mark 机制是无效的时候,
不用纠结还想着先把数据复制到缓存中,直接从 InputStream 中读取数据即可.*/
if (len >= getBufIfOpen().length && markpos < 0) {
// 内部调用了 read()
return getInIfOpen().read(b, off, len);
}
// 往缓存中添加数据
fill();
// 如果当前
avail = count - pos;
if (avail <= 0) return -1;
}
// cnt 表示需要复制的数据长度
int cnt = (avail < len) ? avail : len;
// 从 buffer 中获取数据范围 [pos, pos+cnt] 数据到 b 中范围 [off, off+cnt] 中
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
// 更新缓存中下一个可读字节下标 pos
pos += cnt;
return cnt;
}
1.首先计算可读数据大小 avail .
2.如果缓存中还有可读数据的话,就从缓存中读取进入步骤 3.
- 但如果缓存中已经没有可读数据了,就需要调用 fill() 添加新的数据到缓存中再进行步骤 3 ,但是这里有个问题就是如果我们所需阅读的数据大小 len 比缓存数组大小 buf[].length 还大,即至少要刷新缓存数据两次才满足,那么缓存的作用就大大减小的.毕竟缓存是为了更快的获取数据,
缓存效率
应该是与缓存数据两次刷新时间之间执行的read次数
成正比,现在是一次读数据的操作至少需要刷新缓存两次的囧境,所以直接从底层 InputStream 获取数据即可,步骤3也不需要执行了.
3.确定本次读取的数据大小 cnt 然后复制数据到数组 b 中,如果 avail < len 就是即使把缓存中的可读数据读完还不满足需要,也没所谓因为 read1 不需要一次性就把所有数据读完,像 read(byte b[], int off, int len) 上面说的循环调用 read1 直到读取数据量达到需要即可.
close()
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
// 通过 bufUpdater.compareAndSet 把 buffer 置为 null
// 置空成功返回 true
if (bufUpdater.compareAndSet(this, buffer, null)) {
// 本例中 in 是 FileInputStream
InputStream input = in;
in = null;
if (input != null)
// 关闭流
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
close 操作很简单,就是置空缓存数组和关闭流.
bufUpdater 保证了 buffer 操作的原子性,本文不做详细介绍.