在JAVA中怎么实现消息队列

Python014

在JAVA中怎么实现消息队列,第1张

java中的消息队列

消息队列是线程间通讯的手段:

import java.util.*

public class MsgQueue{

   private Vector queue = null

   public MsgQueue(){

              queue = new   Vector()

   }

   public synchronized void send(Object o)

   {

      queue.addElement(o)

   }

   public synchronized Object recv()

{

     if(queue.size()==0)

        return null

     Object o = queue.firstElement()

     queue.removeElementAt(0)//or queue[0] = null can also work

     return o

}

}

因为java中是locked by object的所以添加synchronized 就可以用于线程同步锁定对象

可以作为多线程处理多任务的存放task的队列。他的client包括封装好的task类以及thread类

Java的多线程-线程间的通信2009-08-25 21:58

1. 线程的几种状态

线程有四种状态,任何一个线程肯定处于这四种状态中的一种:

1) 产生(New):线程对象已经产生,但尚未被启动,所以无法执行。如通过new产生了一个线程对象后没对它调用start()函数之前。

2) 可执行(Runnable):每个支持多线程的系统都有一个排程器,排程器会从线程池中选择一个线程并启动它。当一个线程处于可执行状态时,表示它可能正处于线程池中等待排排程器启动它;也可能它已正在执行。如执行了一个线程对象的start()方法后,线程就处于可执行状态,但显而易见的是此时线程不一定正在执行中。

3) 死亡(Dead):当一个线程正常结束,它便处于死亡状态。如一个线程的run()函数执行完毕后线程就进入死亡状态。

4) 停滞(Blocked):当一个线程处于停滞状态时,系统排程器就会忽略它,不对它进行排程。当处于停滞状态的线程重新回到可执行状态时,它有可能重新执行。如通过对一个线程调用wait()函数后,线程就进入停滞状态,只有当两次对该线程调用notify或notifyAll后它才能两次回到可执行状态。

2. class Thread下的常用函数函数

2.1 suspend()、resume()

1) 通过suspend()函数,可使线程进入停滞状态。通过suspend()使线程进入停滞状态后,除非收到resume()消息,否则该线程不会变回可执行状态。

2) 当调用suspend()函数后,线程不会释放它的“锁标志”。

例11:

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){

super(name)

}

public synchronized void run(){

if(shareVar==0){

for(int i=0 i<5 i++){

shareVar++

if(shareVar==5){

this.suspend() //(1)

}}}

else{

System.out.print(Thread.currentThread().getName())

System.out.println(" shareVar = " + shareVar)

this.resume() //(2)

}}

}

public class TestThread{

public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

TestThreadMethod t2 = new TestThreadMethod("t2")

t1.start() //(5)

//t1.start() //(3)

t2.start() //(4)

}}

运行结果为:

t2 shareVar = 5

i. 当代码(5)的t1所产生的线程运行到代码(1)处时,该线程进入停滞状态。然后排程器从线程池中唤起代码(4)的t2所产生的线程,此时shareVar值不为0,所以执行else中的语句。

ii. 也许你会问,那执行代码(2)后为什么不会使t1进入可执行状态呢?正如前面所说,t1和t2是两个不同对象的线程,而代码(1)和(2)都只对当前对象进行操作,所以t1所产生的线程执行代码(1)的结果是对象t1的当前线程进入停滞状态;而t2所产生的线程执行代码(2)的结果是把对象t2中的所有处于停滞状态的线程调回到可执行状态。

iii. 那现在把代码(4)注释掉,并去掉代码(3)的注释,是不是就能使t1重新回到可执行状态呢?运行结果是什么也不输出。为什么会这样呢?也许你会认为,当代码(5)所产生的线程执行到代码(1)时,它进入停滞状态;而代码(3)所产生的线程和代码(5)所产生的线程是属于同一个对象的,那么就当代码(3)所产生的线程执行到代码(2)时,就可使代码(5)所产生的线程执行回到可执行状态。但是要清楚,suspend()函数只是让当前线程进入停滞状态,但并不释放当前线程所获得的“锁标志”。所以当代码(5)所产生的线程进入停滞状态时,代码(3)所产生的线程仍不能启动,因为当前对象的“锁标志”仍被代码(5)所产生的线程占有。

