声明:本系列只供本人自学使用,勿喷。
参考:https://www.cnblogs.com/skywang12345/p/io_04.html
管道流一般用于线程间的通讯。
大致流程:在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据,就可以实现,线程A和线程B的通信。
一、PipedInputStream
public class PipedInputStream extends InputStream{
// 循环缓存数组,默认1024
protected byte buffer[];
// PipedOutputStream往PipedInputStream的buffer中写入数据的下一个位置
protected int in = -1;
// PipedInputStream从buffer中读数据的下一个位置
protected int out = 0;
}
- 构造器
// 使得 PipedOutputStream与该PipedInputStream建立连接
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
// 构造未建立连接的PipedInputStream
public PipedInputStream()
public PipedInputStream(int pipeSize)
-核心方法
// 建立连接
public void connect(PipedOutputStream src) throws IOException {
src.connect(this);
}
// 是PipedOutputStream的write(int b)方法中调用的,使得PipedInputStream能receive
protected synchronized void receive(int b) throws IOException{
1.检查连接状态,checkStateForReceive();
2.假如写入的所有数据已全部读完(即in==out),awaitSpace();
将新写入的数据添加到buffer中
}
// 若 “写入管道” 已将 “读取管道” 的缓存buffer写满,则需要执行awaitSpace()操作
// 唤醒“读取管道”的线程进行读取(读完即可清空buffer继续写入)
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
// 接收byte[]数组
// 在PipedOutputStream的write(byte b[], int off, int len)方法中调用,使得PipedInputStream能receive
synchronized void receive(byte b[], int off, int len) throws IOException{
checkStateForReceive();
writeSide = Thread.currentThread();//获取写入线程
int bytesToTransfer = len;//PipedOutputStream写入的字节总数
while (bytesToTransfer > 0) {
// 第一次执行时,in=-1,out=0,证明buffer为空
if (in == out)
awaitSpace();
// 计算可拷贝到buffer的字节总数 nextTransferAmount
int nextTransferAmount = 0;
if (out < in) {
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
// 拷贝到buffer
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
// 已经写入1024个字节后(即buffer已满),需要将循环数组in归0
if (in >= buffer.length) {
in = 0;
}
}
}
// 返回下一个字节
public synchronized int read() throws IOException
// 将buffer的数据读取到 byte b[],当都读完后,清空buffer(即 int=-1,out=0)
public synchronized int read(byte b[], int off, int len) throws IOException
二、PipedOutputStream
public class PipedOutputStream extends OutputStream{
private PipedInputStream sink;
}
- 构造器
// 与PipedInputStream建立连接
public PipedOutputStream(PipedInputStream snk) throws IOException {
connect(snk);
}
// 构造未建立连接的PipedOutputStream
public PipedOutputStream() {
}
-核心方法
// 建立连接
public synchronized void connect(PipedInputStream snk) throws IOException
public void write(int b) throws IOException{
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException{
sink.receive(b, off, len);
}
// 清空PipedOutputStream
// 目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
// 关闭PipedOutputStream并释放资源
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
三、demo
public class Test3 {
public static void main(String[] args) throws IOException {
Sender sender = new Sender();
Receiver receiver = new Receiver();
receiver.getPipedInputStream().connect(sender.getPipedOutputStream());
sender.start();
receiver.start();
}
}
class Sender extends Thread {
private PipedOutputStream pipedOutputStream = new PipedOutputStream();
public PipedOutputStream getPipedOutputStream() {
return pipedOutputStream;
}
@Override
public void run() {
writeMessage();
}
private void writeMessage() {
try {
pipedOutputStream.write("哈喽,China".getBytes());
pipedOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class Receiver extends Thread {
private PipedInputStream pipedInputStream = new PipedInputStream();
public PipedInputStream getPipedInputStream() {
return pipedInputStream;
}
@Override
public void run() {
readMessage();
}
void readMessage() {
try {
byte[] bytes = new byte[1024];
int len = 0;
while ((len = pipedInputStream.read(bytes)) != -1) {
System.out.println(new String(bytes, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}