经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » 设计模式 » 查看文章
Java多线程并发生产者消费者设计模式实例解析
来源:jb51  时间:2020/3/24 13:29:36  对本文有异议

一、两个线程一个生产者一个消费者

需求情景

两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个。

涉及问题

  • 同步问题:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用标记或加锁机制。
  • wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
  • wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
  • notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现(共三个类和一个main方法的测试类)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    if (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

Producer.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

  private Resource resource;

  public Producer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.create();
    }

  }
}

Consumer.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

  private Resource resource;

  public Consumer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.destroy();
    }

  }
}

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

打印结果:

以上打印结果可以看出没有任何问题。

二、多个线程,多个生产者和多个消费者的问题

需求情景

四个线程,两个个负责生产,两个个负责消费,生产者生产一个,消费者消费一个。

涉及问题

notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的所有线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

再次测试代码

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

运行结果:

通过以上打印结果发现问题

147生产了一次,消费了两次。169生产了,而没有消费。

原因分析

当两个线程同时操作生产者生产或者消费者消费时,如果有生产者或消费者的两个线程都wait()时,再次notify(),由于其中一个线程已经改变了标记而另外一个线程再次往下直接执行的时候没有判断标记而导致的。if判断标记,只有一次,会导致不该运行的线程运行了。出现了数据错误的情况。

解决方案

while判断标记,解决了线程获取执行权后,是否要运行!也就是每次wait()后再notify()时先再次判断标记。

代码改进(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

运行结果:

再次发现问题

打印到某个值比如生产完187,程序运行卡死了,好像锁死了一样。

原因分析

notify:只能唤醒一个线程,如果本方唤醒了本方,没有意义。而且while判断标记+notify会导致”死锁”。

解决方案

notifyAll解决了本方线程一定会唤醒对方线程的问题。

最后代码改进(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notifyAll();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notifyAll();
  }
}

运行结果:

以上就大功告成了,没有任何问题。

再来梳理一下整个流程。按照示例,生产者消费者交替运行,每次生产后都有对应的消费者,测试类创建实例,如果是生产者先运行,进入run()方法,进入create()方法,flag默认为false,number+1,生产者生产一个产品,flag置为true,同时调用notifyAll()方法,唤醒所有正在等待的线程,接下来如果还是生产者运行呢?这是flag为true,进入while循环,执行wait()方法,接下来如果是消费者运行的话,调用destroy()方法,这时flag为true,消费者购买了一次产品,随即将flag置为false,并唤醒所有正在等待的线程。这就是一次完整的多生产者对应多消费者的问题。

三、使用Lock和Condition来解决生产者消费者问题

上面的代码有一个问题,就是我们为了避免所有的线程都处于等待的状态,使用了notifyAll方法来唤醒所有的线程,即notifyAll唤醒的是自己方和对方线程。如果我需要只是唤醒对方的线程,比如:生产者只能唤醒消费者的线程,消费者只能唤醒生产者的线程。

在jdk1.5当中为我们提供了多线程的升级解决方案:

1. 将同步synchronized替换成了Lock操作。

2. 将Object中的wait,notify,notifyAll方法替换成了Condition对象。

3. 可以只唤醒对方的线程。

完整代码:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource1 {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;
  
  private Lock lock = new ReentrantLock();
  //使用lock建立生产者的condition对象
  private Condition condition_pro = lock.newCondition(); 
  //使用lock建立消费者的condition对象
  private Condition condition_con = lock.newCondition(); 


  /**
   * 生产资源
   */
  public void create() throws InterruptedException {
    
    try{
      lock.lock();
      //先判断标记是否已经生产了,如果已经生产,等待消费
      while(flag){
        //生产者等待
        condition_pro.await();
      }
      //生产一个
      number++;
      System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
      //将资源标记为已经生产
      flag = true;
      //生产者生产完毕后,唤醒消费者的线程(注意这里不是signalAll)
      condition_con.signal();
    }finally{
      lock.unlock();
    }
  }

  /**
   * 消费资源
   */
  public void destroy() throws InterruptedException{

    try{
      lock.lock();
      //先判断标记是否已经消费了,如果已经消费,等待生产
      while(!flag){
        //消费者等待
        condition_con.await();
      }
      
      System.out.println(Thread.currentThread().getName() + "消费者****" + number);
      //将资源标记为已经消费
      flag = false;
      //消费者消费完毕后,唤醒生产者的线程
      condition_pro.signal();
    }finally{
      lock.unlock();
    }
  }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

  private Resource1 resource;

  public Producer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.create();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

  private Resource1 resource;

  public Consumer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.destroy();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

  public static void main(String args[]) {
    Resource1 resource = new Resource1();
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Consumer1(resource)).start();//消费者线程
    new Thread(new Consumer1(resource)).start();//消费者线程

  }
}

运行结果:

四、总结

1、如果生产者、消费者都是1个,那么flag标记可以用if判断。这里有多个,必须用while判断。

2、在while判断的同时,notify函数可能唤醒本类线程(如一个消费者唤醒另一个消费者),这会导致所有消费者忙等待,程序无法继续往下执行。使用notifyAll函数代替notify可以解决这个问题,notifyAll可以保证非本类线程被唤醒(消费者线程能唤醒生产者线程,反之也可以),解决了忙等待问题。

小心假死

生产者/消费者模型最终达到的目的是平衡生产者和消费者的处理能力,达到这个目的的过程中,并不要求只有一个生产者和一个消费者。可以多个生产者对应多个消费者,可以一个生产者对应一个消费者,可以多个生产者对应一个消费者。

假死就发生在上面三种场景下。假死指的是全部线程都进入了WAITING状态,那么程序就不再执行任何业务功能了,整个项目呈现停滞状态。

比方说有生产者A和生产者B,缓冲区由于空了,消费者处于WAITING。生产者B处于WAITING,生产者A被消费者通知生产,生产者A生产出来的产品本应该通知消费者,结果通知了生产者B,生产者B被唤醒,发现缓冲区满了,于是继续WAITING。至此,两个生产者线程处于WAITING,消费者处于WAITING,系统假死。

上面的分析可以看出,假死出现的原因是因为notify的是同类,所以非单生产者/单消费者的场景,可以采取两种方法解决这个问题:

(1)synchronized用notifyAll()唤醒所有线程、ReentrantLock用signalAll()唤醒所有线程。

(2)用ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用相应的Condition的signal()方法就可以了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持w3xue。

 友情链接: NPS