Java线程操作实现生产者与消费者模型

基本模型
class Message {  //描述公共空间
    private String title;
    private String content;

    public void setContent(String content) {
        this.content = content;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public String getTitle() {
        return title;
    }

}

class ProducerThread implements Runnable {
    private final Message message;    //获得message的引用

    public ProducerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            this.message.setTitle("title: " + i);
            this.message.setContent("content: " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

class ConsumerThread implements Runnable {

    private final Message message;    //获得message的引用

    public ConsumerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run(){

        for (int i = 0; i < 100; i++) {
            System.out.println("消费者:" + this.message.getTitle() + this.message.getContent());

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

public class Test {

    public static void main(String[] args) {

        Message message = new Message();        //实例化Message
        new Thread(
                new ProducerThread(message)
        ).start();                  //启动生产者线程

        new Thread(
                new ConsumerThread(message)
        ).start();                  // 启动消费者线程

    }

}

此时的代码仅仅是是实现了两个线程彼此独立的操作交互,但是通过最终的执行结果可以清楚的发现当前的程序代码之中存在如下的几个设计问题:

  • 在生产着没有完成向生产的时候哦虎消费者就已经获取数据了,获取的数据就属于一个半成品数据;
  • 如果说现在消费者性能高于生产者,则会出现爱你同一个信息重复消费的问题。

解决数据同步问题

在java的程序之中,如果想要实现数据的同步处理,那么肯定要使用synchronized来完成,如果要完成的话那么肯定就需要同步代码块或者是同步方法,之所以现在出现了不同步的问题,主要是没有在生产的过程里面对数据的操作进行锁定。


image.png

范例:修改程序的结构代码

class Message {  //描述公共空间
    private String title;
    private String content;


    public synchronized void set(String title,String content){
        this.content = content;
        this.title = title;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized  String get(){
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return  this.title + this.content;

    }


}

class ProducerThread implements Runnable {
    private final Message message;    //获得message的引用

    public ProducerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            this.message.set(
                    "title: " + i,"content: "+i
            );
        }
    }

}

class ConsumerThread implements Runnable {

    private final Message message;    //获得message的引用

    public ConsumerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run(){

        for (int i = 0; i < 100; i++) {
            System.out.println(this.message.get());
        }

    }
}

public class Test {

    public static void main(String[] args) {

        Message message = new Message();        //实例化Message
        new Thread(
                new ProducerThread(message)
        ).start();                  //启动生产者线程

        new Thread(
                new ConsumerThread(message)
        ).start();                  // 启动消费者线程

    }

}

通过当前的程序的改进可以发现此时的代码已经完全可以正确的实现相关的内容的配置,并且没有任何的数据错位,所以解决了数据混乱的问题,但是另外一个问题还没有解决:重复生产数据和重复消费数据。

解决线程重复操作问题

而要想实现这样的功能就必须采用线程的同步、等待与唤醒的处理机制,而这样的处理机制的操作全部定义在了Object类里面,在Object类中提供有如下的几个与线程有关的操作方法。

image.png

特别重要的提示:以上这几个处理方法是进行线程控制的,但是这些方法都是在synchronized方法中才去使用的,同时这些方法属于最原始的多线程协作控制。如果真的用这些操作进行开发,那么整个项目里面基本上对死锁的情况就会频发。

class Message {  //描述公共空间
    private String title;
    private String content;
    private boolean flag = true;    //设置一个标志位(红绿灯)

    //flag = true:表示可以生产,但是无法进行消费,
    //flag = false:表示可以消费,但是无法生产

    public synchronized void set(String title,String content){
        if (this.flag == false) {   //不允许生产,但是允许消费
            try {
                super.wait();   //等到消费者线程执行完毕后唤醒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        this.content = content;
        this.title = title;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.flag = false;      //表示生产完成,可以消费
        super.notify();         //唤醒其它等待线程
    }

    public synchronized  String get(){

        if(this.flag == true){  //不允许消费,只允许生产
            try {
                super.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        this.flag = true;      //表示消费完成,可以生产
        super.notify();         //唤醒其它等待线程

        return  this.title + this.content;
    }


}

class ProducerThread implements Runnable {
    private final Message message;    //获得message的引用

    public ProducerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            this.message.set(
                    "title: " + i,"content: "+i
            );
        }
    }

}

class ConsumerThread implements Runnable {

    private final Message message;    //获得message的引用

    public ConsumerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run(){

        for (int i = 0; i < 100; i++) {
            System.out.println(this.message.get());
        }

    }
}

public class Test {

    public static void main(String[] args) {

        Message message = new Message();        //实例化Message
        new Thread(
                new ProducerThread(message)
        ).start();                  //启动生产者线程

        new Thread(
                new ConsumerThread(message)
        ).start();                  // 启动消费者线程

    }

}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352