#p#2.2 sleep()

1) sleep ()函数有一个参数,通过参数可使线程在指定的时间内进入停滞状态,当指定的时间过后,线程则自动进入可执行状态。

2) 当调用sleep ()函数后,线程不会释放它的“锁标志”。

例12:

class TestThreadMethod extends Thread{

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){

super(name)

}

public synchronized void run(){

for(int i=0 i<3 i++){

System.out.print(Thread.currentThread().getName())

System.out.println(" : " + i)

try{

Thread.sleep(100) //(4)

}

catch(InterruptedException e){

System.out.println("Interrupted")

}}}

}

public class TestThread{public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

TestThreadMethod t2 = new TestThreadMethod("t2")

t1.start() (1)

t1.start() (2)

//t2.start() (3)

}}

运行结果为:

t1 : 0

t1 : 1

t1 : 2

t1 : 0

t1 : 1

t1 : 2

由结果可证明,虽然在run()中执行了sleep(),但是它不会释放对象的“锁标志”,所以除非代码(1)的线程执行完run()函数并释放对象的“锁标志”,否则代码(2)的线程永远不会执行。

如果把代码(2)注释掉,并去掉代码(3)的注释,结果将变为:

t1 : 0

t2 : 0

t1 : 1

t2 : 1

t1 : 2

t2 : 2

由于t1和t2是两个对象的线程,所以当线程t1通过sleep()进入停滞时,排程器会从线程池中调用其它的可执行线程,从而t2线程被启动。

例13:

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){

super(name)

}

public synchronized void run(){

for(int i=0 i<5 i++){

System.out.print(Thread.currentThread().getName())

System.out.println(" : " + i)

try{

if(Thread.currentThread().getName().equals("t1"))

Thread.sleep(200)

else

Thread.sleep(100)

}

catch(InterruptedException e){

System.out.println("Interrupted")

}}

}}

public class TestThread{public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

TestThreadMethod t2 = new TestThreadMethod("t2")

t1.start()

//t1.start()

t2.start()

}}

运行结果为:

t1 : 0

t2 : 0

t2 : 1

t1 : 1

t2 : 2

t2 : 3

t1 : 2

t2 : 4

t1 : 3

t1 : 4

由于线程t1调用了sleep(200),而线程t2调用了sleep(100),所以线程t2处于停滞状态的时间是线程t1的一半,从从结果反映出来的就是线程t2打印两倍次线程t1才打印一次。

#p#2.3 yield()

1) 通过yield ()函数,可使线程进入可执行状态,排程器从可执行状态的线程中重新进行排程。所以调用了yield()的函数也有可能马上被执行。

2) 当调用yield ()函数后,线程不会释放它的“锁标志”。

例14:

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){super(name)

}

public synchronized void run(){for(int i=0 i<4 i++){

System.out.print(Thread.currentThread().getName())

System.out.println(" : " + i)

Thread.yield()

}}

}

public class TestThread{public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

TestThreadMethod t2 = new TestThreadMethod("t2")

t1.start()

t1.start() //(1)

//t2.start() (2)

}

}

运行结果为:

t1 : 0

t1 : 1

t1 : 2

t1 : 3

t1 : 0

t1 : 1

t1 : 2

t1 : 3

从结果可知调用yield()时并不会释放对象的“锁标志”。

如果把代码(1)注释掉,并去掉代码(2)的注释,结果为:

t1 : 0

t1 : 1

t2 : 0

t1 : 2

t2 : 1

t1 : 3

t2 : 2

t2 : 3

