Okio 框架源码学习

Retrofit,OkHttp,Okio Square 安卓平台网络层三板斧源码学习
基于 okio 1.13.0 版本 okio github 地址

简介

Okio 主要是替代 java.io 和 java.nio 那一套复杂的 api 调用框架,让数据访问、存储和处理更加便捷。

Okio 是 OkHttp 的基石,而 OkHttp 是 Retrofit 的基石。

使用方式

参考 OkioTest

……
@Test public void readWriteFile() throws Exception {
    File file = temporaryFolder.newFile();

    BufferedSink sink = Okio.buffer(Okio.sink(file));
    sink.writeUtf8("Hello, java.io file!");
    sink.close();
    assertTrue(file.exists());
    assertEquals(20, file.length());

    BufferedSource source = Okio.buffer(Okio.source(file));
    assertEquals("Hello, java.io file!", source.readUtf8());
    source.close();
}

……

通过 OkioTest 可以大致明白 Okio 主要有 『读』、『写』两大类操作。可以操作的对象为:

1. 文件
2. 文件路径描述类 Path
3. Socket
4. OutputStream
5. InputStream

Okio 通过 sink(xxx) 去写一个对象,通过 source(xxx)去读一个对象。

一览 Okio 读写框架

通过 Okio.buffer() 获得用来读写的 BufferedSource、BufferedSink

BufferedSink sink = Okio.buffer(Okio.sink(file));
BufferedSource source = Okio.buffer(Okio.source(file));

进一步查看 buffer() 方法

public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
}

public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
}
RealBufferedSink

看下 RealBufferedSink 类

final class RealBufferedSink implements BufferedSink {
    public final Buffer buffer = new Buffer();
    public final Sink sink;
    boolean closed;

    RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
    }
    ……
}

RealBufferedSink 实现了 BufferedSink 接口,BufferedSink 实现了 Sink 接口。

而 Sink 实现了 Closeable, Flushable 接口。

1. Flushable 接口只定义了一个方法 flush() ,用来实现把数据从缓存区写入到底层输入流。
2. Closeable 接口定义 close() ,用来关闭流。
3. Sink 接口又定义了一个 write(Buffer source, long byteCount) 和 timeout() 用来写入数据和设置超时。
4. BufferedSink 接口则定义了一堆 wirteXXX(……) 用来操作不同类型的数据写入。

在看 RealBufferedSink 的成员变量

public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;

这里出现了一个 Buffer 对象,一个从构造函数里面传入的 Sink 对象,以及一个用来记录流是否关闭的 boolean 标志。

RealBufferedSink 的各种 wirteXXX(……)大都如下

@Override public BufferedSink writeXXX(……) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeXXX(……);
  return emitCompleteSegments();
}

可见写入数据的方法,都是有 buffer 对象实现。而在 emitXXX() 和 flush() 方法中则调用了 sink.write(buffer, byteCount) 和 sink.flush()

RealBufferedSource

RealBufferedSource 和 RealBufferedSink 类似

final class RealBufferedSource implements BufferedSource {
    public final Buffer buffer = new Buffer();
    public final Source source;
    boolean closed;

    RealBufferedSource(Source source) {
        if (source == null) throw new NullPointerException("source == null");
        this.source = source;
    }
}

RealBufferedSource 实现了 BufferedSource 接口,BufferedSource 实现了 Source 接口。

Source 接口同样也实现了 Closeable 接口。

1. Source 集成了 Closeable 接口,表示 Source 提供了一个 close 方法关闭读取数据的流。
2. Source 定义了一个 read(Buffer sink, long byteCount) 用来读取数据,一个 timeout() 方法用来设置读取超时。
3. BufferedSource 定义了很多 readXXX(……) 用来读取数据。

RealBufferedSource 中的 readXXX(……) 方法和 RealBufferedSink 中的 writeXXX(……) 类似,都是通过成员变量 buffer 和 构造对象时传入的 Source 对象配合起来读取数据。

总结一下整个读写框架的结构如下:

okio_01.png

对所有读写对象的统一处理

无论是 File 、Path、InputStream、OutputStream 、Socket ,在 Okio 框架中只要一个简单的 Okio.sink(……) 方法即可获得对应的输入流(RealBufferedSink)和输出流(RealBufferedSource)

而且 Okio 还给输入/输出流的都提供一个额外参数:Timeout,用来设置读写的超时设置。

所有的 sink 方法,都会调用到

