一、java管道流介绍
在java多线程通信中管道通信是一种重要的通信方式,在java中我们通过配套使用管道输出流PipedOutputStream和管道输入流PipedInputStream完成线程间通信。多线程管道通信的主要流程是在一个线程中向PipedOutputStream写入数据,这些数据会自动传送到对应的管道输入流PipedInputStream中,其他线程通过读取PipeInputStream中缓冲的数据实现多线程间通信。
二、PipedInputStream
1 - PipedInputStream介绍
PipeInputStream是管道输入流,继承自InputStream,连接到一个管道输出流PipedOutputStream。可以缓存连接的管道输出流PipedOutputStream写入的字节数据。通常在一个线程使用PipedInputStream读取数据,在其他线程使用PipedOutputStream写入字节数据。不推荐在一个线程中使用PipedInputStream和PipedOutputStream可能会在线程中造成死锁,管道输入流包含一个缓冲区buff用于分离读操作和写操作。
2 - PipeInputStream源码分析
1)成员变量
package java.io;
public class PipedInputStream extends InputStream {
//管道输出流是否关闭标记
boolean closedByWriter = false;
//管道输入流是否标记
volatile boolean closedByReader = false;
//管道输入流与管道输出流是否建立连接
boolean connected = false;
//读取“管道”数据即PipedInputStream线程
Thread readSide;
//向管道写入数据即PipedOutputStream线程
Thread writeSide;
//管道默认大小
private static final int DEFAULT_PIPE_SIZE = 1024;
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
// 缓冲区
protected byte buffer[];
//下一个写入字节的位置
protected int in = -1;
//下一个读取字节的位置。若out==in说明管道输出流写入的数据全部被读取
protected int out = 0;
}
2)构造方法
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
public PipedInputStream(PipedOutputStream src, int pipeSize)
throws IOException {
initPipe(pipeSize);
connect(src);
}
public PipedInputStream() {
initPipe(DEFAULT_PIPE_SIZE);
}
从源码我们可以知道管道输入流PipedInputStream构造方法做了两件事,按照指定大小pipeSize初始化缓冲区,如果还指定了关联的管道输出流PipedOutputStream,那么调用connect方法连接它。如果指定的pipeSize小于等于0那么抛出IllegalArgumentException异常,如果当前的管道输入流已经和指定的管道输出流建立连接那么抛出IOException异常
3)其他成员方法
//根据指定大小pipeSize初始化缓冲区
private void initPipe(int pipeSize) {
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe Size <= 0");
}
buffer = new byte[pipeSize];
}
//绑定管道输入流与管道输出流
public void connect(PipedOutputStream src) throws IOException {
src.connect(this);
}
//接收绑定的管道输出流PipedOutputStream的write(int b)方法写入的int类型数据
protected synchronized void receive(int b) throws IOException {
//检查管道状态
checkStateForReceive();
//获取“写入管道“即PipedOutputStream的线程
writeSide = Thread.currentThread();
//若管道输出流写入的数据全部被读取则等待
if (in == out)
awaitSpace();
if (in < 0) {
in = 0;
out = 0;
}
//将读取的字节b保存到缓冲区
buffer[in++] = (byte)(b & 0xFF);
//如果当前写入的字节数大于缓冲区大小那么从头覆盖之前写入的字节数据
if (in >= buffer.length) {
in = 0;
}
}
//接收管道输出流的write(byte b[],int off, int len)方法调用写入的字节数组b
synchronized void receive(byte b[], int off, int len) throws IOException {
//检查管道状态
checkStateForReceive();
//获取”写入管道“线程
writeSide = Thread.currentThread();
//获取写入字节长度
int bytesToTransfer = len;
//循环将字节数组b写入管道输入流内部缓冲数组buffer
while (bytesToTransfer > 0) {
//若写入管道的字节长度in等于读取字节长度长度out,则等待
if (in == out)
awaitSpace();
int nextTransferAmount = 0;
//若管道中读取的字节数out小于写入的字节数in,nextTransferAmount等于buffer.length-in
if (out < in) {
nextTransferAmount = buffer.length - in;
}
//若管道中写入的字节数小于读取的字节数,
else if (in < out) {
if (in == -1) {
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
//将数据复制到缓冲区buffer中
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
//缓冲区溢出继续写入则覆盖原有字节数据
if (in >= buffer.length) {
in = 0;
}
}
}
//检查管道状态
private void checkStateForReceive() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
}
//等待。若”写入管道“的数据被全部读取完,则唤醒”读取管道“线程继续读取字节数据以让缓冲区空出空间继续写入数据,等待
//1000ms
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
/* full: kick any waiting readers */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
//连接的管道输出流关闭时被调用用于更新管道输入流关闭状态,并唤醒读取管道线程
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();
}
//从管道输入流更确切的说缓冲区buffer中读取一个字节并转化为int值
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = 2;
//如果写入字节数小于0,且非写入管道线程异常终止或者管道输出流写入结束正常关闭,那么唤醒写入管道等待1s
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
//从管道输入流缓冲数组buffer中读取字节数据并填入数组b中逻辑与read()类似
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
b[off] = (byte) c;
int rlen = 1;
while ((in >= 0) && (len > 1)) {
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - 1)) {
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}
//返回可不受阻塞地从此输入流中读取的字节数,注意available返回是缓冲数组中的可读字节长度
public synchronized int available() throws IOException {
if(in < 0)
return 0;
else if(in == out)
return buffer.length;
else if (in > out)
return in - out;
else
return in + buffer.length - out;
}
//关闭输出流
public void close() throws IOException {
closedByReader = true;
synchronized (this) {
in = -1;
}
}
老韭菜 
![[爱了]](/js/img/d1.gif)
![[尴尬]](/js/img/d16.gif)