由生产者消费者问题看JAVA多线程

Python028

由生产者消费者问题看JAVA多线程,第1张

生产者消费者问题是研究多线程程序时绕不开的问题 它的描述是有一块生产者和消费者共享的有界缓冲区 生产者往缓冲区放入产品 消费者从缓冲区取走产品 这个过程可以无休止的执行 不能因缓冲区满生产者放不进产品而终止 也不能因缓冲区空消费者无产品可取而终止

解决生产者消费者问题的方法有两种 一种是采用某种机制保持生产者和消费者之间的同步 一种是在生产者和消费者之间建立一个管道 前一种有较高的效率并且可控制性较好 比较常用 后一种由于管道缓冲区不易控制及被传输数据对象不易封装等原因 比较少用

同步问题的核心在于 CPU是按时间片轮询的方式执行程序 我们无法知道某一个线程是否被执行 是否被抢占 是否结束等 因此生产者完全可能当缓冲区已满的时候还在放入产品 消费者也完全可能当缓冲区为空时还在取出产品

现在同步问题的解决方法一般是采用信号或者加锁机制 即生产者线程当缓冲区已满时放弃自己的执行权 进入等待状态 并通知消费者线程执行 消费者线程当缓冲区已空时放弃自己的执行权 进入等待状态 并通知生产者线程执行 这样一来就保持了线程的同步 并避免了线程间互相等待而进入死锁状态

JAVA语言提供了独立于平台的线程机制 保持了 write once run anywhere 的特色 同时也提供了对同步机制的良好支持

在JAVA中 一共有四种方法支持同步 其中三个是同步方法 一个是管道方法

方法wait()/notify()

方法await()/signal()

阻塞队列方法BlockingQueue

管道方法PipedInputStream/PipedOutputStream

下面我们看各个方法的实现

方法wait()/notify()

wait()和notify()是根类Object的两个方法 也就意味着所有的JAVA类都会具有这个两个方法 为什么会被这样设计呢?我们可以认为所有的对象默认都具有一个锁 虽然我们看不到 也没有办法直接操作 但它是存在的

wait()方法表示 当缓冲区已满或空时 生产者或消费者线程停止自己的执行 放弃锁 使自己处于等待状态 让另一个线程开始执行

notify()方法表示 当生产者或消费者对缓冲区放入或取出一个产品时 向另一个线程发出可执行通知 同时放弃锁 使自己处于等待状态

下面是一个例子代码

import java util LinkedList

public class Sycn {

private LinkedList<Object>myList =new LinkedList<Object>()

private int MAX =

public Sycn (){

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run(){

while(true){

synchronized(myList){

try{

while(myList size() == MAX){

System out println( warning: it s full! )

myList wait()

}

Object o = new Object()

if(myList add(o)){

System out println( Producer: + o)

myList notify()

}

}catch(InterruptedException ie){

System out println( producer is interrupted! )

}

}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

synchronized(myList){

try{

while(myList size() == ){

System out println( warning: it s empty! )

myList wait()

}

Object o = myList removeLast()

System out println( Consumer: + o)

myList notify()

}catch(InterruptedException ie){

System out println( consumer is interrupted! )

}

}

}

}

}

}

方法await()/signal()

在JDK 以后 JAVA提供了新的更加健壮的线程处理机制 包括了同步 锁定 线程池等等 它们可以实现更小粒度上的控制 await()和signal()就是其中用来做同步的两种方法 它们的功能基本上和wait()/notify()相同 完全可以取代它们 但是它们和新引入的锁定机制Lock直接挂钩 具有更大的灵活性

下面是一个例子代码

import java util LinkedList

import ncurrent locks *

public class Sycn {

private LinkedList<Object>myList = new LinkedList<Object>()

private int MAX =

private final Lock lock = new ReentrantLock()

private final Condition full = lock newCondition()

private final Condition empty = lock newCondition()

public Sycn (){

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run(){

while(true){

lock lock()

try{

while(myList size() == MAX){

System out println( warning: it s full! )

full await()

}

Object o = new Object()

if(myList add(o)){

System out println( Producer: + o)

empty signal()

}

}catch(InterruptedException ie){

System out println( producer is interrupted! )

}finally{

lock unlock()

}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

lock lock()

try{

while(myList size() == ){

System out println( warning: it s empty! )

empty await()

}

Object o = myList removeLast()

System out println( Consumer: + o)

full signal()

}catch(InterruptedException ie){

System out println( consumer is interrupted! )

}finally{

lock unlock()

}

}

}

}

}

阻塞队列方法BlockingQueue

BlockingQueue也是JDK 的一部分 它是一个已经在内部实现了同步的队列 实现方式采用的是我们的第 种await()/signal()方法 它可以在生成对象时指定容量大小

它用于阻塞操作的是put()和take()方法

put()方法类似于我们上面的生产者线程 容量最大时 自动阻塞

take()方法类似于我们上面的消费者线程 容量为 时 自动阻塞

下面是一个例子代码

import ncurrent *

