wait(),notify() ,线程的中断,管道,管道泄漏 (?byte[] circular buffer)

首先线程基础:

public class MultiThread{
    public static void main(String[] args) {
 "获取Java线程管理MXBean"
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
"不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息"
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
"遍历线程信息,仅打印线程ID和线程名称信息"
        for (ThreadInfo threadInfo : threadInfos) {
            System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.
                    getThreadName());
        }
    }
}

输出结果:

[6] Monitor Ctrl-Break
[5] Attach Listener
[4] Signal Dispatcher
[3] Finalizer
[2] Reference Handler
[1] main

首先,结论是在运行main方法的时候,并不是一个单独的线程,Attach Listener Signal Dispatcher,然后这两个线程,是和jvm的attach机制相关,Finalizer Reference Handler 这两个是和垃圾回收机制有关,不可达对象要被垃圾回收,至少要经历两次标记过程。第一次标记时执行finalize()方法,并做记号,第二次标记则不会再执行finalize()方法了。

线程状态

public class ThreadState {
    public static void main(String[] args) {
        new Thread(new TimeWaiting (), "TimeWaitingThread").start();
        new Thread(new Waiting(), "WaitingThread").start();
"使用两个Blocked线程,一个获取锁成功,另一个被阻塞"
        new Thread(new Blocked(), "BlockedThread-1").start();
        new Thread(new Blocked(), "BlockedThread-2").start();
    }
         "该线程不断地进行睡眠"
    static class TimeWaiting implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
          "该线程在Waiting.class实例上等待"
    static class Waiting implements Runnable {@Override
    public void run() {
        while (true) {
            synchronized (Waiting.class) {
                try {
                    Waiting.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    }
          "该线程在Blocked.class实例上加锁后,不会释放该锁"
    static class Blocked implements Runnable {
        public void run() {
            synchronized (Blocked.class) {
                while (true) {
                    try {
                        Thread.sleep(100000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

image.png

image.png

线程的通信

  • 一种是通过共享变量,然后为了解决多线程数据一致性问题 ,加上JMM。比如volatile,synchronized 关键字。
  • wait(),notify(),notifyAll() .Thread.join(),Thread.yield()等jdk自带的api . 这个就不说了,但是强调的一点是只有获取锁之后,才能使用 wait(),方法,因为wait() 方法的含义就是释放锁,然后放弃cpu的调度,进入WAITING状态,然后其他线程调用notify()的时候,也不一定是唤醒他这个线程,假如说有一个等待队列的话,应该是等待队列的头部,重新进入ready状态,参与cpu时间调度算法。如果是非公平的,那么也不一定是头部。
  • 管道(线程之间传递数据,一般不是用字节流,而是对象,少用)
  • ThreadLocal (这里涉及到强引用,弱引用等,放在下一篇讲)
    接下来讲讲管道,字节流的PipedOutputStream,PipedInputStream,还有字符流的PipedReader和PipedWriter。

PipedOutputStream,PipedInputStream

管道流向流程图:
PipedOutputStream从内存输出数据写入到PipedInputStream的缓冲区,PipedInputStream从PipedInputStream缓冲区读取管道流数据。


image.png

截取一段源码。

public class PipedInputStream extends InputStream {
    boolean closedByWriter = false;
    volatile boolean closedByReader = false;
    boolean connected = false;

        "REMIND: identification of the read and write sides needs to be
           more sophisticated.  Either using thread groups (but what about
           pipes within a thread?) or using finalization (but it may be a
           long time until the next GC). "
    Thread readSide;
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

   "
     * The default size of the pipe's circular input buffer.
     * @since   JDK1.1
     */
    // This used to be a constant before the pipe size was allowed
    // to change. This field will continue to be maintained
    // for backward compatibility.
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    /**
     * The circular buffer into which incoming data is placed.
     * @since   JDK1.1
    "
    protected byte buffer[];

connect 方法是同步方法synchronized ,看不懂???,这里好像是涉及到和disruptor框架核心类似的ringBuffer .先搁在这里,我们后续再研究。 在ThreadLocal后一篇,我们补上ringBuffer,然后研究这里的circularBuffer .

public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;
    }
protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        if (in == out)
            awaitSpace();
        if (in < 0) {
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte)(b & 0xFF);
        if (in >= buffer.length) {
            in = 0;
        }
    }

查看PipedOutputStream的源码我们发现,PipedOutputStream本身没有缓冲区,1024的缓冲区在输入流中。PipedOutputStream的写入方法
public void write(byte b[], int off, int len) flush() 都是调用的PipedInputStream的
synchronized void receive(byte b[], int off, int len) 方法
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
最后我们上代码:

public class MultiThreadPipedTest {
    public static class Write extends Thread{
        public PipedOutputStream pos = null;

        "获取线程中的管道输出流"
        public PipedOutputStream getPos(){
            pos = new PipedOutputStream();
            return pos;
        }
       "把数据通过管道输出流发送出去"
        public void SentData(){
            PrintStream p = new PrintStream(pos);
            for(int i=1;i<10;i++){
                p.println("hello");
                p.flush();
            }
            p.close();
        }
        @Override
        public void run(){
            while(true);         //模拟耗时工作
        }
    }

    public static class Read extends Thread{
        public PipedInputStream pis = null;
        public String line = "null";

        "获得线程中的管道输入流"
        public PipedInputStream getPis(){
            pis = new PipedInputStream();
            return pis;
        }
        "利用管道输入流接收管道数据"
        public void ReceiveData(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("read: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run(){
            while(true);        //模拟耗时工作
        }
    }

    public static class Other_Thread extends Thread{
        public PipedInputStream pis = null;
        public String line = "null";

        "获得线程中的管道输入流"
        public PipedInputStream getPis(){
            pis = new PipedInputStream();
            return pis;
        }
       "利用管道输入流接收管道数据"
        public void ReceiveData(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("Other thread: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run(){
            while(true);   //模拟耗时操作
        }
    }
    public static void main(String args[]) throws InterruptedException, IOException{
        Write write = new Write();
        Read read = new Read();
        Other_Thread other = new Other_Thread();
        "连接两个线程的管道流 ---read和write线程"
        write.getPos().connect(read.getPis());
        write.start();
        read.start();
        other.start();
        write.SentData();
        read.ReceiveData();
        Thread.sleep(2000);
        "重新连接两个线程的管道流 ---Other_Thread和write线程"
        write.getPos().connect(other.getPis());
        write.SentData();
        other.ReceiveData();
    }
}
image.png

管道泄漏 ,一个线程写,多个线程读取数据,本来connect应该是1对1 的。但是下面的例子,write和read本来是一对的,但是Other_Thread 窃取了,read线程的数据。

public class PipedStreamLeakTest {
    public static class Write extends Thread{
        public PipedOutputStream pos;
        Write(PipedOutputStream pos){
            this.pos = pos;
        }
        public void run(){
            PrintStream p = new PrintStream(pos);
            for(int i=1;i<1000;i++){
                p.println("hello");
                p.flush();
            }
            p.close();
        }
    }

    public static class Read extends Thread{
        public PipedInputStream pis;
        public String line = "null";
        Read(PipedInputStream pis){
            this.pis = pis;
        }
        public void run(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("read: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Other_Thread extends Thread{
        public PipedInputStream pis;
        public String line = "null";
        Other_Thread(PipedInputStream pis){
            this.pis = pis;
        }
        public void run(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("Other_Thread: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String args[]) throws InterruptedException, IOException{
        "创建管道通信流"
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream(pos);
        new Write(pos).start();
        new Read(pis).start();
        new Other_Thread(pis).start();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • 本文主要讲了java中多线程的使用方法、线程同步、线程数据传递、线程状态及相应的一些线程函数用法、概述等。在这之前...
    4ea0af17fd67阅读 589评论 2 17
  • 场景 假设我们需要上传一组动态增加的数据, 输入端可以看作inputSteam, 输入端是outputSteam,...
    AssIstne阅读 3,227评论 1 6
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,255评论 11 349
  • 概述 管道流是用来在多个线程之间进行信息传递的Java流。管道流分为字节流管道流和字符管道流。字节管道流:Pipe...
    jijs阅读 7,956评论 0 3
  • 你认为的"伟大软件"是什么? 保持低耦合,让你的程序代码因禁止修改而关闭, 因允许拓展而开放。重复利用。不必重做每...
    JocobZling阅读 461评论 0 1