

RetrofitOkHttpOkio 是 Square 的开源的安卓平台网络层三板斧,它们逐层分工,非常优雅地解决我们对网络请求甚至更广泛的 I/O 操作的需求。其中最底层的 Okio 堪称小而美,功能也更基础,应用更广泛。它的主要功能封装在ByteString和Buffer这两个类中。



   public static void main(String[] args) {
        File file = new File("test.txt");
        try {
            readString(new FileInputStream(file));
        } catch (IOException e) {

   public static void readString(InputStream in) throws IOException {
      BufferedSource source = Okio.buffer(Okio.source(in));  //创建BufferedSource
      String s = source.readUtf8();  //以UTF-8读
      System.out.println(s);     //打印


第一步,首先是调用okio的source(InputStream in)方法获取Source对象
第二步,调用okio的buffer(Source source)方法获取BufferedSource对象


2.API 非常简洁,易于实现:Source 和 Sink 的 API 非常简洁,为了应对更复杂的需求,Okio 还提供了 BufferedSource和 BufferedSink 接口,便于使用(按照任意类型进行读写,BufferedSource 还能进行查找和判空等);
4.便于测试,Buffer 同时实现了 BufferedSource 和 BufferedSink 接口,便于测试;




public interface Sink extends Closeable, Flushable {
  /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
  void write(Buffer source, long byteCount) throws IOException;

  /** Pushes all buffered bytes to their final destination. */
  @Override void flush() throws IOException;

  /** Returns the timeout for this sink. */
  Timeout timeout();

   * Pushes all buffered bytes to their final destination and releases the
   * resources held by this sink. It is an error to write a closed sink. It is
   * safe to close a sink more than once.
  @Override void close() throws IOException;


public interface Source extends Closeable {
   * Removes at least 1, and up to {@code byteCount} bytes from this and appends
   * them to {@code sink}. Returns the number of bytes read, or -1 if this
   * source is exhausted.
  long read(Buffer sink, long byteCount) throws IOException;

  /** Returns the timeout for this source. */
  Timeout timeout();

   * Closes this source and releases the resources held by this source. It is an
   * error to read a closed source. It is safe to close a source more than once.
  @Override void close() throws IOException;





final class RealBufferedSink implements BufferedSink {
  public final Buffer buffer = new Buffer();//真正的实现是通过buffer实现
  public final Sink sink;//被装饰的sink
  boolean closed;

  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;

 @Override public void write(Buffer source, long byteCount)
      throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, byteCount);

  @Override public BufferedSink write(ByteString byteString) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    return emitCompleteSegments();





/** The size of all segments in bytes. */
  static final int SIZE = 8192;//字节大小

  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
  static final int SHARE_MINIMUM = 1024;
  final byte[] data;

  /** The next byte of application data byte to read in this segment. */
  int pos;

  /** The first byte of available data ready to be written to. */
//第一个可写的位置,一个Segment的可读数据量为limit - pos
  int limit;

  /** True if other segments or byte strings use the same byte array. */
  boolean shared;

  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner;

  /** Next segment in a linked or circularly-linked list. */
  Segment next;

  /** Previous segment in a circularly-linked list. */
  Segment prev;


Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.shared = shared;
    this.owner = owner;


   * Removes this segment of a circularly-linked list and returns its successor.
   * Returns null if the list is now empty.
  public @Nullable Segment pop() {
    Segment result = next != this ? next : null;//取后继节点next
    prev.next = next;//连接当前的prev和next
    next.prev = prev;
    next = null;
    prev = null;
    return result;//返回下个segment


   * Appends {@code segment} after this segment in the circularly-linked list.
   * Returns the pushed segment.
  public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;//返回新加入的segment
  1. writeTo方法
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */
  public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();//pos字段可能因为在片段被读取出数据后,pos会后移,pos之前的空间也可用,所以这里先判断移动后仍然空间是否足够,如果不够抛异常
      //sink.pos-> sink.limit之间的数据移动到:0->sink.limit - sink.pos位置
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    System.arraycopy(data, pos, sink.data, sink.limit, byteCount); 
    sink.limit += byteCount;
    pos += byteCount;


   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
  public void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    //计算前驱节点的可用空间,如果正在共享,则为SIZE - prev.limit,否则还要加上prev.pos前面的可用空间
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);


   * Splits this head of a circularly-linked list into two segments. The first
   * segment contains the data in {@code [pos..pos+byteCount)}. The second
   * segment contains the data in {@code [pos+byteCount..limit)}. This can be
   * useful when moving partial segments from one buffer to another.
   * <p>Returns the new head of the circularly-linked list.
  public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = sharedCopy();//copy片段,share标记为true
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    return prefix;//返回新建片段



 * A collection of unused segments, necessary to avoid GC churn and zero-fill.
 * This pool is a thread-safe static singleton.
final class SegmentPool {
  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
  static @Nullable Segment next;

  /** Total bytes in this pool. */
  static long byteCount;

  private SegmentPool() {

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.

  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;



ByteString为不可变比特序列,官方文档说可以把它看成string的远方亲戚,且这个亲戚符合人工工学设计,逼格是不是很高。不过简单的讲,他就是一个byte序列(数组),以制定的编码格式进行解码。目前支持的解码规则有hex,base64和UTF-8等,机智如你可能会说String也是如此。是的,你说的没错,ByteString 只是把这些方法进行了封装。免去了我们直接输入类似的"utf-8"这样的错误,直接通过调用utf-8格式进行解码,还做了优化,在第一次调用uft8()方法的时候得到了一个该解码的String,同时在ByteString内部还保留了这个引用,当再次调用utf-8()的时候,则直接返回这个引用。

 static final char[] HEX_DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  private static final long serialVersionUID = 1L;

  /** A singleton empty {@code ByteString}. */
  public static final ByteString EMPTY = ByteString.of();

  final byte[] data;//字节数组
  transient int hashCode; // Lazily computed; 0 if unknown.
  transient String utf8; // Lazily computed.

  ByteString(byte[] data) {
    this.data = data; // Trusted internal constructor doesn't clone data.


   * Returns a new byte string containing a clone of the bytes of {@code data}.
   * 重新创建一个byte数组。clone一个数组的原因很简单,我们确保ByteString的data指向byte[]没有被其他对象所引用,否则就容易破坏ByteString中存储的是一个不可变化的的比特流数据这一约束。
  public static ByteString of(byte... data) {
    if (data == null) throw new IllegalArgumentException("data == null");
    return new ByteString(data.clone());

   * Returns a new byte string containing a copy of {@code byteCount} bytes of {@code data} starting
   * at {@code offset}.
  public static ByteString of(byte[] data, int offset, int byteCount) {
    if (data == null) throw new IllegalArgumentException("data == null");
    checkOffsetAndCount(data.length, offset, byteCount);
    byte[] copy = new byte[byteCount];
    System.arraycopy(data, offset, copy, 0, byteCount);
    return new ByteString(copy);

  public static ByteString of(ByteBuffer data) {
    if (data == null) throw new IllegalArgumentException("data == null");

    byte[] copy = new byte[data.remaining()];
    return new ByteString(copy);


/** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
  public String utf8() {
    String result = utf8;
    // We don't care if we double-allocate in racy code.
    return result != null ? result : (utf8 = new String(data, Util.UTF_8));



Okio的核心类,前面讲到的RealBufferedSink、RealBufferedSource的读写功能都是通过Buffer实现的,它实现了BufferedSource, BufferedSink接口,它和前面提到的Sink\Source的实现关系如下:



public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel {
  private static final byte[] DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  static final int REPLACEMENT_CHARACTER = '\ufffd';

  @Nullable Segment head;//Segment循环链表头
  long size;//当前的数据量

  public Buffer() {



 @Override public byte readByte() {
    if (size == 0) throw new IllegalStateException("size == 0");
    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;

    byte[] data = segment.data;
    byte b = data[pos++];
    size -= 1;
    if (pos == limit) {
      head = segment.pop();
    } else {
      segment.pos = pos;
    return b;

通过源码我们发现,读取过程中,片段的可读位置一直后移,读完数据后,判断pos == limit:如果为true,表明该片段数据已经读完,需要弹出此片段并回收它,然后head指向循环链表的下一个片段;如为false则更新当前片段的可读位置。简单来说,读操作都是从head片段开始读的,读完一个片段就回收它,未读完就将可读位置后移1位。如下图为Buffer和SegementPool的结构:



@Override public int readInt() {
    if (size < 4) throw new IllegalStateException("size < 4: " + size);
    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;

    // If the int is split across multiple segments, delegate to readByte().
    if (limit - pos < 4) {
      return (readByte() & 0xff) << 24
          |  (readByte() & 0xff) << 16
          |  (readByte() & 0xff) <<  8
          |  (readByte() & 0xff);
    byte[] data = segment.data;
    int i = (data[pos++] & 0xff) << 24
        |   (data[pos++] & 0xff) << 16
        |   (data[pos++] & 0xff) <<  8
        |   (data[pos++] & 0xff);
    size -= 4;//读完size-4
    if (pos == limit) {
      head = segment.pop();
    } else {
      segment.pos = pos;
    return i;



 @Override public Buffer writeByte(int b) {
    Segment tail = writableSegment(1);
    tail.data[tail.limit++] = (byte) b;
    size += 1;//size++
    return this;


   * Returns a tail segment that we can write at least {@code minimumCapacity}
   * bytes to, creating it if necessary.
   * minimumCapacity为最小写入容量
  Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    return tail;


@Override public Buffer writeInt(int i) {
    Segment tail = writableSegment(4);//tail片段是可以写入的,容量够用
    byte[] data = tail.data;//取tail的数据
    int limit = tail.limit;//取写入位置
    data[limit++] = (byte) ((i >>> 24) & 0xff);
    data[limit++] = (byte) ((i >>> 16) & 0xff);
    data[limit++] = (byte) ((i >>>  8) & 0xff);
    data[limit++] = (byte)  (i         & 0xff);
    tail.limit = limit;//更新接入位置
    size += 4;//size+4
    return this;



