由生产者消费者问题看JAVA多线程

Python021

由生产者消费者问题看JAVA多线程,第1张

生产者消费者问题是研究多线程程序时绕不开的问题 它的描述是有一块生产者和消费者共享的有界缓冲区 生产者往缓冲区放入产品 消费者从缓冲区取走产品 这个过程可以无休止的执行 不能因缓冲区满生产者放不进产品而终止 也不能因缓冲区空消费者无产品可取而终止

解决生产者消费者问题的方法有两种 一种是采用某种机制保持生产者和消费者之间的同步 一种是在生产者和消费者之间建立一个管道 前一种有较高的效率并且可控制性较好 比较常用 后一种由于管道缓冲区不易控制及被传输数据对象不易封装等原因 比较少用

同步问题的核心在于 CPU是按时间片轮询的方式执行程序 我们无法知道某一个线程是否被执行 是否被抢占 是否结束等 因此生产者完全可能当缓冲区已满的时候还在放入产品 消费者也完全可能当缓冲区为空时还在取出产品

现在同步问题的解决方法一般是采用信号或者加锁机制 即生产者线程当缓冲区已满时放弃自己的执行权 进入等待状态 并通知消费者线程执行 消费者线程当缓冲区已空时放弃自己的执行权 进入等待状态 并通知生产者线程执行 这样一来就保持了线程的同步 并避免了线程间互相等待而进入死锁状态

JAVA语言提供了独立于平台的线程机制 保持了 write once run anywhere 的特色 同时也提供了对同步机制的良好支持

在JAVA中 一共有四种方法支持同步 其中三个是同步方法 一个是管道方法

方法wait()/notify()

方法await()/signal()

阻塞队列方法BlockingQueue

管道方法PipedInputStream/PipedOutputStream

下面我们看各个方法的实现

方法wait()/notify()

wait()和notify()是根类Object的两个方法 也就意味着所有的JAVA类都会具有这个两个方法 为什么会被这样设计呢?我们可以认为所有的对象默认都具有一个锁 虽然我们看不到 也没有办法直接操作 但它是存在的

wait()方法表示 当缓冲区已满或空时 生产者或消费者线程停止自己的执行 放弃锁 使自己处于等待状态 让另一个线程开始执行

notify()方法表示 当生产者或消费者对缓冲区放入或取出一个产品时 向另一个线程发出可执行通知 同时放弃锁 使自己处于等待状态

下面是一个例子代码

import java util LinkedList

public class Sycn {

private LinkedList<Object>myList =new LinkedList<Object>()

private int MAX =

public Sycn (){

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run(){

while(true){

synchronized(myList){

try{

while(myList size() == MAX){

System out println( warning: it s full! )

myList wait()

}

Object o = new Object()

if(myList add(o)){

System out println( Producer: + o)

myList notify()

}

}catch(InterruptedException ie){

System out println( producer is interrupted! )

}

}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

synchronized(myList){

try{

while(myList size() == ){

System out println( warning: it s empty! )

myList wait()

}

Object o = myList removeLast()

System out println( Consumer: + o)

myList notify()

}catch(InterruptedException ie){

System out println( consumer is interrupted! )

}

}

}

}

}

}

方法await()/signal()

在JDK 以后 JAVA提供了新的更加健壮的线程处理机制 包括了同步 锁定 线程池等等 它们可以实现更小粒度上的控制 await()和signal()就是其中用来做同步的两种方法 它们的功能基本上和wait()/notify()相同 完全可以取代它们 但是它们和新引入的锁定机制Lock直接挂钩 具有更大的灵活性

下面是一个例子代码

import java util LinkedList

import ncurrent locks *

public class Sycn {

private LinkedList<Object>myList = new LinkedList<Object>()

private int MAX =

private final Lock lock = new ReentrantLock()

private final Condition full = lock newCondition()

private final Condition empty = lock newCondition()

public Sycn (){

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run(){

while(true){

lock lock()

try{

while(myList size() == MAX){

System out println( warning: it s full! )

full await()

}

Object o = new Object()

if(myList add(o)){

System out println( Producer: + o)

empty signal()

}

}catch(InterruptedException ie){

System out println( producer is interrupted! )

}finally{

lock unlock()

}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

lock lock()

try{

while(myList size() == ){

System out println( warning: it s empty! )

empty await()

}

Object o = myList removeLast()

System out println( Consumer: + o)

full signal()

}catch(InterruptedException ie){

System out println( consumer is interrupted! )

}finally{

lock unlock()

}

}

}

}

}

阻塞队列方法BlockingQueue

BlockingQueue也是JDK 的一部分 它是一个已经在内部实现了同步的队列 实现方式采用的是我们的第 种await()/signal()方法 它可以在生成对象时指定容量大小

它用于阻塞操作的是put()和take()方法

put()方法类似于我们上面的生产者线程 容量最大时 自动阻塞

take()方法类似于我们上面的消费者线程 容量为 时 自动阻塞

下面是一个例子代码

import ncurrent *

public class Sycn {

private LinkedBlockingQueue<Object>queue = new LinkedBlockingQueue<Object>( )

private int MAX =

public Sycn (){

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run(){

while(true){

//synchronized(this){

try{

if(queue size() == MAX)

System out println( warning: it s full! )

Object o = new Object()

queue put(o)

System out println( Producer: + o)

}catch(InterruptedException e){

System out println( producer is interrupted! )

}

//}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

//synchronized(this){

try{

if(queue size() == )

System out println( warning: it s empty! )

Object o = queue take()

System out println( Consumer: + o)

}catch(InterruptedException e){

System out println( producer is interrupted! )

}

//}

}

}

}

}

