java多线程共同操作同一个队列,怎么实现?

Python012

java多线程共同操作同一个队列,怎么实现?,第1张

具体代码如下:

以下是两个线程

import java.util.*

public class Thread_List_Operation {

//假设有这么一个队列

static List list = new LinkedList()

public static void main(String[] args) {

Thread t

t = new Thread(new T1())

t.start()

t = new Thread(new T2())

t.start()

}

}

//线程T1,用来给list添加新元素

class T1 implements Runnable{

void getElemt(Object o){

Thread_List_Operation.list.add(o)

System.out.println(Thread.currentThread().getName() + "为队列添加了一个元素")

}

@Override

public void run() {

for (int i = 0i <10i++) {

getElemt(new Integer(1))

}

}

}

//线程T2,用来给list添加新元素

class T2 implements Runnable{

void getElemt(Object o){

Thread_List_Operation.list.add(o)

System.out.println(Thread.currentThread().getName() + "为队列添加了一个元素")

}

@Override

public void run() {

for (int i = 0i <10i++) {

getElemt(new Integer(1))

}

}

}

//结果(乱序)

Thread-0为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-1为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

Thread-0为队列添加了一个元素

使用线程池的好处

1、降低资源消耗

可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度

任务到达时,任务可以不需要等到线程创建就能立即执行。

3、提高线程的可管理性

线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

线程池的工作原理

首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

线程池饱和策略

这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

AbortPolicy

为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

DiscardPolicy

直接抛弃,任务不执行,空方法

DiscardOldestPolicy

从队列里面抛弃head的一个任务,并再次execute 此task。

CallerRunsPolicy

在调用execute的线程里面执行此command,会阻塞入口

用户自定义拒绝策略(最常用)

实现RejectedExecutionHandler,并自己定义策略模式

下我们以ThreadPoolExecutor为例展示下线程池的工作流程图

1.jpg

2.jpg

1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇

3.jpg

关键方法源码分析

我们看看核心方法添加到线程池方法execute的源码如下:

//     //Executes the given task sometime in the future.  The task     //may execute in a new thread or in an existing pooled thread.     //     // If the task cannot be submitted for execution, either because this     // executor has been shutdown or because its capacity has been reached,     // the task is handled by the current {@code RejectedExecutionHandler}.     //     // @param command the task to execute     // @throws RejectedExecutionException at discretion of     //         {@code RejectedExecutionHandler}, if the task     //         cannot be accepted for execution     // @throws NullPointerException if {@code command} is null     //    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException()       //         // Proceed in 3 steps:         //         // 1. If fewer than corePoolSize threads are running, try to         // start a new thread with the given command as its first         // task.  The call to addWorker atomically checks runState and         // workerCount, and so prevents false alarms that would add         // threads when it shouldn't, by returning false.         // 翻译如下:         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,         // 如果能完成新线程创建exexute方法结束,成功提交任务         // 2. If a task can be successfully queued, then we still need         // to double-check whether we should have added a thread         // (because existing ones died since last checking) or that         // the pool shut down since entry into this method. So we         // recheck state and if necessary roll back the enqueuing if         // stopped, or start a new thread if there are none.         // 翻译如下:         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;         // 3. If we cannot queue task, then we try to add a new         // thread.  If it fails, we know we are shut down or saturated         // and so reject the task.         // 翻译如下:         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池         // 已经达到饱和状态,所以reject这个他任务         //        int c = ctl.get()       // 工作线程数小于核心线程数        if (workerCountOf(c) <corePoolSize) {            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize            if (addWorker(command, true))                return           c = ctl.get()       }        // 如果工作线程数大于等于核心线程数        // 线程的的状态未RUNNING并且队列notfull        if (isRunning(c) &&workQueue.offer(command)) {            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除            int recheck = ctl.get()           if (! isRunning(recheck) &&remove(command))                // 移除成功,拒绝该非运行的任务                reject(command)           else if (workerCountOf(recheck) == 0)                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                addWorker(null, false)       }        // 如果队列满了或者是非运行的任务都拒绝执行        else if (!addWorker(command, false))            reject(command)   }

你好,我可以给你详细解释一下:

线程组表示一个线程的集合。此外,线程组也可以包含其他线程组。线程组构成一棵树,在树中,除了初始线程组外,每个线程组都有一个父线程组。

允许线程访问有关自己的线程组的信息,但是不允许它访问有关其线程组的父线程组或其他任何线程组的信息。

线程池:我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。以后会再深入有关 Java 实现多线程服务器的细节。

线程队列:是指线程处于拥塞的时候形成的调度队列

排队有三种通用策略:

直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。