private static Sink sink(final OutputStream out, final Timeout timeout) {
  if (out == null) throw new IllegalArgumentException("out == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");

  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0) {
        timeout.throwIfReached();
        Segment head = source.head;
        int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
        out.write(head.data, head.pos, toCopy);

        head.pos += toCopy;
        byteCount -= toCopy;
        source.size -= toCopy;

        if (head.pos == head.limit) {
          source.head = head.pop();
          SegmentPool.recycle(head);
        }
      }
    }

    @Override public void flush() throws IOException {
      out.flush();
    }

    @Override public void close() throws IOException {
      out.close();
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public String toString() {
      return "sink(" + out + ")";
    }
  };
}

Okio.sink() 会创建一个匿名内部类的实例,实现了 write 方法,用来写入数据到 OutputStream(File 、Path、Socket 都会被转成成 OutputStream(),每次写入数据的时候,都会检测是否超时。(超时机制后面后讲)

Okio.Source() 类似会创建一个实现 Source 接口的匿名内部类实例。实现 read 方法 ,负责从 InputStream 中读取数据。

Okio 在读写数据的时候,里面都会用用一个 Segment 对象。Segment 是 Okio 定义的一个链表结构的数据片段,每个 Segment 可以最多存放 8K 的字节。

万能的 Buffer

写数据的时候 Okio 会先把数据写到 buffer 中

BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();

Okio.buffer() 返回的是 RealBufferedSink

@Override public BufferedSink writeUtf8(String string) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeUtf8(string);
  return emitCompleteSegments();
}

查看 writeUtf8

@Override public Buffer writeUtf8(String string) {
  return writeUtf8(string, 0, string.length());
}

然后把 String 变成一个 Segment 链表

@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
    ……

    // Transcode a UTF-16 Java String to UTF-8 bytes.
    for (int i = beginIndex; i < endIndex;) {
      int c = string.charAt(i);

      if (c < 0x80) {
        Segment tail = writableSegment(1);
        byte[] data = tail.data;
        ……
        while (i < runLimit) {
          c = string.charAt(i);
          if (c >= 0x80) break;
          data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
        }    
        ……

      } else if (c < 0x800) {
        // Emit a 11-bit character with 2 bytes.
        writeByte(c >>  6        | 0xc0); // 110xxxxx
        writeByte(c       & 0x3f | 0x80); // 10xxxxxx
        i++;

      } ……
    }

    return this;
  }

通过 writableSegment 是不是要开辟新的 Segment 到队列尾部

Segment writableSegment(int minimumCapacity) {
  if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

  if (head == null) {
    head = SegmentPool.take(); // Acquire a first segment.
    return head.next = head.prev = head;
  }

  Segment tail = head.prev;
  if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
  }
  return tail;
}

在看 emitCompleteSegments()

@Override
public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}

buffer.completeSegmentByteCount() 用来计算 Segment 的缓存的字节长度

public long completeSegmentByteCount() {
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
        result -= tail.limit - tail.pos;
    }

    return result;
}

sink.write(buffer, byteCount) 就是之前传入的集成的 Sink 匿名类。

总结一下整个流程

okio_02.png

读数据的时候 Buffer 起到的作用类似,直接贴一下流程图

okio_03.png

Okio 超时机制

Okio 可以给他 OutputStream 、InputStream 增加一个超市设置。读写文件时会设置一个默认的 TimeOut ,这个方法是个空实现。

在读写 Socket 的时候,Okio 给我们展示一个如何设置一个异步的超时机制,用来在 Socket 读写超时时关闭流。

public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
}

先看 timeout(socket)

private static AsyncTimeout timeout(final Socket socket) {
    return new AsyncTimeout() {
        @Override
        protected IOException newTimeoutException(@Nullable IOException cause) {
            ……
        }

        @Override
        protected void timedOut() {
            try {
                socket.close();
            }……
        }
    };
}

这里看出会返回一个 AsyncTimeout 的匿名对象,主要在 timeOut() 中关闭 Socket。

sink(socket.getOutputStream(), timeout) 方法在上面已经看过了主要看其中的一句代码

private static Sink sink(final OutputStream out, final Timeout timeout) {
    ……
    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            ……
            while (byteCount > 0) {
                timeout.throwIfReached();
                ……
            }
        }
        ……
    };
}

在看一下 throwIfReached 方法

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
        throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
        throw new InterruptedIOException("deadline reached");
    }
}

如果 hasDeadline 是 true,并且 deadlineNanoTime 大于 System.nanoTime() 来判断是否达超时。

在看 timeout.sink(sink)