你发现这个例子中的问题了吗?

如果没有 我建议你运行一下这段代码 仔细观察它的输出 是不是有下面这个样子的?为什么会这样呢?

warning: it s full!

Producer: java lang object@ e a

你可能会说这是因为put()和System out println()之间没有同步造成的 我也这样认为 我也这样认为 但是你把run()中的synchronized前面的注释去掉 重新编译运行 有改观吗?没有 为什么?

这是因为 当缓冲区已满 生产者在put()操作时 put()内部调用了await()方法 放弃了线程的执行 然后消费者线程执行 调用take()方法 take()内部调用了signal()方法 通知生产者线程可以执行 致使在消费者的println()还没运行的情况下生产者的println()先被执行 所以有了上面的输出 run()中的synchronized其实并没有起什么作用

对于BlockingQueue大家可以放心使用 这可不是它的问题 只是在它和别的对象之间的同步有问题

对于这种多重嵌套同步的问题 以后再谈吧 欢迎大家讨论啊!

管道方法PipedInputStream/PipedOutputStream

这个类位于java io包中 是解决同步问题的最简单的办法 一个线程将数据写入管道 另一个线程从管道读取数据 这样便构成了一种生产者/消费者的缓冲区编程模式

下面是一个例子代码 在这个代码我没有使用Object对象 而是简单的读写字节值 这是因为PipedInputStream/PipedOutputStream不允许传输对象 这是JAVA本身的一个bug 具体的大家可以看sun的解释 _bug do?bug_id=

import java io *

public class Sycn {

private PipedOutputStream pos

private PipedInputStream pis

//private ObjectOutputStream oos

//private ObjectInputStream ois

public Sycn (){

try{

pos = new PipedOutputStream()

pis = new PipedInputStream(pos)

//oos = new ObjectOutputStream(pos)

//ois = new ObjectInputStream(pis)

}catch(IOException e){

System out println(e)

}

}

public void start(){

new Producer() start()

new Consumer() start()

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ()

s start()

}

class Producer extends Thread{

public void run() {

try{

while(true){

int b = (int) (Math random() * )

System out println( Producer: a byte the value is + b)

pos write(b)

pos flush()

//Object o = new MyObject()

//oos writeObject(o)

//oos flush()

//System out println( Producer: + o)

}

}catch(Exception e){

//System out println(e)

e printStackTrace()

}finally{

try{

pos close()

pis close()

//oos close()

//ois close()

}catch(IOException e){

System out println(e)

}

}

}

}

class Consumer extends Thread{

public void run(){

try{

while(true){

int b = pis read()

System out println( Consumer: a byte the value is + String valueOf(b))

//Object o = ois readObject()

//if(o != null)

//System out println( Consumer: + o)

}

}catch(Exception e){

//System out println(e)

e printStackTrace()

}finally{

try{

pos close()

pis close()

//oos close()

//ois close()

}catch(IOException e){

System out println(e)

}

}

}

}

//class MyObject implements Serializable {

//}

lishixinzhi/Article/program/Java/gj/201311/27617

Java多线程:是程序中的一个单一的连续控制流程,一个线程可以拥有多个线程 。记得刚学习Java多线程的时候,对线程中的run()不知道是什么意思,现在大胆认为它就像Java的main()一样,可以理解为一个线程启动运行的入口函数。

创建一个线程的方式有两种,一种是继承Thread类,还有就是实现Runnable 接口,两者都要重写run。

时间偏分给我们想要执行的线程时,可以将该线程的优先级设置Thread.MAX_PRIORITY .下面是一个生产者和消费者的多线程的例子:其规则很简单,只有生产出来东西才能有东西来消费。

Java多线程知识点:线程的创建、线程的同步、顺便回顾一下大学的操作系统。

class Test {

public static void main(String[] args) {

Queue q = new Queue()

Producer p = new Producer(q)

Consumer c = new Consumer(q)

p.start()

c.start()

}

}

class Producer extends Thread {

Queue q

Producer(Queue q) {

this.q = q

}

public void run() {

for (int i = 0i <10i++) {

q.put(i)

System.out.println("Producer put " + i)

}

}

}

class Consumer extends Thread {

Queue q

Consumer(Queue q) {

this.q = q

}

public void run() {

while (true) {

System.out.println("Consumer get " + q.get())

}

}

}

class Queue {

int value

boolean bFull = false

public synchronized void put(int i) {

if (!bFull) {

value = i

bFull = true

notify()

}

try {

wait()

} catch (Exception e) {

e.printStackTrace()

}

}

public synchronized int get() {

if (!bFull) {

try {

wait()

} catch (Exception e) {

e.printStackTrace()

}

}

bFull = false

notify()

return value

}

}

生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。

解决生产者/消费者问题的方法可分为两类:

采用某种机制保护生产者和消费者之间的同步;

在生产者和消费者之间建立一个管道。

第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。

wait()

/

notify()方法

await()

/

signal()方法

BlockingQueue阻塞队列方法

PipedInputStream

/

PipedOutputStream

通过

wait()

/

notify()方法实现:

wait()

/

nofity()方法是基类Object的两个方法:

wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。

notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

通过await()

/

signal()方法实现:

await()和signal()的功能基本上和wait()

/

nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

通过BlockingQueue方法实现:

它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()

/

signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。

take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。