java.io源码解析(五)--管道字节流(PipedInputStream,PipedOutputStream)管道字符流(PipedReader,PipedWriter)

声明:本系列只供本人自学使用,勿喷。

参考:https://www.cnblogs.com/skywang12345/p/io_04.html

管道流一般用于线程间的通讯。

大致流程:在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据,就可以实现,线程A和线程B的通信。

一、PipedInputStream

public class PipedInputStream extends InputStream{
    // 循环缓存数组,默认1024
    protected byte buffer[];
    // PipedOutputStream往PipedInputStream的buffer中写入数据的下一个位置
    protected int in = -1;
    // PipedInputStream从buffer中读数据的下一个位置
    protected int out = 0;
}
  • 构造器
    // 使得 PipedOutputStream与该PipedInputStream建立连接
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
         initPipe(pipeSize);
         connect(src);
    }

    // 构造未建立连接的PipedInputStream
    public PipedInputStream()
    public PipedInputStream(int pipeSize)

-核心方法

    // 建立连接
    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }

    // 是PipedOutputStream的write(int b)方法中调用的,使得PipedInputStream能receive
    protected synchronized void receive(int b) throws IOException{
        1.检查连接状态,checkStateForReceive();
        2.假如写入的所有数据已全部读完(即in==out),awaitSpace();
          将新写入的数据添加到buffer中
    }
    
    // 若 “写入管道” 已将 “读取管道” 的缓存buffer写满,则需要执行awaitSpace()操作
    // 唤醒“读取管道”的线程进行读取(读完即可清空buffer继续写入)
    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    // 接收byte[]数组
    // 在PipedOutputStream的write(byte b[], int off, int len)方法中调用,使得PipedInputStream能receive
    synchronized void receive(byte b[], int off, int len)  throws IOException{
        checkStateForReceive();
        writeSide = Thread.currentThread();//获取写入线程
        int bytesToTransfer = len;//PipedOutputStream写入的字节总数
        while (bytesToTransfer > 0) {
            // 第一次执行时,in=-1,out=0,证明buffer为空
            if (in == out)
                awaitSpace();

            // 计算可拷贝到buffer的字节总数 nextTransferAmount 
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);

            // 拷贝到buffer
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            // 已经写入1024个字节后(即buffer已满),需要将循环数组in归0
            if (in >= buffer.length) {
                in = 0;
            }
        }
}

    // 返回下一个字节
    public synchronized int read()  throws IOException
    // 将buffer的数据读取到 byte b[],当都读完后,清空buffer(即 int=-1,out=0)
    public synchronized int read(byte b[], int off, int len)  throws IOException 

二、PipedOutputStream

public class PipedOutputStream extends OutputStream{
    private PipedInputStream sink;
}
  • 构造器
    // 与PipedInputStream建立连接
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

    // 构造未建立连接的PipedOutputStream
    public PipedOutputStream() {
    }

-核心方法

    // 建立连接
    public synchronized void connect(PipedInputStream snk) throws IOException

    public void write(int b)  throws IOException{
        sink.receive(b);
    }

    public void write(byte b[], int off, int len) throws IOException{
        sink.receive(b, off, len);
    }
    
    // 清空PipedOutputStream
    // 目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }
    
    // 关闭PipedOutputStream并释放资源
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }

三、demo

public class Test3 {
    public static void main(String[] args) throws IOException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        receiver.getPipedInputStream().connect(sender.getPipedOutputStream());
        sender.start();
        receiver.start();
    }
}

class Sender extends Thread {
    private PipedOutputStream pipedOutputStream = new PipedOutputStream();

    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    @Override
    public void run() {
        writeMessage();
    }

    private void writeMessage() {
        try {
            pipedOutputStream.write("哈喽,China".getBytes());
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Receiver extends Thread {
    private PipedInputStream pipedInputStream = new PipedInputStream();

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {
        readMessage();
    }

    void readMessage() {
        try {
            byte[] bytes = new byte[1024];
            int len = 0;
            while ((len = pipedInputStream.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 概述 管道流是用来在多个线程之间进行信息传递的Java流。管道流分为字节流管道流和字符管道流。字节管道流:Pipe...
    jijs阅读 8,173评论 0 3
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,993评论 1 19
  • 1.import static是Java 5增加的功能,就是将Import类中的静态方法,可以作为本类的静态方法来...
    XLsn0w阅读 1,394评论 0 2
  • 本部分总结一下JAVA IO的相关知识。 全部章节传送门: JAVA IO学习笔记: IO基本知识 JAVA IO...
    简单一点点阅读 1,975评论 0 0
  • 专业考题类型管理运行工作负责人一般作业考题内容选项A选项B选项C选项D选项E选项F正确答案 变电单选GYSZ本规程...
    小白兔去钓鱼阅读 10,225评论 0 13

友情链接更多精彩内容