public final Sink sink(final Sink sink) {
    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);

            while (byteCount > 0L) {
                // Count how many bytes to write. This loop guarantees we split on a segment boundary.
                long toWrite = 0L;
                for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
                    int segmentSize = s.limit - s.pos;
                    toWrite += segmentSize;
                    if (toWrite >= byteCount) {
                        toWrite = byteCount;
                        break;
                    }
                }

                // Emit one write. Only this section is subject to the timeout.
                boolean throwOnTimeout = false;
                enter();
                try {
                    sink.write(source, toWrite);
                    byteCount -= toWrite;
                    throwOnTimeout = true;
                } catch (IOException e) {
                    throw exit(e);
                } finally {
                    exit(throwOnTimeout);
                }
            }
        }

        @Override
        public void flush() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
                sink.flush();
                throwOnTimeout = true;
            } catch (IOException e) {
                throw exit(e);
            } finally {
                exit(throwOnTimeout);
            }
        }

        @Override
        public void close() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
                sink.close();
                throwOnTimeout = true;
            } catch (IOException e) {
                throw exit(e);
            } finally {
                exit(throwOnTimeout);
            }
        }

        @Override
        public Timeout timeout() {
            return AsyncTimeout.this;
        }

        @Override
        public String toString() {
            return "AsyncTimeout.sink(" + sink + ")";
        }
    };
}

可以看出 timeout.sink(sink) 重新包装了 Sink 给 Sink 的每个方法都增加一个 enter() 方法

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
        return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}

这里会发现如果满足了条件,会执行 scheduleTimeout 方法。但是默认情况下,条件不会被满足。

查看一下 SocketTimeoutTest

@Test
public void readWithoutTimeout() throws Exception {
    Socket socket = socket(ONE_MB, 0);
    BufferedSource source = Okio.buffer(Okio.source(socket));
    source.timeout().timeout(5000, TimeUnit.MILLISECONDS);
    source.require(ONE_MB);
    socket.close();
}

这里可以看到,需要调用 source.timeout().timeout(5000, TimeUnit.MILLISECONDS)

public Timeout timeout(long timeout, TimeUnit unit) {
    if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    this.timeoutNanos = unit.toNanos(timeout);
    return this;
}

这里可以看到 timeoutNanos 在这里赋值了。所以设置 timeout(5000, TimeUnit.MILLISECONDS) 后会出发 scheduleTimeout(this, timeoutNanos, hasDeadline)

private static synchronized void scheduleTimeout(
        AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    if (head == null) {
        head = new AsyncTimeout();
        new Watchdog().start();
    }
    ……
    // Insert the node in sorted order.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
        if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;
            prev.next = node;
            if (prev == head) {
                AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
            }
            break;
        }
    }
}

这里用到一个同步锁、启动一个 Watchdog 线程。并且根据 timeout 的超时时间,把 AsyncTimeout 添加到一个任务队列中。

private static final class Watchdog extends Thread {
    Watchdog() {
        super("Okio Watchdog");
        setDaemon(true);
    }

    public void run() {
        while (true) {
            try {
                AsyncTimeout timedOut;
                synchronized (AsyncTimeout.class) {
                    timedOut = awaitTimeout();

                    // Didn't find a node to interrupt. Try again.
                    if (timedOut == null) continue;

                    // The queue is completely empty. Let this thread exit and let another watchdog thread
                    // get created on the next call to scheduleTimeout().
                    if (timedOut == head) {
                        head = null;
                        return;
                    }
                }

                // Close the timed out node.
                timedOut.timedOut();
            } catch (InterruptedException ignored) {
            }
        }
    }
}

Watchdog 线程会一直同步遍历任务队列执行 awaitTimeout()

static @Nullable
AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    if (node == null) {
        long startNanos = System.nanoTime();
        AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
        return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
                ? head  // The idle timeout elapsed.
                : null; // The situation has changed.
    }

    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    if (waitNanos > 0) {
        // Waiting is made complicated by the fact that we work in nanoseconds,
        // but the API wants (millis, nanos) in two arguments.
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
        return null;
    }

    // The head of the queue has timed out. Remove it.
    head.next = node.next;
    node.next = null;
    return node;
}

}
这里会根据队列头部的 AsyncTimeout 节点,计算出剩余时间,然后执行 AsyncTimeout.class.wait(waitMillis, (int) waitNanos) 方法阻塞。

注意这个的 wait(timeout) 会被 AsyncTimeout.class.notify() 唤醒。

如果任务队列为空会执行 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS) ,等待一分钟。然后再判断是否有新的任务。

参考资料

拆轮子系列:拆 Okio

Okio源码分析

okio github 地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容