从结果可知,虽然t1线程调用了yield(),但它马上又被执行了。

2.4 sleep()和yield()的区别

1) sleep()使当前线程进入停滞状态,所以执行sleep()的线程在指定的时间内肯定不会执行;yield()只是使当前线程重新回到可执行状态,所以执行yield()的线程有可能在进入到可执行状态后马上又被执行。

2) sleep()可使优先级低的线程得到执行的机会,当然也可以让同优先级和高优先级的线程有执行的机会;yield()只能使同优先级的线程有执行的机会。

例15:

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){

super(name)

}

public void run(){

for(int i=0 i<4 i++){

System.out.print(Thread.currentThread().getName())

System.out.println(" : " + i)

//Thread.yield() (1)

/* (2) */

try{

Thread.sleep(3000)

}

catch(InterruptedException e){

System.out.println("Interrupted")

}}}

}

public class TestThread{

public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

TestThreadMethod t2 = new TestThreadMethod("t2")

t1.setPriority(Thread.MAX_PRIORITY)

t2.setPriority(Thread.MIN_PRIORITY)

t1.start()

t2.start()

}

}

运行结果为:

t1 : 0

t1 : 1

t2 : 0

t1 : 2

t2 : 1

t1 : 3

t2 : 2

t2 : 3

由结果可见,通过sleep()可使优先级较低的线程有执行的机会。注释掉代码(2),并去掉代码(1)的注释,结果为:

t1 : 0

t1 : 1

t1 : 2

t1 : 3

t2 : 0

t2 : 1

t2 : 2

t2 : 3

可见,调用yield(),不同优先级的线程永远不会得到执行机会。

2.5 join()

使调用join()的线程执行完毕后才能执行其它线程,在一定意义上,它可以实现同步的功能。

例16:

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){

super(name)

}

public void run(){

for(int i=0 i<4 i++){

System.out.println(" " + i)

try{

Thread.sleep(3000)

}

catch(InterruptedException e){

System.out.println("Interrupted")

}

}

}

}

public class TestThread{

public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

t1.start()

try{

t1.join()

}

catch(InterruptedException e){}

t1.start()

}

}

运行结果为:

0

1

2

3

0

1

2

3

#p#3. class Object下常用的线程函数

wait()、notify()和notifyAll()这三个函数由java.lang.Object类提供,用于协调多个线程对共享数据的存取。

3.1 wait()、notify()和notifyAll()

1) wait()函数有两种形式:第一种形式接受一个毫秒值,用于在指定时间长度内暂停线程,使线程进入停滞状态。第二种形式为不带参数,代表waite()在notify()或notifyAll()之前会持续停滞。

2) 当对一个对象执行notify()时,会从线程等待池中移走该任意一个线程,并把它放到锁标志等待池中;当对一个对象执行notifyAll()时,会从线程等待池中移走所有该对象的所有线程,并把它们放到锁标志等待池中。

3) 当调用wait()后,线程会释放掉它所占有的“锁标志”,从而使线程所在对象中的其它synchronized数据可被别的线程使用。

例17:

下面,我们将对例11中的例子进行修改

class TestThreadMethod extends Thread{

public static int shareVar = 0

public TestThreadMethod(String name){

super(name)

}

public synchronized void run(){

if(shareVar==0){

for(int i=0 i<10 i++){

shareVar++

if(shareVar==5){

try{

this.wait() //(4)

}

catch(InterruptedException e){}

}

}

}

if(shareVar!=0){

System.out.print(Thread.currentThread().getName())

System.out.println(" shareVar = " + shareVar)

this.notify() //(5)

}

}

}

public class TestThread{

public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

TestThreadMethod t2 = new TestThreadMethod("t2")

t1.start() //(1)

//t1.start() (2)

t2.start() //(3)

}}

运行结果为:

t2 shareVar = 5

因为t1和t2是两个不同对象,所以线程t2调用代码(5)不能唤起线程t1。如果去掉代码(2)的注释,并注释掉代码(3),结果为:

