超详细的线程池使用解析

Python015

超详细的线程池使用解析,第1张

Java 中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的使用线程池可以带来多个好处:

(1) 降低资源消耗 。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。

(2) 提高响应速度 。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。

(3) 提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的处理流程如上图所示

线程池中通过 ctl 字段来表示线程池中的当前状态,主池控制状态 ctl 是 AtomicInteger 类型,包装了两个概念字段:workerCount 和 runState,workerCount 表示有效线程数,runState 表示是否正在运行、正在关闭等状态。使用 ctl 字段表示两个概念,ctl 的前 3 位表示线程池状态,线程池中限制 workerCount 为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。workerCount 是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值如下表所示:

runState 随着时间的推移而改变,在 awaitTermination() 方法中等待的线程将在状态达到 TERMINATED 时返回。状态的转换为:

RUNNING ->SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中

(RUNNING 或 SHUTDOWN)->STOP 在调用 shutdownNow() 时

SHUTDOWN ->TIDYING 当队列和线程池都为空时

STOP ->TIDYING 当线程池为空时

TIDYING ->TERMINATED 当 terminate() 方法完成时

开发人员如果需要在线程池变为 TIDYING 状态时进行相应的处理,可以通过重载 terminated() 函数来实现。

结合上图说明线程池 ThreadPoolExecutor 执行流程,使用 execute() 方法提交任务到线程池中执行时分为4种场景:

(1)线程池中运行的线程数量小于 corePoolSize,创建新线程来执行任务。

(2)线程池中运行线程数量不小于 corePoolSize,将任务加入到阻塞队列 BlockingQueue。

(3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。

(4)当创建新的线程数量使线程池中当前运行线程数量超过 maximumPoolSize,线程池中拒绝任务,调用 RejectedExecutionHandler.rejectedExecution() 方法处理。

源码分析:

线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。

创建线程池之前,首先要知道创建线程池中的核心参数:

corePoolSize (核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。

runnableTaskQueue (任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种:

ArrayBlockingQueue:基于数组的有界阻塞队列,按照 FIFO 原则对元素进行排序。

LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。

SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入操作必须要等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态。

PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。

maximumPoolSize (最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。

RejectedExecutionHandler (拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK 内置拒绝策略有以下 4 种:

AbortPolicy:直接抛出异常

CallerRunsPolicy:使用调用者所在的线程来执行任务

DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务

DiscardPolicy:直接丢弃不处理

可以根据应用场景来实现 RejectedExecutionHandler 接口自定义处理策略。

keepAliveTime (线程存活时间):线程池的工作线程空闲后,保持存活的时间。

TimeUnit (存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。

ThreadFactory (线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。

创建线程池主要分为两大类,第一种是通过 Executors 工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用 Executors 去创建,原因是规避资源耗尽的风险。

创建一个单线程化的线程池

创建固定线程数的线程池

以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致 OOM

创建可缓存线程池

允许创建的线程数量最大为 Integer.MAX_VALUE,当创建大量线程时会导致 CPU 处于重负载状态和 OOM 的发生

向线程池提交任务可以使用两个方法,分别为 execute() 和 submit()。

execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute() 方法中传入的是 Runnable 类的实例。

submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值。get() 方法会阻塞当前线程直到任务完成,使用 get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。

可以通过调用线程池的 shutdown() 或shutdownNow() 方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用 interrupt() 方法来中断线程,所以无法响应中断任务可能永远无法终止。

shutdown() 和 shutdownNow() 方法的区别在于 shutdownNow 方法首先将线程池的状态设置为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

线程池使用面临的核心的问题在于: 线程池的参数并不好配置 。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。

(1)以任务型为参考的简单评估:

假设线程池大小的设置(N 为 CPU 的个数)

如果纯计算的任务,多线程并不能带来性能提升,因为 CPU 处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为 CPU 数量或+1;----为什么+1?因为可以防止 N 个线程中有一个线程意外中断或者退出,CPU 不会空闲等待。

如果是 IO 密集型应用, 则线程池大小设置为 2N+1. 线程数 = CPU 核数 目标 CPU 利用率 (1 + 平均等待时间 / 平均工作时间)

(2)以任务数为参考的理想状态评估:

1)默认值

2)如何设置 * 需要根据相关值来决定 - tasks :每秒的任务数,假设为500~1000 - taskCost:每个任务花费时间,假设为0.1s - responsetime:系统允许容忍的最大响应时间,假设为1s

以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器 cpu load 已经满了,则需要通过升级硬件和优化代码,降低 taskCost 来处理。

(仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)

与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。

严禁在业务代码中起线程!!!

当任务需要按照指定顺序(FIFO, LIFO, 优先级)执行时,推荐创建使用单线程化的线程池。

本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。

原文链接:https://juejin.cn/post/7067324722811240479

线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了

本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码

//ThreadTask java

package polarman threadpool

/** *//**

*线程任务

* @author ryang

*

*/

public interface ThreadTask {

public void run()

}

//PooledThread java

package polarman threadpool

import java util Collectionimport java util Vector

/** *//**

*接受线程池管理的线程

* @author ryang

*

*/

public class PooledThread extends Thread {

protected Vector tasks = new Vector()

protected boolean running = false

protected boolean stopped = false

protected boolean paused = false

protected boolean killed = false

private ThreadPool pool

public PooledThread(ThreadPool pool) { this pool = pool

}

public void putTask(ThreadTask task) { tasks add(task)

}

public void putTasks(ThreadTask[] tasks) { for(int i= i<tasks lengthi++) this tasks add(tasks[i])

}

public void putTasks(Collection tasks) { this tasks addAll(tasks)

}

protected ThreadTask popTask() { if(tasks size() >) return (ThreadTask)tasks remove( )

else

return null

}

public boolean isRunning() {

return running

}

public void stopTasks() {

stopped = true

}

public void stopTasksSync() {

stopTasks()

while(isRunning()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public void pauseTasks() {

paused = true

}

public void pauseTasksSync() {

pauseTasks()

while(isRunning()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public void kill() { if(!running)

interrupt()

else

killed = true

}

public void killSync() {

kill()

while(isAlive()) { try {

sleep( )

} catch (InterruptedException e) {

}

}

}

public synchronized void startTasks() {

running = true

this notify()

}

public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread()//System out println(Thread currentThread() getId() + : 空闲 )this wait()}else {

ThreadTask task

while((task = popTask()) != null) { task run()if(stopped) {

stopped = false

if(tasks size() >) { tasks clear()System out println(Thread currentThread() getId() + : Tasks are stopped )

break

}

}

if(paused) {

paused = false

if(tasks size() >) { System out println(Thread currentThread() getId() + : Tasks are paused )

break

}

}

}

running = false

}

if(killed) {

killed = false

break

}

}

}catch(InterruptedException e) {

return

}

//System out println(Thread currentThread() getId() + : Killed )

}

}

//ThreadPool java

package polarman threadpool

import java util Collectionimport java util Iteratorimport java util Vector

/** *//**

*线程池

* @author ryang

*

*/

public class ThreadPool {

protected int maxPoolSize

protected int initPoolSize

protected Vector threads = new Vector()

protected boolean initialized = false

protected boolean hasIdleThread = false

public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSizethis initPoolSize = initPoolSize

}

public void init() {

initialized = true

for(int i= i<initPoolSizei++) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

}

//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize)

}

public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize)this maxPoolSize = maxPoolSize

if(maxPoolSize <getPoolSize())

setPoolSize(maxPoolSize)

}

/** *//**

*重设当前线程数

* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束

* @param size

*/

public void setPoolSize(int size) { if(!initialized) {

initPoolSize = size

return

}else if(size >getPoolSize()) { for(int i=getPoolSize()i<size &&i<maxPoolSizei++) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

}

}else if(size <getPoolSize()) { while(getPoolSize() >size) { PooledThread th = (PooledThread)threads remove( )th kill()

}

}

//System out println( 重设线程数 线程数= + threads size())

}

public int getPoolSize() { return threads size()

}

protected void notifyForIdleThread() {

hasIdleThread = true

}

protected boolean waitForIdleThread() {

hasIdleThread = false

while(!hasIdleThread &&getPoolSize() >= maxPoolSize) { try { Thread sleep( )} catch (InterruptedException e) {

return false

}

}

return true

}

public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator()itr hasNext()) { PooledThread th = (PooledThread)itr next()if(!th isRunning())

return th

}

