谈谈代码:Java IO业务代码优化之路

本文首发于泊浮目的专栏:https://segmentfault.com/blog/camile

版本 日期 备注
1.0 2019.4.27 文章首发
1.1 2021.5.21 修改标题:从一段代码谈起——浅谈JavaIO接口-> 谈谈代码:Java IO业务代码优化之路
1.1 2022.2.14 部分章节调整

1.前言

前阵子休息天日常在寻找项目里不好的代码,看到了这样的一段代码:

    private Result sshSameExec(Session session, String cmd) {
        if (log.isDebugEnabled()) {
            log.debug("shell command: {}", cmd);
        }
        UserInfo ui = getUserInfo();
        session.setUserInfo(ui);
        int exitStatus = 0;
        StringBuilder builder = new StringBuilder();
        ChannelExec channel;
        InputStream in;
        InputStream err;
        try {
            session.connect(connectTimeout);
            channel = (ChannelExec) session.openChannel("exec");
            channel.setCommand(cmd);
            in = channel.getInputStream();
            err = channel.getErrStream();
            channel.connect();
        } catch (Exception e) {
            throw new CloudRuntimeException(e);
        }

        try {
            long lastRead = Long.MAX_VALUE;
            byte[] tmp = new byte[1024];
            while (true) {
                while (in.available() > 0 || err.available() > 0) {
                    int i = 0;
                    if (in.available() > 0) {
                        i = in.read(tmp, 0, 1024);
                    } else if (err.available() > 0) {
                        i = err.read(tmp, 0, 1024);
                    }
                    if (i < 0) {
                        break;
                    }
                    lastRead = System.currentTimeMillis();
                    builder.append(new String(tmp, 0, i));
                }
                if (channel.isClosed()) {
                    if (in.available() > 0) {
                        continue;
                    }
                    exitStatus = channel.getExitStatus();
                    break;
                }
                if (System.currentTimeMillis() - lastRead > exeTimeout) {
                    break;
                }
            }
        } catch (IOException e) {
            throw new CloudRuntimeException(e);
        } finally {
            channel.disconnect();
            session.disconnect();
        }

        if (0 != exitStatus) {
            return Result.createByError(ErrorData.builder()
                    .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode())
                    .detail(builder.toString())
                    .title(ResultCode.EXECUTE_SSH_FAIL.toString())
                    .build());
        } else {
            return Result.createBySuccess(builder.toString());
        }
    }

简单解释一下这段代码——即通过ssh到一台机器上,然后执行一些命令.对命令输出的东西,开了一个循环,每一次读一定的位置,然后以字节流的形式读回来.

这段代码有点丑,于是我闻到了学习的味道.


首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream.其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来.

在改良之前,我们先来回顾一下JavaIO的接口定义.

2.JavaIO 接口知识回顾

2.1 低级抽象接口:InputStream 和 OutputStream

这里有同学可能问了,为啥叫它低抽象接口呢?因为它离底层太近了,计算机本来就是处理二进制的,而这两个接口正是用来处理二进制数据流的.

先简单看一眼这两个接口:

  • InputStream
**
 * This abstract class is the superclass of all classes representing
 * an input stream of bytes.
 *
 * <p> Applications that need to define a subclass of <code>InputStream</code>
 * must always provide a method that returns the next byte of input.
 *
 * @author  Arthur van Hoff
 * @see     java.io.BufferedInputStream
 * @see     java.io.ByteArrayInputStream
 * @see     java.io.DataInputStream
 * @see     java.io.FilterInputStream
 * @see     java.io.InputStream#read()
 * @see     java.io.OutputStream
 * @see     java.io.PushbackInputStream
 * @since   JDK1.0
 */
public abstract class InputStream implements Closeable {.....}
  • OutputStream
/**
 * This abstract class is the superclass of all classes representing
 * an output stream of bytes. An output stream accepts output bytes
 * and sends them to some sink.
 * <p>
 * Applications that need to define a subclass of
 * <code>OutputStream</code> must always provide at least a method
 * that writes one byte of output.
 *
 * @author  Arthur van Hoff
 * @see     java.io.BufferedOutputStream
 * @see     java.io.ByteArrayOutputStream
 * @see     java.io.DataOutputStream
 * @see     java.io.FilterOutputStream
 * @see     java.io.InputStream
 * @see     java.io.OutputStream#write(int)
 * @since   JDK1.0
 */
public abstract class OutputStream implements Closeable, Flushable {...}

我们可以发现,它们都实现了Closeable的接口.因此大家在使用这些原生类时,要注意在结束时调用Close方法哦.

这两个接口的常用实现类有:

  • FileInputStreamFileOutputStream
  • DataInputStreamDataOutputStream
  • ObjectInputStreamObjectOutputStream