t1 shareVar = 5

t1 shareVar = 10

这是因为,当代码(1)的线程执行到代码(4)时,它进入停滞状态,并释放对象的锁状态。接着,代码(2)的线程执行run(),由于此时shareVar值为5,所以执行打印语句并调用代码(5)使代码(1)的线程进入可执行状态,然后代码(2)的线程结束。当代码(1)的线程重新执行后,它接着执行for()循环一直到shareVar=10,然后打印shareVar。

#p#3.2 wait()、notify()和synchronized

waite()和notify()因为会对对象的“锁标志”进行操作,所以它们必须在synchronized函数或synchronized block中进行调用。如果在non-synchronized函数或non-synchronized block中进行调用,虽然能编译通过,但在运行时会发生IllegalMonitorStateException的异常。

例18:

class TestThreadMethod extends Thread{

public int shareVar = 0

public TestThreadMethod(String name){

super(name)

new Notifier(this)

}

public synchronized void run(){

if(shareVar==0){

for(int i=0 i<5 i++){

shareVar++

System.out.println("i = " + shareVar)

try{

System.out.println("wait......")

this.wait()

}

catch(InterruptedException e){}

}}

}

}

class Notifier extends Thread{

private TestThreadMethod ttm

Notifier(TestThreadMethod t){

ttm = t

start()

}

public void run(){

while(true){

try{

sleep(2000)

}

catch(InterruptedException e){}

/*1 要同步的不是当前对象的做法 */

synchronized(ttm){

System.out.println("notify......")

ttm.notify()

}}

}

}

public class TestThread{

public static void main(String[] args){

TestThreadMethod t1 = new TestThreadMethod("t1")

t1.start()

}

}

运行结果为:

i = 1

wait......

notify......

i = 2

wait......

notify......

i = 3

wait......

notify......

i = 4

wait......

notify......

i = 5

wait......

notify......

4. wait()、notify()、notifyAll()和suspend()、resume()、sleep()的讨论

4.1 这两组函数的区别

1) wait()使当前线程进入停滞状态时,还会释放当前线程所占有的“锁标志”,从而使线程对象中的synchronized资源可被对象中别的线程使用;而suspend()和sleep()使当前线程进入停滞状态时不会释放当前线程所占有的“锁标志”。

2) 前一组函数必须在synchronized函数或synchronized block中调用,否则在运行时会产生错误;而后一组函数可以non-synchronized函数和synchronized block中调用。

4.2 这两组函数的取舍

Java2已不建议使用后一组函数。因为在调用suspend()时不会释放当前线程所取得的“锁标志”,这样很容易造成“死锁”。

“消息队列”是在消息的传输过程中保存消息的容器。和我们学过的LinkedHashMap,TreeSet等一样,都是容器。既然是容器,就有有自己的特性,就像LinkedHashMap是以键值对存储。存取顺序不变。而消息队列,看到队列就可以知道。这个容器里面的消息是站好队的,一般遵从先进先出原则。

java中已经为我们封装好了很多的消息队列。在java 1.5版本时推出的java.util.concurrent中有很多现成的队列供我们使用。特性繁多,种类齐全。是你居家旅游开发必备QAQ。

下面简单列举这个包中的消息队列

:阻塞队列 BlockingQueue

数组阻塞队列 ArrayBlockingQueue

延迟队列 DelayQueue

链阻塞队列 LinkedBlockingQueue

具有优先级的阻塞队列 PriorityBlockingQueue

同步队列 SynchronousQueue

阻塞双端队列 BlockingDeque

链阻塞双端队列 LinkedBlockingDeque

不同的队列不同的特性决定了队列使用的时机,感兴趣的话你可以详细了解。具体的使用方式我就不赘述了

下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处。有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信——使用命名管道

一、什么是消息队列

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。

二、在Linux中使用消息队列