if(getPoolSize() <maxPoolSize) {

PooledThread thread = new PooledThread(this)

thread start()threads add(thread)

return thread

}

//System out println( 线程池已满 等待 )

if(waitForIdleThread() == false)

return null

}

}

public void processTask(ThreadTask task) {

PooledThread th = getIdleThread()

if(th != null) { th putTask(task)th startTasks()

}

}

public void processTasksInSingleThread(ThreadTask[] tasks) {

PooledThread th = getIdleThread()

if(th != null) { th putTasks(tasks)th startTasks()

}

}

public void processTasksInSingleThread(Collection tasks) {

PooledThread th = getIdleThread()

if(th != null) { th putTasks(tasks)th startTasks()

}

}

}

下面是线程池的测试程序

//ThreadPoolTest java

import java io BufferedReaderimport java io IOExceptionimport java io InputStreamReader

import polarman threadpool ThreadPoolimport polarman threadpool ThreadTask

public class ThreadPoolTest {

public static void main(String[] args) { System out println( quit 退出 )System out println( task A 启动任务A 时长为 秒 )System out println( size 设置当前线程池大小为 )System out println( max 设置线程池最大线程数为 )System out println()

final ThreadPool pool = new ThreadPool( )pool init()

Thread cmdThread = new Thread() { public void run() {

BufferedReader reader = new BufferedReader(new InputStreamReader(System in))

while(true) { try { String line = reader readLine()String words[] = line split( )if(words[ ] equalsIgnoreCase( quit )) { System exit( )}else if(words[ ] equalsIgnoreCase( size ) &&words length >= ) { try { int size = Integer parseInt(words[ ])pool setPoolSize(size)}catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( max ) &&words length >= ) { try { int max = Integer parseInt(words[ ])pool setMaxPoolSize(max)}catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( task ) &&words length >= ) { try { int timelen = Integer parseInt(words[ ])SimpleTask task = new SimpleTask(words[ ] timelen * )pool processTask(task)}catch(Exception e) {

}

}

} catch (IOException e) { e printStackTrace()

}

}

}

}

cmdThread start()

/**//*

for(int i= i<i++){

SimpleTask task = new SimpleTask( Task + i (i+ )* )pool processTask(task)

}*/

}

}

class SimpleTask implements ThreadTask {

private String taskName

private int timeLen

public SimpleTask(String taskName int timeLen) { this taskName = taskNamethis timeLen = timeLen

}

public void run() { System out println(Thread currentThread() getId() +

: START TASK + taskName + )

try { Thread sleep(timeLen)} catch (InterruptedException e) {

}

System out println(Thread currentThread() getId() +

: END TASK + taskName + )

}

}

使用此线程池相当简单 下面两行代码初始化线程池

ThreadPool pool = new ThreadPool( )pool init()

要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()

两行代码即可调用

lishixinzhi/Article/program/Java/hx/201311/27203