java.io源码解析(五)--管道字节流(PipedInputStream,PipedOutputStream)管道字符流(PipedReader,PipedWriter)

声明:本系列只供本人自学使用,勿喷。

参考:https://www.cnblogs.com/skywang12345/p/io_04.html

管道流一般用于线程间的通讯。

大致流程:在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据,就可以实现,线程A和线程B的通信。

一、PipedInputStream

public class PipedInputStream extends InputStream{
    // 循环缓存数组,默认1024
    protected byte buffer[];
    // PipedOutputStream往PipedInputStream的buffer中写入数据的下一个位置
    protected int in = -1;
    // PipedInputStream从buffer中读数据的下一个位置
    protected int out = 0;
}
  • 构造器
    // 使得 PipedOutputStream与该PipedInputStream建立连接
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
         initPipe(pipeSize);
         connect(src);
    }

    // 构造未建立连接的PipedInputStream
    public PipedInputStream()
    public PipedInputStream(int pipeSize)

-核心方法

    // 建立连接
    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }

    // 是PipedOutputStream的write(int b)方法中调用的,使得PipedInputStream能receive
    protected synchronized void receive(int b) throws IOException{
        1.检查连接状态,checkStateForReceive();
        2.假如写入的所有数据已全部读完(即in==out),awaitSpace();
          将新写入的数据添加到buffer中
    }
    
    // 若 “写入管道” 已将 “读取管道” 的缓存buffer写满,则需要执行awaitSpace()操作
    // 唤醒“读取管道”的线程进行读取(读完即可清空buffer继续写入)
    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    // 接收byte[]数组
    // 在PipedOutputStream的write(byte b[], int off, int len)方法中调用,使得PipedInputStream能receive
    synchronized void receive(byte b[], int off, int len)  throws IOException{
        checkStateForReceive();
        writeSide = Thread.currentThread();//获取写入线程
        int bytesToTransfer = len;//PipedOutputStream写入的字节总数
        while (bytesToTransfer > 0) {
            // 第一次执行时,in=-1,out=0,证明buffer为空
            if (in == out)
                awaitSpace();

            // 计算可拷贝到buffer的字节总数 nextTransferAmount 
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);

            // 拷贝到buffer
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            // 已经写入1024个字节后(即buffer已满),需要将循环数组in归0
            if (in >= buffer.length) {
                in = 0;
            }
        }
}

    // 返回下一个字节
    public synchronized int read()  throws IOException
    // 将buffer的数据读取到 byte b[],当都读完后,清空buffer(即 int=-1,out=0)
    public synchronized int read(byte b[], int off, int len)  throws IOException 

二、PipedOutputStream

public class PipedOutputStream extends OutputStream{
    private PipedInputStream sink;
}
  • 构造器
    // 与PipedInputStream建立连接
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

    // 构造未建立连接的PipedOutputStream
    public PipedOutputStream() {
    }

-核心方法

    // 建立连接
    public synchronized void connect(PipedInputStream snk) throws IOException

    public void write(int b)  throws IOException{
        sink.receive(b);
    }

    public void write(byte b[], int off, int len) throws IOException{
        sink.receive(b, off, len);
    }
    
    // 清空PipedOutputStream
    // 目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }
    
    // 关闭PipedOutputStream并释放资源
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }

三、demo

public class Test3 {
    public static void main(String[] args) throws IOException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        receiver.getPipedInputStream().connect(sender.getPipedOutputStream());
        sender.start();
        receiver.start();
    }
}

class Sender extends Thread {
    private PipedOutputStream pipedOutputStream = new PipedOutputStream();

    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    @Override
    public void run() {
        writeMessage();
    }

    private void writeMessage() {
        try {
            pipedOutputStream.write("哈喽,China".getBytes());
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Receiver extends Thread {
    private PipedInputStream pipedInputStream = new PipedInputStream();

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {
        readMessage();
    }

    void readMessage() {
        try {
            byte[] bytes = new byte[1024];
            int len = 0;
            while ((len = pipedInputStream.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容

  • 概述 管道流是用来在多个线程之间进行信息传递的Java流。管道流分为字节流管道流和字符管道流。字节管道流:Pipe...
    jijs阅读 7,951评论 0 3
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,820评论 1 19
  • 1.import static是Java 5增加的功能,就是将Import类中的静态方法,可以作为本类的静态方法来...
    XLsn0w阅读 1,225评论 0 2
  • 本部分总结一下JAVA IO的相关知识。 全部章节传送门: JAVA IO学习笔记: IO基本知识 JAVA IO...
    简单一点点阅读 1,909评论 0 0
  • 专业考题类型管理运行工作负责人一般作业考题内容选项A选项B选项C选项D选项E选项F正确答案 变电单选GYSZ本规程...
    小白兔去钓鱼阅读 8,993评论 0 13