JDK的io库由于历史原因设计的比较复杂,有很多装饰类,使用起来需要记忆大量的类,相信你也对此早已诟病不满。Square公司推出的Okio应运而生,它原本是作为Okhttp的io功能库而设计的,也是因为Okhttp而被大家熟知。从知道到会使用,再到理解实现原理后熟练使用,甚至在此基础上二次开发优化,这个认知的过程需要刻意练习,这篇文章就是对Okio的一个总结,Okio虽然代码量不是很多, 但是里面值得学习的地方还是很多。
Source + Sink
简介
Okio定义了自己的一套继承链,Source对应InputStream, Sink对应OutputStream,这样对比就不难理解了,看一下接口的定义
public interface Source extends Closeable {
long read(Buffer sink, long byteCount) throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
public interface Sink extends Closeable, Flushable {
void write(Buffer source, long byteCount) throws IOException;
@Override void flush() throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
接口定义的方法很简洁,
read/write方法,读取和写入数据的接口方法,它们的第一个参数都是Buffer,关于这个类后面会详细介绍,这里我们暂且按照缓冲区理解它。byteCount就是读取或者写入的字节数。
timeout方法,Okio新增的新特性,超时控制
close方法,关闭输入输出流
flush方法,将Buffer缓冲区中的数据写入目标流中。
如何使用
Okio已经帮我们定义了一个门面类,名字就叫Okio,通过它可以生成各种我们需要的对象。
比如Okio.source(inputStream); 将inputStream包装成我们的Source对象,同样的
Okio.sink(outputStream);将outputStream包装成Sink对象。
所以Okio的底层操作的流对象还是Jdk里面定义的InputStream和OutputStream,作为一个轻量级的io框架它不可能跳出Jdk的框架去另外实现一套,它做的只是方便开发者的封装,但是它的封装设计足够优秀,这也是我还在这里跟你们吹牛x的原因,还是不想从大神那里学个一招半式。
看个例子
File file = new File("/Users/aliouswang/Documents/olympic/JavaArt//text.temp");
Sink sink = Okio.sink(file);
Buffer buffer = new Buffer();
buffer.writeString("Hello okio!", Charset.forName("UTF-8"));
buffer.writeInt(998);
buffer.writeByte(1);
buffer.writeLong(System.currentTimeMillis());
buffer.writeUtf8("Hello end!");
sink.write(buffer, buffer.size());
sink.flush();
sink.close();
很简单的一个写文件的例子,前面说过Source和Sink的读和写的方法都需要一个Buffer对象,Buffer对象帮我们提供了类似BufferedInputStream和BufferedOutputStream的缓冲区功能(提高读写效率),同时还提供了DataInputStream和DataOutputStream中的大部分功能(比如写int,byte,long等),而且Buffer还提供了写String的方法,更是为我们经常使用的UTF-8编码格式,单独提供读写方法。
有写就有读
Source source = Okio.source(file);
buffer.clear();
source.read(buffer, 1024);
String string = buffer.readString("Hello okio!".length(), Charset.forName("UTF-8"));
int intValue = buffer.readInt();
byte byteValue = buffer.readByte();
long longValue = buffer.readLong();
String utf8 = buffer.readUtf8();
System.out.println("str:" + string + ";\nint:" + intValue + ";\nbyte:" + byteValue + ";" +
"\nlong:" + longValue + "\nutf8:" + utf8);
source.close();
// 打印结果:
str:Hello okio!;
int:998;
byte:1;
long:1555325659665
utf8:Hello end!
但是每次都去new一个Buffer对象,是不是很麻烦,你我都能想到的,大神们肯定早就想到了,于是乎有了BufferedSink,BufferedSource。
BufferedSource + BufferedSink
BufferedSource 和 BufferedSink 也都是接口,里面定义的接口方法比较多,篇幅关系,这里只列出BufferedSink的定义,更细节的可以查看源码,源码中对很多方法的注释都举了例子来帮助我们理解,Okio的作者也是用心良苦,生怕我们广大的码农们看不懂,不会用啊!!!
public interface BufferedSink extends Sink, WritableByteChannel {
Buffer buffer();
BufferedSink write(ByteString byteString) throws IOException;
BufferedSink write(byte[] source) throws IOException;
BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
long writeAll(Source source) throws IOException;
BufferedSink write(Source source, long byteCount) throws IOException;
BufferedSink writeUtf8(String string) throws IOException;
BufferedSink writeString(String string, Charset charset) throws IOException;
BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
throws IOException;
BufferedSink writeByte(int b) throws IOException;
BufferedSink writeShort(int s) throws IOException;
BufferedSink writeShortLe(int s) throws IOException;
BufferedSink writeInt(int i) throws IOException;
BufferedSink writeIntLe(int i) throws IOException;
BufferedSink writeLong(long v) throws IOException;
BufferedSink writeLongLe(long v) throws IOException;
BufferedSink writeDecimalLong(long v) throws IOException;
BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
@Override void flush() throws IOException;
BufferedSink emit() throws IOException;
BufferedSink emitCompleteSegments() throws IOException;
OutputStream outputStream();
}
可以看到BufferedSink继承于Sink,同时还继承了WritableByteChannel,这个接口是nio接口,所以Okio同样实现了nio的相关功能,这里由于水平有限,关于nio的知识这篇文章不会涉及,有兴趣的同学可以自行查阅资料哦。
BufferedSink定义了Buffer类中定义的全部方法,同时还定义了一个buffer()方法,返回一个Buffer对象,我们大概可以猜想到,这里应该是一个不太标准的代理模式,BufferedSink委托Buffer来干活。
Okio同样提供了Buffer相关的方法方便我们使用。
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
返回的是BufferedSink 和 BufferedSource,Okio的默认实现类是RealBufferedSink和RealBufferedSource,我们可以通过BufferedSource和BufferedSink对上面读写文件的例子进行修改,
File file = new File("/Users/aliouswang/Documents/java/JavaArt/text.temp");
Sink sink = Okio.sink(file);
BufferedSink bufferedSink = Okio.buffer(sink);
bufferedSink.writeString("Hello okio!", Charset.forName("UTF-8"));
bufferedSink.writeInt(998);
bufferedSink.writeByte(1);
bufferedSink.writeLong(System.currentTimeMillis());
bufferedSink.writeUtf8("Hello end!");
bufferedSink.close();
Source source = Okio.source(file);
BufferedSource bufferedSource = Okio.buffer(source);
String string = bufferedSource.readString("Hello okio!".length(), Charset.forName("UTF-8"));
int intValue = bufferedSource.readInt();
byte byteValue = bufferedSource.readByte();
long longValue = bufferedSource.readLong();
String utf8 = bufferedSource.readUtf8();
System.out.println("str:" + string + ";\nint:" + intValue + ";\nbyte:" + byteValue + ";" +
"\nlong:" + longValue + "\nutf8:" + utf8);
source.close();
可以看到,BufferedSource和BufferedSink能够满足我们对io的日常绝大部分使用场景。
Okio门面类的实现
更一般的,我们会这样去写,链式调用,代码更简洁。
BufferedSource bufferedSource = Okio.buffer(Okio.source(file));
BufferedSink bufferedSink = Okio.buffer(Okio.sink(file));
非常简洁的就能生成BufferedSource和BufferedSink,看一下Okio帮我们做了什么。
public static Source source(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return source(new FileInputStream(file));
}
public static Source source(InputStream in) {
// 生成一个默认的Timeout超时对象,默认实现是没有超时deadtime的
return source(in, new Timeout());
}
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
// 检查超时,
timeout.throwIfReached();
// 从Buffer获取一个可以写入的Segment,这一块只是接下来再具体分析
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
// 将最大能copy的字节写入Buffer,
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
通过Okio.source的实现可以看到,在读取的时候,会从传入的InputStream in 对象中读取字节到Buffer sink中,前面我们提到过,RealBufferedSource和RealBufferedSink内部都持有一个Buffer对象,可以猜测,它们持有的buffer对象 会在读写的时候传入。我们进入源码验证一下, 这里我们以readString 方法为例。
@Override public String readString(long byteCount, Charset charset) throws IOException {
require(byteCount);
if (charset == null) throw new IllegalArgumentException("charset == null");
return buffer.readString(byteCount, charset);
}
@Override public void require(long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}
@Override public boolean request(long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
while (buffer.size < byteCount) {
if (source.read(buffer, Segment.SIZE) == -1) return false;
}
return true;
}
@Override public String readString(long byteCount, Charset charset) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
if (charset == null) throw new IllegalArgumentException("charset == null");
if (byteCount > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
}
if (byteCount == 0) return "";
Segment s = head;
if (s.pos + byteCount > s.limit) {
// If the string spans multiple segments, delegate to readBytes().
return new String(readByteArray(byteCount), charset);
}
String result = new String(s.data, s.pos, (int) byteCount, charset);
s.pos += byteCount;
size -= byteCount;
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return result;
}
代码比较清晰,先从source中读取要求的bytecount长度的String到buffer中,然后从buffer中读取String 返回。其他的读取方法跟readString大同小异,有兴趣同学可以自行查阅源码。
说了这么久,我们的主角Buffer对象登场了。
Buffer
看一下Buffer类的申明,实现了BufferedSource, BufferedSink, Cloneable, ByteChannel 四个接口。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel {...}
我们知道Buffer作为缓冲区,肯定底层需要有数据结构来存储暂存的数据,JDK的BuffedInputStream和BufferedOutputStream中是使用字节数组的,而这里Okio的Buffer不是,它使用的是Segment。
public Segment head;
Segment
Segment 是一个双向循环链表,它的内部持有一个byte[] data,默认大小8192(与JDK的BufferedInputStream相同)。
public final class Segment {
/** The size of all segments in bytes. */
static final int SIZE = 8192;
/** 默认共享最小字节数*/
static final int SHARE_MINIMUM = 1024;
final byte[] data;
/** 标识下一个读取字节的位置 */
int pos;
/** 标识下一个写入字节的位置 */
int limit;
/** 是否与其他Segment共享byte[] */
boolean shared;
/** 是否拥有这个byte[], 如果拥有可以写入 */
boolean owner;
/** Segment后继 */
public Segment next;
/** Segment前驱 */
Segment prev;
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
......
}
Sement关键的成员变量都加了注释,Okio为了优化性能,避免频繁的创建和回收对象,使用了对象池模式,设计了SegmentPool类来管理Segment。
SegemntPool
final class SegmentPool {
/** The maximum number of bytes to pool. */
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
/** Singly-linked list of segments. */
static @Nullable Segment next;
/** Total bytes in this pool. */
static long byteCount;
private SegmentPool() {
}
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
SegmentPool代码很简洁,它的最大容量是8个Segment,如果超过调用take方法就会直接新建一个Segment对象,另外recycle回收方法负责回收闲置的Segment,将其加入链表,供其他buffer使用。
有了Segment和SegmentPool的知识,就更容易理解Buffer类的实现了。
比如Okio.source方法新建的Source对象的read方法,获取可以写入的Segment对象,便利Segment链表获取可以写入的Segment,如果head为null则新建一个Segment。
// 从Buffer获取一个可以写入的Segment,这一块只是接下来再具体分析
Segment tail = sink.writableSegment(1);
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
Buffer的其他方法就不一一分析了,有了我们前面的知识,相信看起来不会太难。接下来我们看一下Okio的另一个类ByteString。
ByteString
我们知道String是的内部是基于char[] 数组来实现的,Okio的ByteString内部是基于byte[] 数组来实现的。跟String类似,ByteString也被设计为不可变的,这样可以保证ByteString是线程安全的。
public class ByteString implements Serializable, Comparable<ByteString> {
final byte[] data;
ByteString(byte[] data) {
this.data = data; // Trusted internal constructor doesn't clone data.
}
......
}
同时ByteString提供了很多方便的工具方法,比如base64,sha1加密等。
public String base64() {
return Base64.encode(data);
}
/** Returns the 128-bit MD5 hash of this byte string. */
public ByteString md5() {
return digest("MD5");
}
/** Returns the 160-bit SHA-1 hash of this byte string. */
public ByteString sha1() {
return digest("SHA-1");
}
/** Returns the 256-bit SHA-256 hash of this byte string. */
public ByteString sha256() {
return digest("SHA-256");
}
/** Returns the 512-bit SHA-512 hash of this byte string. */
public ByteString sha512() {
return digest("SHA-512");
}
同时ByteString也提供了静态方法,方便与String类型互转。
/** Returns a new byte string containing the {@code UTF-8} bytes of {@code s}. */
public static ByteString encodeUtf8(String s) {
if (s == null) throw new IllegalArgumentException("s == null");
ByteString byteString = new ByteString(s.getBytes(Util.UTF_8));
byteString.utf8 = s;
return byteString;
}
/** Returns a new byte string containing the {@code charset}-encoded bytes of {@code s}. */
public static ByteString encodeString(String s, Charset charset) {
if (s == null) throw new IllegalArgumentException("s == null");
if (charset == null) throw new IllegalArgumentException("charset == null");
return new ByteString(s.getBytes(charset));
}
/** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
public String utf8() {
String result = utf8;
// We don't care if we double-allocate in racy code.
return result != null ? result : (utf8 = new String(data, Util.UTF_8));
}
/** Constructs a new {@code String} by decoding the bytes using {@code charset}. */
public String string(Charset charset) {
if (charset == null) throw new IllegalArgumentException("charset == null");
return new String(data, charset);
}
最后
Okio并不是设计来代替Jdk io的,但是在某些重度io的场景,如果对性能优化追求极致的话,Okio不失是一种选择,关于Okio还有很多细节的知识由于篇幅关系没有涉及,有兴趣的同学可以去看源码中找答案,全文完。