线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run()
}
//PooledThread java
package polarman threadpool
import java util Collectionimport java util Vector
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector()
protected boolean running = false
protected boolean stopped = false
protected boolean paused = false
protected boolean killed = false
private ThreadPool pool
public PooledThread(ThreadPool pool) { this pool = pool
}
public void putTask(ThreadTask task) { tasks add(task)
}
public void putTasks(ThreadTask[] tasks) { for(int i= i<tasks lengthi++) this tasks add(tasks[i])
}
public void putTasks(Collection tasks) { this tasks addAll(tasks)
}
protected ThreadTask popTask() { if(tasks size() >) return (ThreadTask)tasks remove( )
else
return null
}
public boolean isRunning() {
return running
}
public void stopTasks() {
stopped = true
}
public void stopTasksSync() {
stopTasks()
while(isRunning()) { try {
sleep( )
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true
}
public void pauseTasksSync() {
pauseTasks()
while(isRunning()) { try {
sleep( )
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt()
else
killed = true
}
public void killSync() {
kill()
while(isAlive()) { try {
sleep( )
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true
this notify()
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread()//System out println(Thread currentThread() getId() + : 空闲 )this wait()}else {
ThreadTask task
while((task = popTask()) != null) { task run()if(stopped) {
stopped = false
if(tasks size() >) { tasks clear()System out println(Thread currentThread() getId() + : Tasks are stopped )
break
}
}
if(paused) {
paused = false
if(tasks size() >) { System out println(Thread currentThread() getId() + : Tasks are paused )
break
}
}
}
running = false
}
if(killed) {
killed = false
break
}
}
}catch(InterruptedException e) {
return
}
//System out println(Thread currentThread() getId() + : Killed )
}
}
//ThreadPool java
package polarman threadpool
import java util Collectionimport java util Iteratorimport java util Vector
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize
protected int initPoolSize
protected Vector threads = new Vector()
protected boolean initialized = false
protected boolean hasIdleThread = false
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSizethis initPoolSize = initPoolSize
}
public void init() {
initialized = true
for(int i= i<initPoolSizei++) {
PooledThread thread = new PooledThread(this)
thread start()threads add(thread)
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize)
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize)this maxPoolSize = maxPoolSize
if(maxPoolSize <getPoolSize())
setPoolSize(maxPoolSize)
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size
return
}else if(size >getPoolSize()) { for(int i=getPoolSize()i<size &&i<maxPoolSizei++) {
PooledThread thread = new PooledThread(this)
thread start()threads add(thread)
}
}else if(size <getPoolSize()) { while(getPoolSize() >size) { PooledThread th = (PooledThread)threads remove( )th kill()
}
}
//System out println( 重设线程数 线程数= + threads size())
}
public int getPoolSize() { return threads size()
}
protected void notifyForIdleThread() {
hasIdleThread = true
}
protected boolean waitForIdleThread() {
hasIdleThread = false
while(!hasIdleThread &&getPoolSize() >= maxPoolSize) { try { Thread sleep( )} catch (InterruptedException e) {
return false
}
}
return true
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator()itr hasNext()) { PooledThread th = (PooledThread)itr next()if(!th isRunning())
return th
}
if(getPoolSize() <maxPoolSize) {
PooledThread thread = new PooledThread(this)
thread start()threads add(thread)
return thread
}
//System out println( 线程池已满 等待 )
if(waitForIdleThread() == false)
return null
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread()
if(th != null) { th putTask(task)th startTasks()
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread()
if(th != null) { th putTasks(tasks)th startTasks()
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread()
if(th != null) { th putTasks(tasks)th startTasks()
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReaderimport java io IOExceptionimport java io InputStreamReader
import polarman threadpool ThreadPoolimport polarman threadpool ThreadTask
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 )System out println( task A 启动任务A 时长为 秒 )System out println( size 设置当前线程池大小为 )System out println( max 设置线程池最大线程数为 )System out println()
final ThreadPool pool = new ThreadPool( )pool init()
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in))
while(true) { try { String line = reader readLine()String words[] = line split( )if(words[ ] equalsIgnoreCase( quit )) { System exit( )}else if(words[ ] equalsIgnoreCase( size ) &&words length >= ) { try { int size = Integer parseInt(words[ ])pool setPoolSize(size)}catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) &&words length >= ) { try { int max = Integer parseInt(words[ ])pool setMaxPoolSize(max)}catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) &&words length >= ) { try { int timelen = Integer parseInt(words[ ])SimpleTask task = new SimpleTask(words[ ] timelen * )pool processTask(task)}catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace()
}
}
}
}
cmdThread start()
/**//*
for(int i= i<i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* )pool processTask(task)
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName
private int timeLen
public SimpleTask(String taskName int timeLen) { this taskName = taskNamethis timeLen = timeLen
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + )
try { Thread sleep(timeLen)} catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + )
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( )pool init()
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixinzhi/Article/program/Java/hx/201311/27203首先说明下java线程是如何实现线程重用的
1. 线程执行完一个Runnable的run()方法后,不会被杀死
2. 当线程被重用时,这个线程会进入新Runnable对象的run()方法12
java线程池由Executors提供的几种静态方法创建线程池。下面通过代码片段简单介绍下线程池的几种实现方式。后续会针对每个实现方式做详细的说明
newFixedThreadPool
创建一个固定大小的线程池
添加的任务达到线程池的容量之后开始加入任务队列开始线程重用总共开启线程个数跟指定容量相同。
@Test
public void newFixedThreadPool() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1)
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build())
RunThread run1 = new RunThread("run 1")
executorService.execute(run1)
executorService.shutdown()
}12345678
newSingleThreadExecutor
仅支持单线程顺序处理任务
@Test
public void newSingleThreadExecutor() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor()
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build())
executorService.execute(new RunThread("run 1"))
executorService.execute(new RunThread("run 2"))
executorService.shutdown()
}123456789
newCachedThreadPool
这种情况跟第一种的方式类似,不同的是这种情况线程池容量上线是Integer.MAX_VALUE 并且线程池开启缓存60s
@Test
public void newCachedThreadPool() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool()
executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build())
executorService.execute(new RunThread("run 1"))
executorService.execute(new RunThread("run 2"))
executorService.shutdown()
}123456789
newWorkStealingPool
支持给定的并行级别,并且可以使用多个队列来减少争用。
@Test
public void newWorkStealingPool() throws Exception {
ExecutorService executorService = Executors.newWorkStealingPool()
executorService = Executors.newWorkStealingPool(1)
RunThread run1 = new RunThread("run 1")
executorService.execute(run1)
executorService.shutdown()
}123456789
newScheduledThreadPool
看到的现象和第一种相同,也是在线程池满之前是新建线程,然后开始进入任务队列,进行线程重用
支持定时周期执行任务(还没有看完)
@Test
public void newScheduledThreadPool() throws Exception {
ExecutorService executorService = Executors.newScheduledThreadPool(1)
executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build())
executorService.execute(new RunThread("run 1"))
executorService.execute(new RunThread("run 2"))
executorService.shutdown()
}
很多场景下应用程序必须能够处理一系列传入请求,简单的处理方式是通过一个线程顺序的处理这些请求,如下图:
单线程策略的优势和劣势都非常明显:
优势:设计和实现简单;劣势:这种方式会带来处理效率的问题,单线程的处理能力是有限,不能发挥多核处理器优势。
在这种场景下我们就需要考虑并发,一个简单的并发策略就是Thread-Per-Message模式,即为每个请求使用一个新的线程。
Thread-Per-Message策略的优势和劣势也非常明显:
优势:设计和实现比较简单,能够同时处理多个请求,提升响应效率;
劣势:主要在两个方面
1.资源消耗 引入了在串行执行中所没有的开销,包括线程创建和调度,任务处理,资源分配和回收以及频繁上下文切换所需的时间和资源。2.安全
有没有一种方式可以并发执行又可以克服Thread-Per-Message的问题?
采用线程池的策略,线程池通过控制并发执行的工作线程的最大数量来解决Thread-Per-Message带来的问题。可见下图,请求来临时先放入线程池的队列
线程池可以接受一个Runnable或Callable<T>任务,并将其存储在临时队列中,当有空闲线程时可以从队列中拿到一个任务并执行。
反例(使用 Thread-Per-Message 策略)
正例(使用 线程池 策略)
JAVA 中(JDK 1.5+)线程池的种类:
程序不能使用来自有界线程池的线程来执行依赖于线程池中其他任务的任务。
有两个场景:
要缓解上面两个场景产生的问题有两个简单的办法:
真正解决此类方法还是需要梳理线程池执行业务流程,不要在有界线程池中执行相互依赖的任务,防止出现竞争和死锁。
向线程池提交的任务需要支持中断。从而保证线程可以中断,线程池可以关闭。线程池支持 java.util.concurrent.ExecutorService.shutdownNow() 方法,该方法尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务的列表。
但是 shutdownNow() 除了尽力尝试停止处理主动执行的任务之外不能保证一定能够停止。例如,典型的实现是通过Thread.interrupt()来停止,因此任何未能响应中断的任务可能永远不会终止,也就造成线程池无法真正的关闭。
反例:
正例:
线程池中的所有任务必须提供机制,如果它们异常终止,则需要通知应用程序.
如果不这样做不会导致资源泄漏,但由于池中的线程仍然被会重复使用,使故障诊断非常困难或不可能。
在应用程序级别处理异常的最好方法是使用异常处理。异常处理可以执行诊断操作,清理和关闭Java虚拟机,或者只是记录故障的详细信息。
也就是说在线程池里执行的任务也需要能够抛出异常并被捕获处理。
任务恢复或清除操作可以通过重写 java.util.concurrent.ThreadPoolExecutor 类的 afterExecute() 钩子来执行。
当任务通过执行其 run() 方法中的所有语句并且成功结束任务,或者由于异常而导致任务停止时,将调用此钩子。
可以通过自定义 ThreadPoolExecutor 服务来重载 afterExecute()钩子。
还可以通过重载 terminated() 方法来释放线程池获取的资源,就像一个finally块。
反例:
任务意外终止时作为一个运行时异常,无法通知应用程序。此外,它缺乏恢复机制。因此,如果Task抛出一个NullPointerException ,异常将被忽略。
正例:
另外一种方式是使用 ExecutorService.submit() 方法(代替 execute() 方法)将任务提交到线程池并获取 Future 对象。
当通过 ExecutorService.submit() 提交任务时,抛出的异常并未到达未捕获的异常处理机制,因为抛出的异常被认为是返回状态的一部分,因此被包装在ExecutionException ,并由Future.get() 返回。
java.lang.ThreadLocal 类提供线程内的本地变量。根据Java API
ThreadLocal对象需要关注那些对象被线程池中的多个线程执行的类。
线程池缓存技术允许线程重用以减少线程创建开销,或者当创建无限数量的线程时可以降低系统的可靠性。
当 ThreadLocal 对象在一个线程中被修改,随后变得可重用时,在重用的线程上执行的下一个任务将能看到该线程上执行过的上一个任务修改的ThreadLocal 对象的状态。
所以要在使用线程池时重新初始化的ThreadLocal对象实例。
反例:
DiaryPool类创建了一个线程池,它可以通过一个共享的无界的队列来重用固定数量的线程。
在任何时候,不超过numOfThreads个线程正在处理任务。如果在所有线程都处于活动状态时提交其他任务,则 它们在队列中等待,直到线程可用。
当线程循环时,线程的线程局部状态仍然存在。
下表显示了可能的执行顺序:
时间任务线程池提交方法日期1t11doSomething1()星期五2t22doSomething2()星期一3t31doSomething3()星期五
在这个执行顺序中,期望从doSomething2() 开始的两个任务( t 2和t 3 doSomething2() 将当天视为星 期一。然而,因为池线程1被重用,所以t 3观察到星期五。
解决方案(try-finally条款)
符合规则的方案removeDay() 方法添加到Diary类,并在try‐finally 块中的实现doSomething1() 类的doSomething1() 方法的语句。finally 块通过删除当前线程中的值来恢复threadlocal类型的days对象的初始状态。
如果threadlocal变量再次被同一个线程读取,它将使用initialValue()方法重新初始化 ,除非任务已经明确设置了变量的值。这个解决方案将维护的责任转移到客户端( DiaryPool ),但是当Diary类不能被修改时是一个好的选择。
解决方案(beforeExecute())
使用一个自定义ThreadPoolExecutor 来扩展 ThreadPoolExecutor并覆盖beforeExecute() 方法。beforeExecute() 方法在Runnable 任务在指定线程中执行之前被调用。该方法在线程 “t” 执行任务 “r” 之前重新初始化 threadlocal 变量。