public class Sycn {

private LinkedBlockingQueue<Object>queue = new LinkedBlockingQueue<Object>( )

private int MAX =

public Sycn (){

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run(){

while(true){

//synchronized(this){

try{

if(queue size() == MAX)

System out println( warning: it s full! )

Object o = new Object()

queue put(o)

System out println( Producer: + o)

}catch(InterruptedException e){

System out println( producer is interrupted! )

}

//}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

//synchronized(this){

try{

if(queue size() == )

System out println( warning: it s empty! )

Object o = queue take()

System out println( Consumer: + o)

}catch(InterruptedException e){

System out println( producer is interrupted! )

}

//}

}

}

}

}

你发现这个例子中的问题了吗?

如果没有 我建议你运行一下这段代码 仔细观察它的输出 是不是有下面这个样子的?为什么会这样呢?

warning: it s full!

Producer: java lang object@ e a

你可能会说这是因为put()和System out println()之间没有同步造成的 我也这样认为 我也这样认为 但是你把run()中的synchronized前面的注释去掉 重新编译运行 有改观吗?没有 为什么?

这是因为 当缓冲区已满 生产者在put()操作时 put()内部调用了await()方法 放弃了线程的执行 然后消费者线程执行 调用take()方法 take()内部调用了signal()方法 通知生产者线程可以执行 致使在消费者的println()还没运行的情况下生产者的println()先被执行 所以有了上面的输出 run()中的synchronized其实并没有起什么作用

对于BlockingQueue大家可以放心使用 这可不是它的问题 只是在它和别的对象之间的同步有问题

对于这种多重嵌套同步的问题 以后再谈吧 欢迎大家讨论啊!

管道方法PipedInputStream/PipedOutputStream

这个类位于java io包中 是解决同步问题的最简单的办法 一个线程将数据写入管道 另一个线程从管道读取数据 这样便构成了一种生产者/消费者的缓冲区编程模式

下面是一个例子代码 在这个代码我没有使用Object对象 而是简单的读写字节值 这是因为PipedInputStream/PipedOutputStream不允许传输对象 这是JAVA本身的一个bug 具体的大家可以看sun的解释 _bug do?bug_id=

import java io *

public class Sycn {

private PipedOutputStream pos

private PipedInputStream pis

//private ObjectOutputStream oos

//private ObjectInputStream ois

public Sycn (){

try{

pos = new PipedOutputStream()

pis = new PipedInputStream(pos)

//oos = new ObjectOutputStream(pos)

//ois = new ObjectInputStream(pis)

}catch(IOException e){

System out println(e)

}

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run() {

try{

while(true){

int b = (int) (Math random() * )

System out println( Producer: a byte the value is + b)

pos write(b)

pos flush()

//Object o = new MyObject()

//oos writeObject(o)

//oos flush()

//System out println( Producer: + o)

}

}catch(Exception e){

//System out println(e)

e printStackTrace()

}finally{

try{

pos close()

pis close()

//oos close()

//ois close()

}catch(IOException e){

System out println(e)

}

}

}

}

class Consumer extends Thread{

public void run(){

try{

while(true){

int b = pis read()

System out println( Consumer: a byte the value is + String valueOf(b))

//Object o = ois readObject()

//if(o != null)

//System out println( Consumer: + o)

}

}catch(Exception e){

//System out println(e)

e printStackTrace()

}finally{

try{

pos close()

pis close()

//oos close()

//ois close()

}catch(IOException e){

System out println(e)

}

}

}

}

//class MyObject implements Serializable {

//}

lishixinzhi/Article/program/Java/gj/201311/27617

生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。

解决生产者/消费者问题的方法可分为两类:

采用某种机制保护生产者和消费者之间的同步;

在生产者和消费者之间建立一个管道。

第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。

wait()

/

notify()方法

await()

/

signal()方法

BlockingQueue阻塞队列方法

PipedInputStream

/

PipedOutputStream

通过

wait()

/

notify()方法实现:

wait()

/

nofity()方法是基类Object的两个方法:

wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。

notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

通过await()

/

signal()方法实现:

await()和signal()的功能基本上和wait()

/

nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

通过BlockingQueue方法实现:

它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()

/

signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。

take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

生产者-消费者模式是一种经典的多线程设计模式。它通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责处理生产者提交的任务。两者线程通过共享内存缓冲区进行通信。

生产者线程将任务提交到共享内存缓冲区,消费者线程并不直接与生产者线程通信,而是在共享内存缓冲区中获取任务,并进行处理。共享内存缓冲区是其核心组件,它负责生产者和消费者之间的通信,避免两种直接通信,生产者和消费者都不需要知道对方的存在。由于共享内存缓冲区的存在,它允许生产者和消费者在执行速度上存在时间差,无论哪一方的速度高于对方,都可以通过共享内存缓冲区得到缓解,保证系统正常运行。

生产者-消费者模式的主要角色及作用:

生产者:用于提交用户请求,提取用户任务,并装入内存缓冲区

消费者:在内存缓冲区中提取处理任务

内存缓冲区:缓存生产者提交的任务或数据,供消费者使用

任务:生产者向内存缓冲区提交的数据结构

Main:使用生产者和消费者的客户端

BlockingQueue充当了共享内存缓存区,用于维护任务或数据队列。PCData对象表示一个生产任务或数据。生产者和消费者对象均引用同一个BlockingQueue对象实例,生产者负责创建PCData对象,并将它加入到BlockingQueue中,消费者则从BlockingQueue中获取PCData。

--参考文献《实战Java高并发程序设计》