2.2 高级抽象接口——Writer和Reader

为啥说它是高级抽象接口呢?我们先来看看它们的注释:

  • Writer
/**
 * Abstract class for writing to character streams.  The only methods that a
 * subclass must implement are write(char[], int, int), flush(), and close().
 * Most subclasses, however, will override some of the methods defined here in
 * order to provide higher efficiency, additional functionality, or both.
 *
 * @see Writer
 * @see   BufferedWriter
 * @see   CharArrayWriter
 * @see   FilterWriter
 * @see   OutputStreamWriter
 * @see     FileWriter
 * @see   PipedWriter
 * @see   PrintWriter
 * @see   StringWriter
 * @see Reader
 *
 * @author      Mark Reinhold
 * @since       JDK1.1
 */

public abstract class Writer implements Appendable, Closeable, Flushable {
  • Reader
/**
 * Abstract class for reading character streams.  The only methods that a
 * subclass must implement are read(char[], int, int) and close().  Most
 * subclasses, however, will override some of the methods defined here in order
 * to provide higher efficiency, additional functionality, or both.
 *
 *
 * @see BufferedReader
 * @see   LineNumberReader
 * @see CharArrayReader
 * @see InputStreamReader
 * @see   FileReader
 * @see FilterReader
 * @see   PushbackReader
 * @see PipedReader
 * @see StringReader
 * @see Writer
 *
 * @author      Mark Reinhold
 * @since       JDK1.1
 */

public abstract class Reader implements Readable, Closeable {

我们可以看到,这个抽象类是用来面向character的,也就是字符.字符的抽象等级必然比字节高,因为字符靠近上层,即人类.

2.3 优化输入和输出——Buffered

如果我们直接使用上述实现类去打开一个文件(如FileWriterFileReaderFileInputStreamFileOutputStream),对其对象调用readwritereadLine等,每个请求都是由基础OS直接处理的,这会使一个程序效率低得多——因为它们都会引发磁盘访问or网络请求等.

为了减少这种开销,Java 平台实现缓冲 I/O 流。缓冲输入流从被称为缓冲区(buffer)的存储器区域读出数据;仅当缓冲区是空时,本地输入 API 才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出 API。

用于包装非缓存流的缓冲流类有4个:BufferedInputStreamBufferedOutputStream·用于创建字节缓冲字节流,BufferedReaderBufferedWriter`用于创建字符缓冲字节流.

3. 着手优化

之前,我们提到了这段代码写得搓的地方:

  • 首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream.
  • 其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来.

故此,我们可以考虑对每个Stream都进行包装,支持用线程去消费,其次我们可以用高级抽象分接口去适配Byte,然后去装饰成Buffer.

接下来,我们来看一段ZStack里的工具类ShellUtils,为了节省篇幅,我们仅仅截出它在IDE里的Structure:

run方法的核心:


我们可以看到StreamConsumer这个类,我们来看一下它的代码:

    private static class StreamConsumer extends Thread {
        final InputStream in;
        final PrintWriter out;
        final boolean flush;

        StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) {
            this.in = in;
            this.out = out;
            flush = flushEveryWrite;
        }

        @Override
        public void run() {
            BufferedReader br = null;
            try {
                br = new BufferedReader(new InputStreamReader(in));
                String line;
                while ( (line = br.readLine()) != null) {
                    out.println(line);
                    if (flush) {
                        out.flush();
                    }
                }
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            } finally {
                try {
                    if (br != null) {
                        br.close();
                    }
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

这段代码已经达到了我们的理想状态:线程消费,高级抽象.

3.1 使用Kotlin

3.1.1 Kotlin IO

闲话不多说,先贴代码为敬:

import java.io.InputStream
import java.io.InputStreamReader

class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable {

    override fun run() {
        InputStreamReader(inputStream).buffered().use {
            it.lines().forEach { r -> result.append(r) }
        }
    }
}

还是一样熟悉的配方,我们逐行来解读:

  1. 定义一个类,并且要求构造函数必须传入InputStream和一个StringBuilder.且实现了Runnable接口,这意味着它可以被线程消费.
  2. 覆写run方法.我们可以看到InputStream被适配成了InputStreamReader,这意味着它可以输出字符流了,然后我们使用了Kotlin的接口将其装饰成了Buffer.
  3. 读每一行buffer,并appned到result这个StringBuilder里去.
  4. 读完就可以告辞了,close.(use会将其关闭)

3.1.2 Kotlin Coroutine

先看一下上面的图,我们都知道内核态线程是由OS调度的,但当一个线程拿到时间片时,却调到了阻塞IO,那么只能等在那边,浪费时间.

而协程则可以解决这个问题,当一个Jobhang住的时候,可以去做别的事情,绕开阻塞.更好的利用时间片.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352