Linux提供了一系列消息队列的函数接口来让我们方便地使用它来实现进程间的通信。它的用法与其他两个System V PIC机制,即信号量和共享内存相似。

1、msgget函数

该函数用来创建和访问一个消息队列。它的原型为:

int msgget(key_t, key, int msgflg)

与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。

它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1.

2、msgsnd函数

该函数用来把消息添加到消息队列中。它的原型为:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg)

msgid是由msgget函数返回的消息队列标识符。

msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:

struct my_message{

long int message_type

/* The data you wish to transfer*/

}

msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。

msgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。

如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.

3、msgrcv函数

该函数用来从一个消息队列获取消息,它的原型为

int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg)

msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。

msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。

msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。

调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.

4、msgctl函数

该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:

int msgctl(int msgid, int command, struct msgid_ds *buf)

command是将要采取的动作,它可以取3个值,

IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。

IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值

IPC_RMID:删除消息队列

buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:

struct msgid_ds

{

uid_t shm_perm.uid

uid_t shm_perm.gid

mode_t shm_perm.mode

}

成功时返回0,失败时返回-1.

三、使用消息队列进行进程间通信

马不停蹄,介绍完消息队列的定义和可使用的接口之后,我们来看看它是怎么让进程进行通信的。由于可以让不相关的进程进行行通信,所以我们在这里将会编写两个程序,msgreceive和msgsned来表示接收和发送信息。根据正常的情况,我们允许两个程序都可以创建消息,但只有接收者在接收完最后一个消息之后,它才把它删除。

接收信息的程序源文件为msgreceive.c的源代码为:

#include <unistd.h>

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <errno.h>

#include <sys/msg.h>

struct msg_st

{

long int msg_type

char text[BUFSIZ]

}

int main()

{

int running = 1

int msgid = -1

struct msg_st data

long int msgtype = 0//注意1

//建立消息队列

msgid = msgget((key_t)1234, 0666 | IPC_CREAT)

if(msgid == -1)

{

fprintf(stderr, "msgget failed with error: %d\n", errno)

exit(EXIT_FAILURE)

}

//从队列中获取消息,直到遇到end消息为止

while(running)

{

if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1)

{

fprintf(stderr, "msgrcv failed with errno: %d\n", errno)

exit(EXIT_FAILURE)

}

printf("You wrote: %s\n",data.text)

//遇到end结束

if(strncmp(data.text, "end", 3) == 0)

running = 0

}

//删除消息队列

if(msgctl(msgid, IPC_RMID, 0) == -1)

{

fprintf(stderr, "msgctl(IPC_RMID) failed\n")

exit(EXIT_FAILURE)

}

exit(EXIT_SUCCESS)

}

发送信息的程序的源文件msgsend.c的源代码为:

#include <unistd.h>

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <sys/msg.h>

#include <errno.h>

#define MAX_TEXT 512

struct msg_st

{

long int msg_type

char text[MAX_TEXT]

}

int main()

{

int running = 1

struct msg_st data

char buffer[BUFSIZ]

int msgid = -1

//建立消息队列

msgid = msgget((key_t)1234, 0666 | IPC_CREAT)

if(msgid == -1)

{

fprintf(stderr, "msgget failed with error: %d\n", errno)

exit(EXIT_FAILURE)

}

//向消息队列中写消息,直到写入end

while(running)

{

//输入数据

printf("Enter some text: ")

fgets(buffer, BUFSIZ, stdin)

data.msg_type = 1 //注意2

strcpy(data.text, buffer)

//向队列发送数据

if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)

{

fprintf(stderr, "msgsnd failed\n")

exit(EXIT_FAILURE)

}

//输入end结束输入

if(strncmp(buffer, "end", 3) == 0)

running = 0

sleep(1)

}

exit(EXIT_SUCCESS)

}

转载仅供参考,版权属于原作者。祝你愉快,满意请采纳哦